aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/transport.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport/transport.h')
-rw-r--r--src/core/transport/transport.h259
1 files changed, 137 insertions, 122 deletions
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index a2c41c47af..0f068dcb38 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,81 +31,128 @@
*
*/
-#ifndef GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H
-#define GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H
+#ifndef GRPC_CORE_TRANSPORT_TRANSPORT_H
+#define GRPC_CORE_TRANSPORT_TRANSPORT_H
#include <stddef.h>
#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_set.h"
-#include "src/core/transport/stream_op.h"
+#include "src/core/transport/metadata_batch.h"
+#include "src/core/transport/byte_stream.h"
#include "src/core/channel/context.h"
/* forward declarations */
typedef struct grpc_transport grpc_transport;
-typedef struct grpc_transport_callbacks grpc_transport_callbacks;
/* grpc_stream doesn't actually exist. It's used as a typesafe
opaque pointer for whatever data the transport wants to track
for a stream. */
typedef struct grpc_stream grpc_stream;
-/* Represents the send/recv closed state of a stream. */
-typedef enum grpc_stream_state {
- /* the stream is open for sends and receives */
- GRPC_STREAM_OPEN,
- /* the stream is closed for sends, but may still receive data */
- GRPC_STREAM_SEND_CLOSED,
- /* the stream is closed for receives, but may still send data */
- GRPC_STREAM_RECV_CLOSED,
- /* the stream is closed for both sends and receives */
- GRPC_STREAM_CLOSED
-} grpc_stream_state;
-
-/* Transport op: a set of operations to perform on a transport */
-typedef struct grpc_transport_op {
- grpc_iomgr_closure *on_consumed;
-
- grpc_stream_op_buffer *send_ops;
- int is_last_send;
- grpc_iomgr_closure *on_done_send;
-
- grpc_stream_op_buffer *recv_ops;
- grpc_stream_state *recv_state;
- grpc_iomgr_closure *on_done_recv;
-
- grpc_pollset *bind_pollset;
-
+/*#define GRPC_STREAM_REFCOUNT_DEBUG*/
+
+typedef struct grpc_stream_refcount {
+ gpr_refcount refs;
+ grpc_closure destroy;
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+ const char *object_type;
+#endif
+} grpc_stream_refcount;
+
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
+ grpc_iomgr_cb_func cb, void *cb_arg,
+ const char *object_type);
+void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason);
+void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount,
+ const char *reason);
+#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
+ grpc_stream_ref_init(rc, ir, cb, cb_arg, objtype)
+#else
+void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
+ grpc_iomgr_cb_func cb, void *cb_arg);
+void grpc_stream_ref(grpc_stream_refcount *refcount);
+void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount);
+#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
+ grpc_stream_ref_init(rc, ir, cb, cb_arg)
+#endif
+
+/* Transport stream op: a set of operations to perform on a transport
+ against a single stream */
+typedef struct grpc_transport_stream_op {
+ /** Send initial metadata to the peer, from the provided metadata batch. */
+ grpc_metadata_batch *send_initial_metadata;
+
+ /** Send trailing metadata to the peer, from the provided metadata batch. */
+ grpc_metadata_batch *send_trailing_metadata;
+
+ /** Send message data to the peer, from the provided byte stream. */
+ grpc_byte_stream *send_message;
+
+ /** Receive initial metadata from the stream, into provided metadata batch. */
+ grpc_metadata_batch *recv_initial_metadata;
+ /** Should be enqueued when initial metadata is ready to be processed. */
+ grpc_closure *recv_initial_metadata_ready;
+
+ /** Receive message data from the stream, into provided byte stream. */
+ grpc_byte_stream **recv_message;
+ /** Should be enqueued when one message is ready to be processed. */
+ grpc_closure *recv_message_ready;
+
+ /** Receive trailing metadata from the stream, into provided metadata batch.
+ */
+ grpc_metadata_batch *recv_trailing_metadata;
+
+ /** Should be enqueued when all requested operations (excluding recv_message
+ and recv_initial_metadata which have their own closures) in a given batch
+ have been completed. */
+ grpc_closure *on_complete;
+
+ /** If != GRPC_STATUS_OK, cancel this stream */
grpc_status_code cancel_with_status;
+ /** If != GRPC_STATUS_OK, send grpc-status, grpc-message, and close this
+ stream for both reading and writing */
+ grpc_status_code close_with_status;
+ gpr_slice *optional_close_message;
+
/* Indexes correspond to grpc_context_index enum values */
grpc_call_context_element *context;
-} grpc_transport_op;
+} grpc_transport_stream_op;
-/* Callbacks made from the transport to the upper layers of grpc. */
-struct grpc_transport_callbacks {
- /* Initialize a new stream on behalf of the transport.
- Must result in a call to
- grpc_transport_init_stream(transport, ..., request) in the same call
- stack.
- Must not result in any other calls to the transport.
-
- Arguments:
- user_data - the transport user data set at transport creation time
- transport - the grpc_transport instance making this call
- request - request parameters for this stream (owned by the caller)
- server_data - opaque transport dependent argument that should be passed
- to grpc_transport_init_stream
- */
- void (*accept_stream)(void *user_data, grpc_transport *transport,
- const void *server_data);
-
- void (*goaway)(void *user_data, grpc_transport *transport,
- grpc_status_code status, gpr_slice debug);
-
- /* The transport has been closed */
- void (*closed)(void *user_data, grpc_transport *transport);
-};
+/** Transport op: a set of operations to perform on a transport as a whole */
+typedef struct grpc_transport_op {
+ /** Called when processing of this op is done. */
+ grpc_closure *on_consumed;
+ /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */
+ grpc_closure *on_connectivity_state_change;
+ grpc_connectivity_state *connectivity_state;
+ /** should the transport be disconnected */
+ int disconnect;
+ /** should we send a goaway?
+ after a goaway is sent, once there are no more active calls on
+ the transport, the transport should disconnect */
+ int send_goaway;
+ /** what should the goaway contain? */
+ grpc_status_code goaway_status;
+ gpr_slice *goaway_message;
+ /** set the callback for accepting new streams;
+ this is a permanent callback, unlike the other one-shot closures.
+ If true, the callback is set to set_accept_stream_fn, with its
+ user_data argument set to set_accept_stream_user_data */
+ bool set_accept_stream;
+ void (*set_accept_stream_fn)(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_transport *transport,
+ const void *server_data);
+ void *set_accept_stream_user_data;
+ /** add this transport to a pollset */
+ grpc_pollset *bind_pollset;
+ /** add this transport to a pollset_set */
+ grpc_pollset_set *bind_pollset_set;
+ /** send a ping, call this back if not NULL */
+ grpc_closure *send_ping;
+} grpc_transport_op;
/* Returns the amount of memory required to store a grpc_stream for this
transport */
@@ -120,9 +167,14 @@ size_t grpc_transport_stream_size(grpc_transport *transport);
stream - a pointer to uninitialized memory to initialize
server_data - either NULL for a client initiated stream, or a pointer
supplied from the accept_stream callback function */
-int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
- const void *server_data,
- grpc_transport_op *initial_op);
+int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx,
+ grpc_transport *transport, grpc_stream *stream,
+ grpc_stream_refcount *refcount,
+ const void *server_data);
+
+void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_transport *transport, grpc_stream *stream,
+ grpc_pollset *pollset);
/* Destroy transport data for a stream.
@@ -134,20 +186,21 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
transport - the transport on which to create this stream
stream - the grpc_stream to destroy (memory is still owned by the
caller, but any child memory must be cleaned up) */
-void grpc_transport_destroy_stream(grpc_transport *transport,
+void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
+ grpc_transport *transport,
grpc_stream *stream);
-void grpc_transport_op_finish_with_failure(grpc_transport_op *op);
+void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
+ grpc_transport_stream_op *op);
-void grpc_transport_op_add_cancellation(grpc_transport_op *op,
- grpc_status_code status,
- grpc_mdstr *message);
+void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
+ grpc_status_code status);
-/* TODO(ctiller): remove this */
-void grpc_transport_add_to_pollset(grpc_transport *transport,
- grpc_pollset *pollset);
+void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
+ grpc_status_code status,
+ gpr_slice *optional_message);
-char *grpc_transport_op_string(grpc_transport_op *op);
+char *grpc_transport_stream_op_string(grpc_transport_stream_op *op);
/* Send a batch of operations on a transport
@@ -157,14 +210,20 @@ char *grpc_transport_op_string(grpc_transport_op *op);
transport - the transport on which to initiate the stream
stream - the stream on which to send the operations. This must be
non-NULL and previously initialized by the same transport.
- op - a grpc_transport_op specifying the op to perform */
-void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
+ op - a grpc_transport_stream_op specifying the op to perform */
+void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx,
+ grpc_transport *transport,
+ grpc_stream *stream,
+ grpc_transport_stream_op *op);
+
+void grpc_transport_perform_op(grpc_exec_ctx *exec_ctx,
+ grpc_transport *transport,
grpc_transport_op *op);
/* Send a ping on a transport
Calls cb with user data when a response is received. */
-void grpc_transport_ping(grpc_transport *transport, grpc_iomgr_closure *cb);
+void grpc_transport_ping(grpc_transport *transport, grpc_closure *cb);
/* Advise peer of pending connection termination. */
void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status,
@@ -174,54 +233,10 @@ void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status,
void grpc_transport_close(grpc_transport *transport);
/* Destroy the transport */
-void grpc_transport_destroy(grpc_transport *transport);
-
-/* Return type for grpc_transport_setup_callback */
-typedef struct grpc_transport_setup_result {
- void *user_data;
- const grpc_transport_callbacks *callbacks;
-} grpc_transport_setup_result;
-
-/* Given a transport, return callbacks for that transport. Used to finalize
- setup as a transport is being created */
-typedef grpc_transport_setup_result (*grpc_transport_setup_callback)(
- void *setup_arg, grpc_transport *transport, grpc_mdctx *mdctx);
-
-typedef struct grpc_transport_setup grpc_transport_setup;
-typedef struct grpc_transport_setup_vtable grpc_transport_setup_vtable;
-
-struct grpc_transport_setup_vtable {
- void (*initiate)(grpc_transport_setup *setup);
- void (*add_interested_party)(grpc_transport_setup *setup,
- grpc_pollset *pollset);
- void (*del_interested_party)(grpc_transport_setup *setup,
- grpc_pollset *pollset);
- void (*cancel)(grpc_transport_setup *setup);
-};
-
-/* Transport setup is an asynchronous utility interface for client channels to
- establish connections. It's transport agnostic. */
-struct grpc_transport_setup {
- const grpc_transport_setup_vtable *vtable;
-};
-
-/* Initiate transport setup: e.g. for TCP+DNS trigger a resolve of the name
- given at transport construction time, create the tcp connection, perform
- handshakes, and call some grpc_transport_setup_result function provided at
- setup construction time.
- This *may* be implemented as a no-op if the setup process monitors something
- continuously. */
-void grpc_transport_setup_initiate(grpc_transport_setup *setup);
-
-void grpc_transport_setup_add_interested_party(grpc_transport_setup *setup,
- grpc_pollset *pollset);
-void grpc_transport_setup_del_interested_party(grpc_transport_setup *setup,
- grpc_pollset *pollset);
-
-/* Cancel transport setup. After this returns, no new transports should be
- created, and all pending transport setup callbacks should be completed.
- After this call completes, setup should be considered invalid (this can be
- used as a destruction call by setup). */
-void grpc_transport_setup_cancel(grpc_transport_setup *setup);
-
-#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */
+void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport);
+
+/* Get the transports peer */
+char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
+ grpc_transport *transport);
+
+#endif /* GRPC_CORE_TRANSPORT_TRANSPORT_H */