diff options
-rw-r--r-- | include/grpc/grpc.h | 14 | ||||
-rw-r--r-- | src/core/channel/channel_stack.c | 8 | ||||
-rw-r--r-- | src/core/channel/channel_stack.h | 46 | ||||
-rw-r--r-- | src/core/channel/client_channel.c | 147 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 16 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 35 | ||||
-rw-r--r-- | src/core/transport/transport.c | 9 | ||||
-rw-r--r-- | src/core/transport/transport.h | 58 | ||||
-rw-r--r-- | src/core/transport/transport_impl.h | 19 |
9 files changed, 154 insertions, 198 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 8b4676562b..bb6753a9bb 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -118,6 +118,20 @@ typedef struct { #define GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER \ "grpc.http2.initial_sequence_number" +/** Connectivity state of a channel. */ +typedef enum { + /** channel is connecting */ + GRPC_CHANNEL_CONNECTING, + /** channel is ready for work */ + GRPC_CHANNEL_READY, + /** channel has seen a failure but expects to recover */ + GRPC_CHANNEL_TRANSIENT_FAILURE, + /** channel is idle */ + GRPC_CHANNEL_IDLE, + /** channel has seen a failure that it cannot recover from */ + GRPC_CHANNEL_FATAL_FAILURE +} grpc_connectivity_state; + /* Result of a grpc call. If the caller satisfies the prerequisites of a particular operation, the grpc_call_error returned will be GRPC_CALL_OK. Receiving any other value listed here is an indication of a bug in the diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index b575367b52..ff1077ce4c 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -186,12 +186,12 @@ void grpc_call_stack_destroy(grpc_call_stack *stack) { void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op) { grpc_call_element *next_elem = elem + 1; - next_elem->filter->start_transport_op(next_elem, op); + next_elem->filter->start_transport_stream_op(next_elem, op); } -void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op) { - grpc_channel_element *next_elem = elem + op->dir; - next_elem->filter->channel_op(next_elem, elem, op); +void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op) { + grpc_channel_element *next_elem = elem + 1; + next_elem->filter->start_transport_op(next_elem, op); } grpc_channel_stack *grpc_channel_stack_from_top_element( diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 873a8ac4d3..5ac2372f3a 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -51,45 +51,6 @@ typedef struct grpc_channel_element grpc_channel_element; typedef struct grpc_call_element grpc_call_element; -/* The direction of the call. - The values of the enums (1, -1) matter here - they are used to increment - or decrement a pointer to find the next element to call */ -typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir; - -typedef enum { - /* send a goaway message to remote channels indicating that we are going - to disconnect in the future */ - GRPC_CHANNEL_GOAWAY, - /* disconnect any underlying transports */ - GRPC_CHANNEL_DISCONNECT, - /* transport received a new call */ - GRPC_ACCEPT_CALL, - /* an underlying transport was closed */ - GRPC_TRANSPORT_CLOSED, - /* an underlying transport is about to be closed */ - GRPC_TRANSPORT_GOAWAY -} grpc_channel_op_type; - -/* A single filterable operation to be performed on a channel */ -typedef struct { - /* The type of operation we're performing */ - grpc_channel_op_type type; - /* The directionality of this call - is it bubbling up the stack, or down? */ - grpc_call_dir dir; - - /* Argument data, matching up with grpc_channel_op_type names */ - union { - struct { - grpc_transport *transport; - const void *transport_server_data; - } accept_call; - struct { - grpc_status_code status; - gpr_slice message; - } goaway; - } data; -} grpc_channel_op; - /* Channel filters specify: 1. the amount of memory needed in the channel & call (via the sizeof_XXX members) @@ -103,13 +64,12 @@ typedef struct { typedef struct { /* Called to eg. send/receive data on a call. See grpc_call_next_op on how to call the next element in the stack */ - void (*start_transport_op)(grpc_call_element *elem, + void (*start_transport_stream_op)(grpc_call_element *elem, grpc_transport_stream_op *op); /* Called to handle channel level operations - e.g. new calls, or transport closure. See grpc_channel_next_op on how to call the next element in the stack */ - void (*channel_op)(grpc_channel_element *elem, - grpc_channel_element *from_elem, grpc_channel_op *op); + void (*start_transport_op)(grpc_channel_element *elem, grpc_transport_op *op); /* sizeof(per call data) */ size_t sizeof_call_data; @@ -211,7 +171,7 @@ void grpc_call_stack_destroy(grpc_call_stack *stack); void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op); /* Call the next operation (depending on call directionality) in a channel stack */ -void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op); +void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op); /* Given the top element of a channel stack, get the channel stack itself */ grpc_channel_stack *grpc_channel_stack_from_top_element( diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 4ae951074d..9630f6898d 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -54,26 +54,27 @@ typedef struct { grpc_mdctx *mdctx; /** resolver for this channel */ grpc_resolver *resolver; - /** channel arguments for this channel - TODO(ctiller): still needed? */ - grpc_channel_args *args; - /** mutex protecting waiting list */ - gpr_mu mu_waiting; /** mutex protecting client configuration, resolution state */ gpr_mu mu_config; - /** currently active load balancer - guarded by mu_config */ grpc_lb_policy *lb_policy; - /** incoming configuration - set by resolver.next guarded by mu_config */ grpc_client_config *incoming_configuration; + /** a list of closures that are all waiting for config to come in */ + grpc_iomgr_closure *waiting_for_config_closures; + /** resolver callback */ + grpc_iomgr_closure on_config_changed; + /** connectivity state being tracked */ + grpc_iomgr_closure *on_connectivity_state_change; + grpc_connectivity_state *connectivity_state; } channel_data; typedef enum { CALL_CREATED, - CALL_WAITING, + CALL_WAITING_FOR_CONFIG, + CALL_WAITING_FOR_PICK, CALL_ACTIVE, CALL_CANCELLED } call_state; @@ -193,13 +194,12 @@ static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) { abort(); } -static void cc_start_transport_op(grpc_call_element *elem, +static void cc_start_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_subchannel_call *subchannel_call; grpc_lb_policy *lb_policy; - grpc_transport_stream_op waiting_op; GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); @@ -207,10 +207,8 @@ static void cc_start_transport_op(grpc_call_element *elem, switch (calld->state) { case CALL_ACTIVE: subchannel_call = calld->s.active.subchannel_call; - grpc_subchannel_call_ref(subchannel_call); gpr_mu_unlock(&calld->mu_state); grpc_subchannel_call_process_op(subchannel_call, op); - grpc_subchannel_call_unref(subchannel_call); break; case CALL_CANCELLED: gpr_mu_unlock(&calld->mu_state); @@ -222,7 +220,6 @@ static void cc_start_transport_op(grpc_call_element *elem, gpr_mu_unlock(&calld->mu_state); handle_op_after_cancellation(elem, op); } else { - calld->state = CALL_WAITING; calld->s.waiting_op = *op; gpr_mu_lock(&chand->mu_config); @@ -230,45 +227,44 @@ static void cc_start_transport_op(grpc_call_element *elem, if (lb_policy) { grpc_lb_policy_ref(lb_policy); gpr_mu_unlock(&chand->mu_config); + calld->state = CALL_WAITING_FOR_PICK; gpr_mu_unlock(&calld->mu_state); pick_target(lb_policy, calld); grpc_lb_policy_unref(lb_policy); } else { + calld->state = CALL_WAITING_FOR_CONFIG; add_to_lb_policy_wait_queue_locked_state_config(chand, calld); gpr_mu_unlock(&chand->mu_config); gpr_mu_unlock(&calld->mu_state); } } break; - case CALL_WAITING: + case CALL_WAITING_FOR_CONFIG: + case CALL_WAITING_FOR_PICK: if (op->cancel_with_status != GRPC_STATUS_OK) { - waiting_op = calld->s.waiting_op; calld->state = CALL_CANCELLED; gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(elem, &waiting_op); handle_op_after_cancellation(elem, op); } else { GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) != (op->send_ops == NULL)); GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) != (op->recv_ops == NULL)); - if (op->send_ops) { + if (op->send_ops != NULL) { calld->s.waiting_op.send_ops = op->send_ops; calld->s.waiting_op.is_last_send = op->is_last_send; calld->s.waiting_op.on_done_send = op->on_done_send; - calld->s.waiting_op.send_user_data = op->send_user_data; } - if (op->recv_ops) { + if (op->recv_ops != NULL) { calld->s.waiting_op.recv_ops = op->recv_ops; calld->s.waiting_op.recv_state = op->recv_state; calld->s.waiting_op.on_done_recv = op->on_done_recv; - calld->s.waiting_op.recv_user_data = op->recv_user_data; } gpr_mu_unlock(&calld->mu_state); - if (op->on_consumed) { - op->on_consumed(op->on_consumed_user_data, 0); + if (op->on_consumed != NULL) { + op->on_consumed->cb(op->on_consumed->cb_arg, 0); } } break; @@ -372,6 +368,55 @@ static void cc_start_transport_op(grpc_call_element *elem, #endif } +static void update_state_locked(channel_data *chand) { + +} + +static void cc_on_config_changed(void *arg, int iomgr_success) { + channel_data *chand = arg; + grpc_lb_policy *lb_policy = NULL; + grpc_lb_policy *old_lb_policy; + grpc_resolver *old_resolver; + grpc_iomgr_closure *wakeup_closures = NULL; + + if (chand->incoming_configuration) { + lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration); + grpc_lb_policy_ref(lb_policy); + } + + grpc_client_config_unref(chand->incoming_configuration); + chand->incoming_configuration = NULL; + + gpr_mu_lock(&chand->mu_config); + old_lb_policy = chand->lb_policy; + chand->lb_policy = lb_policy; + if (lb_policy != NULL) { + wakeup_closures = chand->waiting_for_config_closures; + chand->waiting_for_config_closures = NULL; + } + gpr_mu_unlock(&chand->mu_config); + + while (wakeup_closures) { + grpc_iomgr_closure *next = wakeup_closures->next; + grpc_iomgr_add_callback(wakeup_closures); + wakeup_closures = next; + } + + grpc_lb_policy_unref(old_lb_policy); + + if (iomgr_success) { + grpc_resolver_next(chand->resolver, &chand->incoming_configuration, &chand->on_config_changed); + } else { + gpr_mu_lock(&chand->mu_config); + old_resolver = chand->resolver; + chand->resolver = NULL; + update_state_locked(chand); + gpr_mu_unlock(&chand->mu_config); + grpc_resolver_unref(old_resolver); + } +} + +#if 0 static void channel_op(grpc_channel_element *elem, grpc_channel_element *from_elem, grpc_channel_op *op) { channel_data *chand = elem->channel_data; @@ -451,6 +496,9 @@ static void channel_op(grpc_channel_element *elem, break; } } +#endif + +static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) {} /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, @@ -471,26 +519,28 @@ static void init_call_elem(grpc_call_element *elem, /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element *elem) { call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; + grpc_subchannel_call *subchannel_call; /* if the call got activated, we need to destroy the child stack also, and remove it from the in-flight requests tracked by the child_entry we picked */ - gpr_mu_lock(&chand->mu); + gpr_mu_lock(&calld->mu_state); switch (calld->state) { case CALL_ACTIVE: - gpr_mu_unlock(&chand->mu); - grpc_child_call_destroy(calld->s.active.child_call); + subchannel_call = calld->s.active.subchannel_call; + gpr_mu_unlock(&calld->mu_state); + grpc_subchannel_call_unref(subchannel_call); break; - case CALL_WAITING: - remove_waiting_child(chand, calld); - gpr_mu_unlock(&chand->mu); + case CALL_CREATED: + case CALL_CANCELLED: + gpr_mu_unlock(&calld->mu_state); break; - default: - gpr_mu_unlock(&chand->mu); + case CALL_WAITING_FOR_PICK: + case CALL_WAITING_FOR_CONFIG: + gpr_log(GPR_ERROR, "should never reach here"); + abort(); break; } - GPR_ASSERT(calld->state != CALL_WAITING); } /* Constructor for channel_data */ @@ -504,41 +554,32 @@ static void init_channel_elem(grpc_channel_element *elem, GPR_ASSERT(is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); - gpr_mu_init(&chand->mu); - chand->active_child = NULL; - chand->waiting_children = NULL; - chand->waiting_child_count = 0; - chand->waiting_child_capacity = 0; - chand->transport_setup = NULL; - chand->transport_setup_initiated = 0; - chand->args = grpc_channel_args_copy(args); + gpr_mu_init(&chand->mu_config); + chand->resolver = NULL; chand->mdctx = metadata_context; + grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); } /* Destructor for channel_data */ static void destroy_channel_elem(grpc_channel_element *elem) { channel_data *chand = elem->channel_data; - grpc_transport_setup_cancel(chand->transport_setup); - - if (chand->active_child) { - grpc_child_channel_destroy(chand->active_child, 1); - chand->active_child = NULL; + if (chand->resolver) { + grpc_resolver_unref(chand->resolver); } - - grpc_channel_args_destroy(chand->args); - - gpr_mu_destroy(&chand->mu); - GPR_ASSERT(chand->waiting_child_count == 0); - gpr_free(chand->waiting_children); + if (chand->lb_policy) { + grpc_lb_policy_unref(chand->lb_policy); + } + gpr_mu_destroy(&chand->mu_config); } const grpc_channel_filter grpc_client_channel_filter = { - cc_start_transport_op, channel_op, sizeof(call_data), + cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client-channel", }; +#if 0 grpc_transport_setup_result grpc_client_channel_transport_setup_complete( grpc_channel_stack *channel_stack, grpc_transport *transport, grpc_channel_filter const **channel_filters, size_t num_channel_filters, @@ -620,6 +661,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( return result; } +#endif void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, grpc_resolver *resolver) { @@ -628,5 +670,6 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, channel_data *chand = elem->channel_data; GPR_ASSERT(!chand->resolver); chand->resolver = resolver; + grpc_resolver_ref(resolver); grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed); } diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 9128aaeda7..0c6c9b3e64 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -44,22 +44,6 @@ typedef struct grpc_subchannel grpc_subchannel; typedef struct grpc_subchannel_call grpc_subchannel_call; -/** Connectivity state of a channel. - TODO(ctiller): move to grpc.h when we implement the public - version of the connectivity apis */ -typedef enum { - /** channel is connecting */ - GRPC_CHANNEL_CONNECTING, - /** channel is ready for work */ - GRPC_CHANNEL_READY, - /** channel has seen a failure but expects to recover */ - GRPC_CHANNEL_TRANSIENT_FAILURE, - /** channel is idle */ - GRPC_CHANNEL_IDLE, - /** channel has seen a failure that it cannot recover from */ - GRPC_CHANNEL_FATAL_FAILURE -} grpc_connectivity_state; - void grpc_subchannel_ref(grpc_subchannel *channel); void grpc_subchannel_unref(grpc_subchannel *channel); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 9943dbdbac..685098bcba 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -114,26 +114,9 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global, static void add_to_pollset_locked(grpc_chttp2_transport *t, grpc_pollset *pollset); -<<<<<<< HEAD -static void schedule_cb(transport *t, op_closure closure, int success); -static void maybe_finish_read(transport *t, stream *s); -static void maybe_join_window_updates(transport *t, stream *s); -static void finish_reads(transport *t); -static void add_to_pollset_locked(transport *t, grpc_pollset *pollset); -static void perform_op_locked(transport *t, stream *s, - grpc_transport_stream_op *op); -static void add_metadata_batch(transport *t, stream *s); - -static void flowctl_trace(transport *t, const char *flow, gpr_int32 window, - gpr_uint32 id, gpr_int32 delta) { - gpr_log(GPR_DEBUG, "HTTP:FLOW:%p:%d:%s: %d + %d = %d", t, id, flow, window, - delta, window + delta); -} -======= /** Start new streams that have been created if we can */ static void maybe_start_some_streams( grpc_chttp2_transport_global *transport_global); ->>>>>>> 48f0a13f3872876787f4d7588b396db914319b1b /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING @@ -385,16 +368,9 @@ static void goaway(grpc_transport *gt, grpc_status_code status, } static int init_stream(grpc_transport *gt, grpc_stream *gs, -<<<<<<< HEAD - const void *server_data, - grpc_transport_stream_op *initial_op) { - transport *t = (transport *)gt; - stream *s = (stream *)gs; -======= const void *server_data, grpc_transport_stream_op *initial_op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; ->>>>>>> 48f0a13f3872876787f4d7588b396db914319b1b memset(s, 0, sizeof(*s)); @@ -635,14 +611,9 @@ static void maybe_start_some_streams( } } -<<<<<<< HEAD -static void perform_op_locked(transport *t, stream *s, - grpc_transport_stream_op *op) { -======= static void perform_op_locked(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) { ->>>>>>> 48f0a13f3872876787f4d7588b396db914319b1b if (op->cancel_with_status != GRPC_STATUS_OK) { cancel_from_api(transport_global, stream_global, op->cancel_with_status); } @@ -699,15 +670,9 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global, } static void perform_op(grpc_transport *gt, grpc_stream *gs, -<<<<<<< HEAD - grpc_transport_stream_op *op) { - transport *t = (transport *)gt; - stream *s = (stream *)gs; -======= grpc_transport_stream_op *op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; ->>>>>>> 48f0a13f3872876787f4d7588b396db914319b1b lock(t); perform_op_locked(&t->global, &s->global, op); diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index a73c32da1a..40faa27211 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -58,9 +58,14 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, initial_op); } -void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream, +void grpc_transport_perform_stream_op(grpc_transport *transport, grpc_stream *stream, grpc_transport_stream_op *op) { - transport->vtable->perform_op(transport, stream, op); + transport->vtable->perform_stream_op(transport, stream, op); +} + +void grpc_transport_perform_op(grpc_transport *transport, + grpc_transport_op *op) { + transport->vtable->perform_op(transport, op); } void grpc_transport_add_to_pollset(grpc_transport *transport, diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index fbfed46626..7f6a37d048 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -43,7 +43,6 @@ /* forward declarations */ typedef struct grpc_transport grpc_transport; -typedef struct grpc_transport_callbacks grpc_transport_callbacks; /* grpc_stream doesn't actually exist. It's used as a typesafe opaque pointer for whatever data the transport wants to track @@ -62,7 +61,8 @@ typedef enum grpc_stream_state { GRPC_STREAM_CLOSED } grpc_stream_state; -/* Transport op: a set of operations to perform on a transport */ +/* Transport stream op: a set of operations to perform on a transport + against a single stream */ typedef struct grpc_transport_stream_op { grpc_iomgr_closure *on_consumed; @@ -83,30 +83,27 @@ typedef struct grpc_transport_stream_op { grpc_call_context_element *context; } grpc_transport_stream_op; -/* Callbacks made from the transport to the upper layers of grpc. */ -struct grpc_transport_callbacks { - /* Initialize a new stream on behalf of the transport. - Must result in a call to - grpc_transport_init_stream(transport, ..., request) in the same call - stack. - Must not result in any other calls to the transport. - - Arguments: - user_data - the transport user data set at transport creation time - transport - the grpc_transport instance making this call - request - request parameters for this stream (owned by the caller) - server_data - opaque transport dependent argument that should be passed - to grpc_transport_init_stream - */ - void (*accept_stream)(void *user_data, grpc_transport *transport, - const void *server_data); - - void (*goaway)(void *user_data, grpc_transport *transport, - grpc_status_code status, gpr_slice debug); - - /* The transport has been closed */ - void (*closed)(void *user_data, grpc_transport *transport); -}; +/** Transport op: a set of operations to perform on a transport as a whole */ +typedef struct grpc_transport_op { + /** connectivity monitoring */ + grpc_iomgr_closure *on_connectivity_state_change; + grpc_connectivity_state *connectivity_state; + /** should the transport be disconnected */ + int disconnect; + /** should we send a goaway? */ + int send_goaway; + /** what should the goaway contain? */ + grpc_status_code goaway_status; + gpr_slice *goaway_message; + /** set the callback for accepting new streams; + this is a permanent callback, unlike the other one-shot closures */ + void (*set_accept_stream)(void *user_data, grpc_transport *transport, const void *server_data); + void *set_accept_stream_user_data; + /** add this transport to a pollset */ + grpc_pollset *bind_pollset; + /** send a ping, call this back if not NULL */ + grpc_iomgr_closure *send_ping; +} grpc_transport_op; /* Returns the amount of memory required to store a grpc_stream for this transport */ @@ -144,10 +141,6 @@ void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, grpc_status_code status, grpc_mdstr *message); -/* TODO(ctiller): remove this */ -void grpc_transport_add_to_pollset(grpc_transport *transport, - grpc_pollset *pollset); - char *grpc_transport_stream_op_string(grpc_transport_stream_op *op); /* Send a batch of operations on a transport @@ -159,9 +152,11 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op); stream - the stream on which to send the operations. This must be non-NULL and previously initialized by the same transport. op - a grpc_transport_stream_op specifying the op to perform */ -void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream, +void grpc_transport_perform_stream_op(grpc_transport *transport, grpc_stream *stream, grpc_transport_stream_op *op); +void grpc_transport_perform_op(grpc_transport *transport, grpc_transport_op *op); + /* Send a ping on a transport Calls cb with user data when a response is received. */ @@ -180,7 +175,6 @@ void grpc_transport_destroy(grpc_transport *transport); /* Return type for grpc_transport_setup_callback */ typedef struct grpc_transport_setup_result { void *user_data; - const grpc_transport_callbacks *callbacks; } grpc_transport_setup_result; /* Given a transport, return callbacks for that transport. Used to finalize diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index 442b64ca59..b65b1d5607 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -46,26 +46,17 @@ typedef struct grpc_transport_vtable { const void *server_data, grpc_transport_stream_op *initial_op); - /* implementation of grpc_transport_send_batch */ - void (*perform_op)(grpc_transport *self, grpc_stream *stream, + /* implementation of grpc_transport_perform_stream_op */ + void (*perform_stream_op)(grpc_transport *self, grpc_stream *stream, grpc_transport_stream_op *op); - /* implementation of grpc_transport_add_to_pollset */ - void (*add_to_pollset)(grpc_transport *self, grpc_pollset *pollset); + /* implementation of grpc_transport_perform_op */ + void (*perform_op)(grpc_transport *self, grpc_stream *stream, + grpc_transport_stream_op *op); /* implementation of grpc_transport_destroy_stream */ void (*destroy_stream)(grpc_transport *self, grpc_stream *stream); - /* implementation of grpc_transport_goaway */ - void (*goaway)(grpc_transport *self, grpc_status_code status, - gpr_slice debug_data); - - /* implementation of grpc_transport_close */ - void (*close)(grpc_transport *self); - - /* implementation of grpc_transport_ping */ - void (*ping)(grpc_transport *self, grpc_iomgr_closure *cb); - /* implementation of grpc_transport_destroy */ void (*destroy)(grpc_transport *self); } grpc_transport_vtable; |