diff options
Diffstat (limited to 'src/core/transport/transport.h')
-rw-r--r-- | src/core/transport/transport.h | 259 |
1 files changed, 137 insertions, 122 deletions
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index a2c41c47af..0f068dcb38 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,81 +31,128 @@ * */ -#ifndef GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H -#define GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H +#ifndef GRPC_CORE_TRANSPORT_TRANSPORT_H +#define GRPC_CORE_TRANSPORT_TRANSPORT_H #include <stddef.h> #include "src/core/iomgr/pollset.h" #include "src/core/iomgr/pollset_set.h" -#include "src/core/transport/stream_op.h" +#include "src/core/transport/metadata_batch.h" +#include "src/core/transport/byte_stream.h" #include "src/core/channel/context.h" /* forward declarations */ typedef struct grpc_transport grpc_transport; -typedef struct grpc_transport_callbacks grpc_transport_callbacks; /* grpc_stream doesn't actually exist. It's used as a typesafe opaque pointer for whatever data the transport wants to track for a stream. */ typedef struct grpc_stream grpc_stream; -/* Represents the send/recv closed state of a stream. */ -typedef enum grpc_stream_state { - /* the stream is open for sends and receives */ - GRPC_STREAM_OPEN, - /* the stream is closed for sends, but may still receive data */ - GRPC_STREAM_SEND_CLOSED, - /* the stream is closed for receives, but may still send data */ - GRPC_STREAM_RECV_CLOSED, - /* the stream is closed for both sends and receives */ - GRPC_STREAM_CLOSED -} grpc_stream_state; - -/* Transport op: a set of operations to perform on a transport */ -typedef struct grpc_transport_op { - grpc_iomgr_closure *on_consumed; - - grpc_stream_op_buffer *send_ops; - int is_last_send; - grpc_iomgr_closure *on_done_send; - - grpc_stream_op_buffer *recv_ops; - grpc_stream_state *recv_state; - grpc_iomgr_closure *on_done_recv; - - grpc_pollset *bind_pollset; - +/*#define GRPC_STREAM_REFCOUNT_DEBUG*/ + +typedef struct grpc_stream_refcount { + gpr_refcount refs; + grpc_closure destroy; +#ifdef GRPC_STREAM_REFCOUNT_DEBUG + const char *object_type; +#endif +} grpc_stream_refcount; + +#ifdef GRPC_STREAM_REFCOUNT_DEBUG +void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, + grpc_iomgr_cb_func cb, void *cb_arg, + const char *object_type); +void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason); +void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount, + const char *reason); +#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \ + grpc_stream_ref_init(rc, ir, cb, cb_arg, objtype) +#else +void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, + grpc_iomgr_cb_func cb, void *cb_arg); +void grpc_stream_ref(grpc_stream_refcount *refcount); +void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount); +#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \ + grpc_stream_ref_init(rc, ir, cb, cb_arg) +#endif + +/* Transport stream op: a set of operations to perform on a transport + against a single stream */ +typedef struct grpc_transport_stream_op { + /** Send initial metadata to the peer, from the provided metadata batch. */ + grpc_metadata_batch *send_initial_metadata; + + /** Send trailing metadata to the peer, from the provided metadata batch. */ + grpc_metadata_batch *send_trailing_metadata; + + /** Send message data to the peer, from the provided byte stream. */ + grpc_byte_stream *send_message; + + /** Receive initial metadata from the stream, into provided metadata batch. */ + grpc_metadata_batch *recv_initial_metadata; + /** Should be enqueued when initial metadata is ready to be processed. */ + grpc_closure *recv_initial_metadata_ready; + + /** Receive message data from the stream, into provided byte stream. */ + grpc_byte_stream **recv_message; + /** Should be enqueued when one message is ready to be processed. */ + grpc_closure *recv_message_ready; + + /** Receive trailing metadata from the stream, into provided metadata batch. + */ + grpc_metadata_batch *recv_trailing_metadata; + + /** Should be enqueued when all requested operations (excluding recv_message + and recv_initial_metadata which have their own closures) in a given batch + have been completed. */ + grpc_closure *on_complete; + + /** If != GRPC_STATUS_OK, cancel this stream */ grpc_status_code cancel_with_status; + /** If != GRPC_STATUS_OK, send grpc-status, grpc-message, and close this + stream for both reading and writing */ + grpc_status_code close_with_status; + gpr_slice *optional_close_message; + /* Indexes correspond to grpc_context_index enum values */ grpc_call_context_element *context; -} grpc_transport_op; +} grpc_transport_stream_op; -/* Callbacks made from the transport to the upper layers of grpc. */ -struct grpc_transport_callbacks { - /* Initialize a new stream on behalf of the transport. - Must result in a call to - grpc_transport_init_stream(transport, ..., request) in the same call - stack. - Must not result in any other calls to the transport. - - Arguments: - user_data - the transport user data set at transport creation time - transport - the grpc_transport instance making this call - request - request parameters for this stream (owned by the caller) - server_data - opaque transport dependent argument that should be passed - to grpc_transport_init_stream - */ - void (*accept_stream)(void *user_data, grpc_transport *transport, - const void *server_data); - - void (*goaway)(void *user_data, grpc_transport *transport, - grpc_status_code status, gpr_slice debug); - - /* The transport has been closed */ - void (*closed)(void *user_data, grpc_transport *transport); -}; +/** Transport op: a set of operations to perform on a transport as a whole */ +typedef struct grpc_transport_op { + /** Called when processing of this op is done. */ + grpc_closure *on_consumed; + /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */ + grpc_closure *on_connectivity_state_change; + grpc_connectivity_state *connectivity_state; + /** should the transport be disconnected */ + int disconnect; + /** should we send a goaway? + after a goaway is sent, once there are no more active calls on + the transport, the transport should disconnect */ + int send_goaway; + /** what should the goaway contain? */ + grpc_status_code goaway_status; + gpr_slice *goaway_message; + /** set the callback for accepting new streams; + this is a permanent callback, unlike the other one-shot closures. + If true, the callback is set to set_accept_stream_fn, with its + user_data argument set to set_accept_stream_user_data */ + bool set_accept_stream; + void (*set_accept_stream_fn)(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_transport *transport, + const void *server_data); + void *set_accept_stream_user_data; + /** add this transport to a pollset */ + grpc_pollset *bind_pollset; + /** add this transport to a pollset_set */ + grpc_pollset_set *bind_pollset_set; + /** send a ping, call this back if not NULL */ + grpc_closure *send_ping; +} grpc_transport_op; /* Returns the amount of memory required to store a grpc_stream for this transport */ @@ -120,9 +167,14 @@ size_t grpc_transport_stream_size(grpc_transport *transport); stream - a pointer to uninitialized memory to initialize server_data - either NULL for a client initiated stream, or a pointer supplied from the accept_stream callback function */ -int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, - const void *server_data, - grpc_transport_op *initial_op); +int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx, + grpc_transport *transport, grpc_stream *stream, + grpc_stream_refcount *refcount, + const void *server_data); + +void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx, + grpc_transport *transport, grpc_stream *stream, + grpc_pollset *pollset); /* Destroy transport data for a stream. @@ -134,20 +186,21 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, transport - the transport on which to create this stream stream - the grpc_stream to destroy (memory is still owned by the caller, but any child memory must be cleaned up) */ -void grpc_transport_destroy_stream(grpc_transport *transport, +void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, + grpc_transport *transport, grpc_stream *stream); -void grpc_transport_op_finish_with_failure(grpc_transport_op *op); +void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, + grpc_transport_stream_op *op); -void grpc_transport_op_add_cancellation(grpc_transport_op *op, - grpc_status_code status, - grpc_mdstr *message); +void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, + grpc_status_code status); -/* TODO(ctiller): remove this */ -void grpc_transport_add_to_pollset(grpc_transport *transport, - grpc_pollset *pollset); +void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op, + grpc_status_code status, + gpr_slice *optional_message); -char *grpc_transport_op_string(grpc_transport_op *op); +char *grpc_transport_stream_op_string(grpc_transport_stream_op *op); /* Send a batch of operations on a transport @@ -157,14 +210,20 @@ char *grpc_transport_op_string(grpc_transport_op *op); transport - the transport on which to initiate the stream stream - the stream on which to send the operations. This must be non-NULL and previously initialized by the same transport. - op - a grpc_transport_op specifying the op to perform */ -void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream, + op - a grpc_transport_stream_op specifying the op to perform */ +void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx, + grpc_transport *transport, + grpc_stream *stream, + grpc_transport_stream_op *op); + +void grpc_transport_perform_op(grpc_exec_ctx *exec_ctx, + grpc_transport *transport, grpc_transport_op *op); /* Send a ping on a transport Calls cb with user data when a response is received. */ -void grpc_transport_ping(grpc_transport *transport, grpc_iomgr_closure *cb); +void grpc_transport_ping(grpc_transport *transport, grpc_closure *cb); /* Advise peer of pending connection termination. */ void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status, @@ -174,54 +233,10 @@ void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status, void grpc_transport_close(grpc_transport *transport); /* Destroy the transport */ -void grpc_transport_destroy(grpc_transport *transport); - -/* Return type for grpc_transport_setup_callback */ -typedef struct grpc_transport_setup_result { - void *user_data; - const grpc_transport_callbacks *callbacks; -} grpc_transport_setup_result; - -/* Given a transport, return callbacks for that transport. Used to finalize - setup as a transport is being created */ -typedef grpc_transport_setup_result (*grpc_transport_setup_callback)( - void *setup_arg, grpc_transport *transport, grpc_mdctx *mdctx); - -typedef struct grpc_transport_setup grpc_transport_setup; -typedef struct grpc_transport_setup_vtable grpc_transport_setup_vtable; - -struct grpc_transport_setup_vtable { - void (*initiate)(grpc_transport_setup *setup); - void (*add_interested_party)(grpc_transport_setup *setup, - grpc_pollset *pollset); - void (*del_interested_party)(grpc_transport_setup *setup, - grpc_pollset *pollset); - void (*cancel)(grpc_transport_setup *setup); -}; - -/* Transport setup is an asynchronous utility interface for client channels to - establish connections. It's transport agnostic. */ -struct grpc_transport_setup { - const grpc_transport_setup_vtable *vtable; -}; - -/* Initiate transport setup: e.g. for TCP+DNS trigger a resolve of the name - given at transport construction time, create the tcp connection, perform - handshakes, and call some grpc_transport_setup_result function provided at - setup construction time. - This *may* be implemented as a no-op if the setup process monitors something - continuously. */ -void grpc_transport_setup_initiate(grpc_transport_setup *setup); - -void grpc_transport_setup_add_interested_party(grpc_transport_setup *setup, - grpc_pollset *pollset); -void grpc_transport_setup_del_interested_party(grpc_transport_setup *setup, - grpc_pollset *pollset); - -/* Cancel transport setup. After this returns, no new transports should be - created, and all pending transport setup callbacks should be completed. - After this call completes, setup should be considered invalid (this can be - used as a destruction call by setup). */ -void grpc_transport_setup_cancel(grpc_transport_setup *setup); - -#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */ +void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport); + +/* Get the transports peer */ +char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, + grpc_transport *transport); + +#endif /* GRPC_CORE_TRANSPORT_TRANSPORT_H */ |