From b7959a0b362daa951a245632ffa4617df4184a87 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 25 Jun 2015 08:50:54 -0700 Subject: s/grpc_transport_op/grpc_transport_stream_op/g --- src/core/transport/transport.h | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) (limited to 'src/core/transport/transport.h') diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 7f60fdc037..72bc492d80 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -63,7 +63,7 @@ typedef enum grpc_stream_state { } grpc_stream_state; /* Transport op: a set of operations to perform on a transport */ -typedef struct grpc_transport_op { +typedef struct grpc_transport_stream_op { void (*on_consumed)(void *user_data, int success); void *on_consumed_user_data; @@ -84,7 +84,7 @@ typedef struct grpc_transport_op { /* Indexes correspond to grpc_context_index enum values */ grpc_call_context_element *context; -} grpc_transport_op; +} grpc_transport_stream_op; /* Callbacks made from the transport to the upper layers of grpc. */ struct grpc_transport_callbacks { @@ -126,7 +126,7 @@ size_t grpc_transport_stream_size(grpc_transport *transport); supplied from the accept_stream callback function */ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, const void *server_data, - grpc_transport_op *initial_op); + grpc_transport_stream_op *initial_op); /* Destroy transport data for a stream. @@ -141,17 +141,17 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, void grpc_transport_destroy_stream(grpc_transport *transport, grpc_stream *stream); -void grpc_transport_op_finish_with_failure(grpc_transport_op *op); +void grpc_transport_stream_op_finish_with_failure(grpc_transport_stream_op *op); -void grpc_transport_op_add_cancellation(grpc_transport_op *op, - grpc_status_code status, - grpc_mdstr *message); +void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, + grpc_status_code status, + grpc_mdstr *message); /* TODO(ctiller): remove this */ void grpc_transport_add_to_pollset(grpc_transport *transport, grpc_pollset *pollset); -char *grpc_transport_op_string(grpc_transport_op *op); +char *grpc_transport_stream_op_string(grpc_transport_stream_op *op); /* Send a batch of operations on a transport @@ -161,9 +161,9 @@ char *grpc_transport_op_string(grpc_transport_op *op); transport - the transport on which to initiate the stream stream - the stream on which to send the operations. This must be non-NULL and previously initialized by the same transport. - op - a grpc_transport_op specifying the op to perform */ + op - a grpc_transport_stream_op specifying the op to perform */ void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream, - grpc_transport_op *op); + grpc_transport_stream_op *op); /* Send a ping on a transport -- cgit v1.2.3 From 3f475422ecb8cd5c648ce86f126122ba6dee1c9c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 25 Jun 2015 10:43:05 -0700 Subject: chop chop chop --- include/grpc/grpc.h | 14 ++++ src/core/channel/channel_stack.c | 8 +- src/core/channel/channel_stack.h | 46 +---------- src/core/channel/client_channel.c | 147 ++++++++++++++++++++++------------ src/core/client_config/subchannel.h | 16 ---- src/core/transport/chttp2_transport.c | 35 -------- src/core/transport/transport.c | 9 ++- src/core/transport/transport.h | 58 ++++++-------- src/core/transport/transport_impl.h | 19 ++--- 9 files changed, 154 insertions(+), 198 deletions(-) (limited to 'src/core/transport/transport.h') diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 8b4676562b..bb6753a9bb 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -118,6 +118,20 @@ typedef struct { #define GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER \ "grpc.http2.initial_sequence_number" +/** Connectivity state of a channel. */ +typedef enum { + /** channel is connecting */ + GRPC_CHANNEL_CONNECTING, + /** channel is ready for work */ + GRPC_CHANNEL_READY, + /** channel has seen a failure but expects to recover */ + GRPC_CHANNEL_TRANSIENT_FAILURE, + /** channel is idle */ + GRPC_CHANNEL_IDLE, + /** channel has seen a failure that it cannot recover from */ + GRPC_CHANNEL_FATAL_FAILURE +} grpc_connectivity_state; + /* Result of a grpc call. If the caller satisfies the prerequisites of a particular operation, the grpc_call_error returned will be GRPC_CALL_OK. Receiving any other value listed here is an indication of a bug in the diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index b575367b52..ff1077ce4c 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -186,12 +186,12 @@ void grpc_call_stack_destroy(grpc_call_stack *stack) { void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op) { grpc_call_element *next_elem = elem + 1; - next_elem->filter->start_transport_op(next_elem, op); + next_elem->filter->start_transport_stream_op(next_elem, op); } -void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op) { - grpc_channel_element *next_elem = elem + op->dir; - next_elem->filter->channel_op(next_elem, elem, op); +void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op) { + grpc_channel_element *next_elem = elem + 1; + next_elem->filter->start_transport_op(next_elem, op); } grpc_channel_stack *grpc_channel_stack_from_top_element( diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 873a8ac4d3..5ac2372f3a 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -51,45 +51,6 @@ typedef struct grpc_channel_element grpc_channel_element; typedef struct grpc_call_element grpc_call_element; -/* The direction of the call. - The values of the enums (1, -1) matter here - they are used to increment - or decrement a pointer to find the next element to call */ -typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir; - -typedef enum { - /* send a goaway message to remote channels indicating that we are going - to disconnect in the future */ - GRPC_CHANNEL_GOAWAY, - /* disconnect any underlying transports */ - GRPC_CHANNEL_DISCONNECT, - /* transport received a new call */ - GRPC_ACCEPT_CALL, - /* an underlying transport was closed */ - GRPC_TRANSPORT_CLOSED, - /* an underlying transport is about to be closed */ - GRPC_TRANSPORT_GOAWAY -} grpc_channel_op_type; - -/* A single filterable operation to be performed on a channel */ -typedef struct { - /* The type of operation we're performing */ - grpc_channel_op_type type; - /* The directionality of this call - is it bubbling up the stack, or down? */ - grpc_call_dir dir; - - /* Argument data, matching up with grpc_channel_op_type names */ - union { - struct { - grpc_transport *transport; - const void *transport_server_data; - } accept_call; - struct { - grpc_status_code status; - gpr_slice message; - } goaway; - } data; -} grpc_channel_op; - /* Channel filters specify: 1. the amount of memory needed in the channel & call (via the sizeof_XXX members) @@ -103,13 +64,12 @@ typedef struct { typedef struct { /* Called to eg. send/receive data on a call. See grpc_call_next_op on how to call the next element in the stack */ - void (*start_transport_op)(grpc_call_element *elem, + void (*start_transport_stream_op)(grpc_call_element *elem, grpc_transport_stream_op *op); /* Called to handle channel level operations - e.g. new calls, or transport closure. See grpc_channel_next_op on how to call the next element in the stack */ - void (*channel_op)(grpc_channel_element *elem, - grpc_channel_element *from_elem, grpc_channel_op *op); + void (*start_transport_op)(grpc_channel_element *elem, grpc_transport_op *op); /* sizeof(per call data) */ size_t sizeof_call_data; @@ -211,7 +171,7 @@ void grpc_call_stack_destroy(grpc_call_stack *stack); void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op); /* Call the next operation (depending on call directionality) in a channel stack */ -void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op); +void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op); /* Given the top element of a channel stack, get the channel stack itself */ grpc_channel_stack *grpc_channel_stack_from_top_element( diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 4ae951074d..9630f6898d 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -54,26 +54,27 @@ typedef struct { grpc_mdctx *mdctx; /** resolver for this channel */ grpc_resolver *resolver; - /** channel arguments for this channel - TODO(ctiller): still needed? */ - grpc_channel_args *args; - /** mutex protecting waiting list */ - gpr_mu mu_waiting; /** mutex protecting client configuration, resolution state */ gpr_mu mu_config; - /** currently active load balancer - guarded by mu_config */ grpc_lb_policy *lb_policy; - /** incoming configuration - set by resolver.next guarded by mu_config */ grpc_client_config *incoming_configuration; + /** a list of closures that are all waiting for config to come in */ + grpc_iomgr_closure *waiting_for_config_closures; + /** resolver callback */ + grpc_iomgr_closure on_config_changed; + /** connectivity state being tracked */ + grpc_iomgr_closure *on_connectivity_state_change; + grpc_connectivity_state *connectivity_state; } channel_data; typedef enum { CALL_CREATED, - CALL_WAITING, + CALL_WAITING_FOR_CONFIG, + CALL_WAITING_FOR_PICK, CALL_ACTIVE, CALL_CANCELLED } call_state; @@ -193,13 +194,12 @@ static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) { abort(); } -static void cc_start_transport_op(grpc_call_element *elem, +static void cc_start_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_subchannel_call *subchannel_call; grpc_lb_policy *lb_policy; - grpc_transport_stream_op waiting_op; GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); @@ -207,10 +207,8 @@ static void cc_start_transport_op(grpc_call_element *elem, switch (calld->state) { case CALL_ACTIVE: subchannel_call = calld->s.active.subchannel_call; - grpc_subchannel_call_ref(subchannel_call); gpr_mu_unlock(&calld->mu_state); grpc_subchannel_call_process_op(subchannel_call, op); - grpc_subchannel_call_unref(subchannel_call); break; case CALL_CANCELLED: gpr_mu_unlock(&calld->mu_state); @@ -222,7 +220,6 @@ static void cc_start_transport_op(grpc_call_element *elem, gpr_mu_unlock(&calld->mu_state); handle_op_after_cancellation(elem, op); } else { - calld->state = CALL_WAITING; calld->s.waiting_op = *op; gpr_mu_lock(&chand->mu_config); @@ -230,45 +227,44 @@ static void cc_start_transport_op(grpc_call_element *elem, if (lb_policy) { grpc_lb_policy_ref(lb_policy); gpr_mu_unlock(&chand->mu_config); + calld->state = CALL_WAITING_FOR_PICK; gpr_mu_unlock(&calld->mu_state); pick_target(lb_policy, calld); grpc_lb_policy_unref(lb_policy); } else { + calld->state = CALL_WAITING_FOR_CONFIG; add_to_lb_policy_wait_queue_locked_state_config(chand, calld); gpr_mu_unlock(&chand->mu_config); gpr_mu_unlock(&calld->mu_state); } } break; - case CALL_WAITING: + case CALL_WAITING_FOR_CONFIG: + case CALL_WAITING_FOR_PICK: if (op->cancel_with_status != GRPC_STATUS_OK) { - waiting_op = calld->s.waiting_op; calld->state = CALL_CANCELLED; gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(elem, &waiting_op); handle_op_after_cancellation(elem, op); } else { GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) != (op->send_ops == NULL)); GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) != (op->recv_ops == NULL)); - if (op->send_ops) { + if (op->send_ops != NULL) { calld->s.waiting_op.send_ops = op->send_ops; calld->s.waiting_op.is_last_send = op->is_last_send; calld->s.waiting_op.on_done_send = op->on_done_send; - calld->s.waiting_op.send_user_data = op->send_user_data; } - if (op->recv_ops) { + if (op->recv_ops != NULL) { calld->s.waiting_op.recv_ops = op->recv_ops; calld->s.waiting_op.recv_state = op->recv_state; calld->s.waiting_op.on_done_recv = op->on_done_recv; - calld->s.waiting_op.recv_user_data = op->recv_user_data; } gpr_mu_unlock(&calld->mu_state); - if (op->on_consumed) { - op->on_consumed(op->on_consumed_user_data, 0); + if (op->on_consumed != NULL) { + op->on_consumed->cb(op->on_consumed->cb_arg, 0); } } break; @@ -372,6 +368,55 @@ static void cc_start_transport_op(grpc_call_element *elem, #endif } +static void update_state_locked(channel_data *chand) { + +} + +static void cc_on_config_changed(void *arg, int iomgr_success) { + channel_data *chand = arg; + grpc_lb_policy *lb_policy = NULL; + grpc_lb_policy *old_lb_policy; + grpc_resolver *old_resolver; + grpc_iomgr_closure *wakeup_closures = NULL; + + if (chand->incoming_configuration) { + lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration); + grpc_lb_policy_ref(lb_policy); + } + + grpc_client_config_unref(chand->incoming_configuration); + chand->incoming_configuration = NULL; + + gpr_mu_lock(&chand->mu_config); + old_lb_policy = chand->lb_policy; + chand->lb_policy = lb_policy; + if (lb_policy != NULL) { + wakeup_closures = chand->waiting_for_config_closures; + chand->waiting_for_config_closures = NULL; + } + gpr_mu_unlock(&chand->mu_config); + + while (wakeup_closures) { + grpc_iomgr_closure *next = wakeup_closures->next; + grpc_iomgr_add_callback(wakeup_closures); + wakeup_closures = next; + } + + grpc_lb_policy_unref(old_lb_policy); + + if (iomgr_success) { + grpc_resolver_next(chand->resolver, &chand->incoming_configuration, &chand->on_config_changed); + } else { + gpr_mu_lock(&chand->mu_config); + old_resolver = chand->resolver; + chand->resolver = NULL; + update_state_locked(chand); + gpr_mu_unlock(&chand->mu_config); + grpc_resolver_unref(old_resolver); + } +} + +#if 0 static void channel_op(grpc_channel_element *elem, grpc_channel_element *from_elem, grpc_channel_op *op) { channel_data *chand = elem->channel_data; @@ -451,6 +496,9 @@ static void channel_op(grpc_channel_element *elem, break; } } +#endif + +static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) {} /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, @@ -471,26 +519,28 @@ static void init_call_elem(grpc_call_element *elem, /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element *elem) { call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; + grpc_subchannel_call *subchannel_call; /* if the call got activated, we need to destroy the child stack also, and remove it from the in-flight requests tracked by the child_entry we picked */ - gpr_mu_lock(&chand->mu); + gpr_mu_lock(&calld->mu_state); switch (calld->state) { case CALL_ACTIVE: - gpr_mu_unlock(&chand->mu); - grpc_child_call_destroy(calld->s.active.child_call); + subchannel_call = calld->s.active.subchannel_call; + gpr_mu_unlock(&calld->mu_state); + grpc_subchannel_call_unref(subchannel_call); break; - case CALL_WAITING: - remove_waiting_child(chand, calld); - gpr_mu_unlock(&chand->mu); + case CALL_CREATED: + case CALL_CANCELLED: + gpr_mu_unlock(&calld->mu_state); break; - default: - gpr_mu_unlock(&chand->mu); + case CALL_WAITING_FOR_PICK: + case CALL_WAITING_FOR_CONFIG: + gpr_log(GPR_ERROR, "should never reach here"); + abort(); break; } - GPR_ASSERT(calld->state != CALL_WAITING); } /* Constructor for channel_data */ @@ -504,41 +554,32 @@ static void init_channel_elem(grpc_channel_element *elem, GPR_ASSERT(is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); - gpr_mu_init(&chand->mu); - chand->active_child = NULL; - chand->waiting_children = NULL; - chand->waiting_child_count = 0; - chand->waiting_child_capacity = 0; - chand->transport_setup = NULL; - chand->transport_setup_initiated = 0; - chand->args = grpc_channel_args_copy(args); + gpr_mu_init(&chand->mu_config); + chand->resolver = NULL; chand->mdctx = metadata_context; + grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); } /* Destructor for channel_data */ static void destroy_channel_elem(grpc_channel_element *elem) { channel_data *chand = elem->channel_data; - grpc_transport_setup_cancel(chand->transport_setup); - - if (chand->active_child) { - grpc_child_channel_destroy(chand->active_child, 1); - chand->active_child = NULL; + if (chand->resolver) { + grpc_resolver_unref(chand->resolver); } - - grpc_channel_args_destroy(chand->args); - - gpr_mu_destroy(&chand->mu); - GPR_ASSERT(chand->waiting_child_count == 0); - gpr_free(chand->waiting_children); + if (chand->lb_policy) { + grpc_lb_policy_unref(chand->lb_policy); + } + gpr_mu_destroy(&chand->mu_config); } const grpc_channel_filter grpc_client_channel_filter = { - cc_start_transport_op, channel_op, sizeof(call_data), + cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client-channel", }; +#if 0 grpc_transport_setup_result grpc_client_channel_transport_setup_complete( grpc_channel_stack *channel_stack, grpc_transport *transport, grpc_channel_filter const **channel_filters, size_t num_channel_filters, @@ -620,6 +661,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( return result; } +#endif void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, grpc_resolver *resolver) { @@ -628,5 +670,6 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, channel_data *chand = elem->channel_data; GPR_ASSERT(!chand->resolver); chand->resolver = resolver; + grpc_resolver_ref(resolver); grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed); } diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 9128aaeda7..0c6c9b3e64 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -44,22 +44,6 @@ typedef struct grpc_subchannel grpc_subchannel; typedef struct grpc_subchannel_call grpc_subchannel_call; -/** Connectivity state of a channel. - TODO(ctiller): move to grpc.h when we implement the public - version of the connectivity apis */ -typedef enum { - /** channel is connecting */ - GRPC_CHANNEL_CONNECTING, - /** channel is ready for work */ - GRPC_CHANNEL_READY, - /** channel has seen a failure but expects to recover */ - GRPC_CHANNEL_TRANSIENT_FAILURE, - /** channel is idle */ - GRPC_CHANNEL_IDLE, - /** channel has seen a failure that it cannot recover from */ - GRPC_CHANNEL_FATAL_FAILURE -} grpc_connectivity_state; - void grpc_subchannel_ref(grpc_subchannel *channel); void grpc_subchannel_unref(grpc_subchannel *channel); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 9943dbdbac..685098bcba 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -114,26 +114,9 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global, static void add_to_pollset_locked(grpc_chttp2_transport *t, grpc_pollset *pollset); -<<<<<<< HEAD -static void schedule_cb(transport *t, op_closure closure, int success); -static void maybe_finish_read(transport *t, stream *s); -static void maybe_join_window_updates(transport *t, stream *s); -static void finish_reads(transport *t); -static void add_to_pollset_locked(transport *t, grpc_pollset *pollset); -static void perform_op_locked(transport *t, stream *s, - grpc_transport_stream_op *op); -static void add_metadata_batch(transport *t, stream *s); - -static void flowctl_trace(transport *t, const char *flow, gpr_int32 window, - gpr_uint32 id, gpr_int32 delta) { - gpr_log(GPR_DEBUG, "HTTP:FLOW:%p:%d:%s: %d + %d = %d", t, id, flow, window, - delta, window + delta); -} -======= /** Start new streams that have been created if we can */ static void maybe_start_some_streams( grpc_chttp2_transport_global *transport_global); ->>>>>>> 48f0a13f3872876787f4d7588b396db914319b1b /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING @@ -385,16 +368,9 @@ static void goaway(grpc_transport *gt, grpc_status_code status, } static int init_stream(grpc_transport *gt, grpc_stream *gs, -<<<<<<< HEAD - const void *server_data, - grpc_transport_stream_op *initial_op) { - transport *t = (transport *)gt; - stream *s = (stream *)gs; -======= const void *server_data, grpc_transport_stream_op *initial_op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; ->>>>>>> 48f0a13f3872876787f4d7588b396db914319b1b memset(s, 0, sizeof(*s)); @@ -635,14 +611,9 @@ static void maybe_start_some_streams( } } -<<<<<<< HEAD -static void perform_op_locked(transport *t, stream *s, - grpc_transport_stream_op *op) { -======= static void perform_op_locked(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) { ->>>>>>> 48f0a13f3872876787f4d7588b396db914319b1b if (op->cancel_with_status != GRPC_STATUS_OK) { cancel_from_api(transport_global, stream_global, op->cancel_with_status); } @@ -699,15 +670,9 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global, } static void perform_op(grpc_transport *gt, grpc_stream *gs, -<<<<<<< HEAD - grpc_transport_stream_op *op) { - transport *t = (transport *)gt; - stream *s = (stream *)gs; -======= grpc_transport_stream_op *op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; ->>>>>>> 48f0a13f3872876787f4d7588b396db914319b1b lock(t); perform_op_locked(&t->global, &s->global, op); diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index a73c32da1a..40faa27211 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -58,9 +58,14 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, initial_op); } -void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream, +void grpc_transport_perform_stream_op(grpc_transport *transport, grpc_stream *stream, grpc_transport_stream_op *op) { - transport->vtable->perform_op(transport, stream, op); + transport->vtable->perform_stream_op(transport, stream, op); +} + +void grpc_transport_perform_op(grpc_transport *transport, + grpc_transport_op *op) { + transport->vtable->perform_op(transport, op); } void grpc_transport_add_to_pollset(grpc_transport *transport, diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index fbfed46626..7f6a37d048 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -43,7 +43,6 @@ /* forward declarations */ typedef struct grpc_transport grpc_transport; -typedef struct grpc_transport_callbacks grpc_transport_callbacks; /* grpc_stream doesn't actually exist. It's used as a typesafe opaque pointer for whatever data the transport wants to track @@ -62,7 +61,8 @@ typedef enum grpc_stream_state { GRPC_STREAM_CLOSED } grpc_stream_state; -/* Transport op: a set of operations to perform on a transport */ +/* Transport stream op: a set of operations to perform on a transport + against a single stream */ typedef struct grpc_transport_stream_op { grpc_iomgr_closure *on_consumed; @@ -83,30 +83,27 @@ typedef struct grpc_transport_stream_op { grpc_call_context_element *context; } grpc_transport_stream_op; -/* Callbacks made from the transport to the upper layers of grpc. */ -struct grpc_transport_callbacks { - /* Initialize a new stream on behalf of the transport. - Must result in a call to - grpc_transport_init_stream(transport, ..., request) in the same call - stack. - Must not result in any other calls to the transport. - - Arguments: - user_data - the transport user data set at transport creation time - transport - the grpc_transport instance making this call - request - request parameters for this stream (owned by the caller) - server_data - opaque transport dependent argument that should be passed - to grpc_transport_init_stream - */ - void (*accept_stream)(void *user_data, grpc_transport *transport, - const void *server_data); - - void (*goaway)(void *user_data, grpc_transport *transport, - grpc_status_code status, gpr_slice debug); - - /* The transport has been closed */ - void (*closed)(void *user_data, grpc_transport *transport); -}; +/** Transport op: a set of operations to perform on a transport as a whole */ +typedef struct grpc_transport_op { + /** connectivity monitoring */ + grpc_iomgr_closure *on_connectivity_state_change; + grpc_connectivity_state *connectivity_state; + /** should the transport be disconnected */ + int disconnect; + /** should we send a goaway? */ + int send_goaway; + /** what should the goaway contain? */ + grpc_status_code goaway_status; + gpr_slice *goaway_message; + /** set the callback for accepting new streams; + this is a permanent callback, unlike the other one-shot closures */ + void (*set_accept_stream)(void *user_data, grpc_transport *transport, const void *server_data); + void *set_accept_stream_user_data; + /** add this transport to a pollset */ + grpc_pollset *bind_pollset; + /** send a ping, call this back if not NULL */ + grpc_iomgr_closure *send_ping; +} grpc_transport_op; /* Returns the amount of memory required to store a grpc_stream for this transport */ @@ -144,10 +141,6 @@ void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, grpc_status_code status, grpc_mdstr *message); -/* TODO(ctiller): remove this */ -void grpc_transport_add_to_pollset(grpc_transport *transport, - grpc_pollset *pollset); - char *grpc_transport_stream_op_string(grpc_transport_stream_op *op); /* Send a batch of operations on a transport @@ -159,9 +152,11 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op); stream - the stream on which to send the operations. This must be non-NULL and previously initialized by the same transport. op - a grpc_transport_stream_op specifying the op to perform */ -void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream, +void grpc_transport_perform_stream_op(grpc_transport *transport, grpc_stream *stream, grpc_transport_stream_op *op); +void grpc_transport_perform_op(grpc_transport *transport, grpc_transport_op *op); + /* Send a ping on a transport Calls cb with user data when a response is received. */ @@ -180,7 +175,6 @@ void grpc_transport_destroy(grpc_transport *transport); /* Return type for grpc_transport_setup_callback */ typedef struct grpc_transport_setup_result { void *user_data; - const grpc_transport_callbacks *callbacks; } grpc_transport_setup_result; /* Given a transport, return callbacks for that transport. Used to finalize diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index 442b64ca59..b65b1d5607 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -46,26 +46,17 @@ typedef struct grpc_transport_vtable { const void *server_data, grpc_transport_stream_op *initial_op); - /* implementation of grpc_transport_send_batch */ - void (*perform_op)(grpc_transport *self, grpc_stream *stream, + /* implementation of grpc_transport_perform_stream_op */ + void (*perform_stream_op)(grpc_transport *self, grpc_stream *stream, grpc_transport_stream_op *op); - /* implementation of grpc_transport_add_to_pollset */ - void (*add_to_pollset)(grpc_transport *self, grpc_pollset *pollset); + /* implementation of grpc_transport_perform_op */ + void (*perform_op)(grpc_transport *self, grpc_stream *stream, + grpc_transport_stream_op *op); /* implementation of grpc_transport_destroy_stream */ void (*destroy_stream)(grpc_transport *self, grpc_stream *stream); - /* implementation of grpc_transport_goaway */ - void (*goaway)(grpc_transport *self, grpc_status_code status, - gpr_slice debug_data); - - /* implementation of grpc_transport_close */ - void (*close)(grpc_transport *self); - - /* implementation of grpc_transport_ping */ - void (*ping)(grpc_transport *self, grpc_iomgr_closure *cb); - /* implementation of grpc_transport_destroy */ void (*destroy)(grpc_transport *self); } grpc_transport_vtable; -- cgit v1.2.3 From e039f0338333e1a2f368ec20740662fb2eac2875 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 25 Jun 2015 12:54:23 -0700 Subject: Plumbing transport_op changes through --- BUILD | 4 - Makefile | 2 - build.json | 2 - gRPC.podspec | 3 - src/core/channel/connected_channel.c | 94 ++++--------------- src/core/channel/http_client_filter.c | 23 +---- src/core/channel/http_server_filter.c | 26 +----- src/core/channel/noop_filter.c | 34 +++---- src/core/surface/call.c | 2 +- src/core/surface/channel.c | 24 ++--- src/core/surface/channel_create.c | 6 +- src/core/surface/client.c | 89 ------------------ src/core/surface/client.h | 41 --------- src/core/surface/lame_client.c | 43 +++++---- src/core/surface/server.c | 100 +++++++++++++++------ src/core/transport/chttp2_transport.c | 4 - src/core/transport/transport.c | 18 ---- src/core/transport/transport.h | 2 + src/core/transport/transport_impl.h | 2 +- test/core/end2end/fixtures/chttp2_fullstack.c | 1 - tools/doxygen/Doxyfile.core.internal | 2 - vsprojects/grpc/grpc.vcxproj | 3 - vsprojects/grpc/grpc.vcxproj.filters | 6 -- vsprojects/grpc_unsecure/grpc_unsecure.vcxproj | 3 - .../grpc_unsecure/grpc_unsecure.vcxproj.filters | 6 -- 25 files changed, 146 insertions(+), 394 deletions(-) delete mode 100644 src/core/surface/client.c delete mode 100644 src/core/surface/client.h (limited to 'src/core/transport/transport.h') diff --git a/BUILD b/BUILD index 745b62933f..29cc77374e 100644 --- a/BUILD +++ b/BUILD @@ -208,7 +208,6 @@ cc_library( "src/core/surface/byte_buffer_queue.h", "src/core/surface/call.h", "src/core/surface/channel.h", - "src/core/surface/client.h", "src/core/surface/completion_queue.h", "src/core/surface/event_string.h", "src/core/surface/init.h", @@ -333,7 +332,6 @@ cc_library( "src/core/surface/call_log_batch.c", "src/core/surface/channel.c", "src/core/surface/channel_create.c", - "src/core/surface/client.c", "src/core/surface/completion_queue.c", "src/core/surface/event_string.c", "src/core/surface/init.c", @@ -456,7 +454,6 @@ cc_library( "src/core/surface/byte_buffer_queue.h", "src/core/surface/call.h", "src/core/surface/channel.h", - "src/core/surface/client.h", "src/core/surface/completion_queue.h", "src/core/surface/event_string.h", "src/core/surface/init.h", @@ -559,7 +556,6 @@ cc_library( "src/core/surface/call_log_batch.c", "src/core/surface/channel.c", "src/core/surface/channel_create.c", - "src/core/surface/client.c", "src/core/surface/completion_queue.c", "src/core/surface/event_string.c", "src/core/surface/init.c", diff --git a/Makefile b/Makefile index 283f3c6cd0..7ca182d9de 100644 --- a/Makefile +++ b/Makefile @@ -3083,7 +3083,6 @@ LIBGRPC_SRC = \ src/core/surface/call_log_batch.c \ src/core/surface/channel.c \ src/core/surface/channel_create.c \ - src/core/surface/client.c \ src/core/surface/completion_queue.c \ src/core/surface/event_string.c \ src/core/surface/init.c \ @@ -3341,7 +3340,6 @@ LIBGRPC_UNSECURE_SRC = \ src/core/surface/call_log_batch.c \ src/core/surface/channel.c \ src/core/surface/channel_create.c \ - src/core/surface/client.c \ src/core/surface/completion_queue.c \ src/core/surface/event_string.c \ src/core/surface/init.c \ diff --git a/build.json b/build.json index b05f423c83..dc3d2ac1c5 100644 --- a/build.json +++ b/build.json @@ -169,7 +169,6 @@ "src/core/surface/byte_buffer_queue.h", "src/core/surface/call.h", "src/core/surface/channel.h", - "src/core/surface/client.h", "src/core/surface/completion_queue.h", "src/core/surface/event_string.h", "src/core/surface/init.h", @@ -272,7 +271,6 @@ "src/core/surface/call_log_batch.c", "src/core/surface/channel.c", "src/core/surface/channel_create.c", - "src/core/surface/client.c", "src/core/surface/completion_queue.c", "src/core/surface/event_string.c", "src/core/surface/init.c", diff --git a/gRPC.podspec b/gRPC.podspec index f8b5a99bdc..1707ee7eaa 100644 --- a/gRPC.podspec +++ b/gRPC.podspec @@ -210,7 +210,6 @@ Pod::Spec.new do |s| 'src/core/surface/byte_buffer_queue.h', 'src/core/surface/call.h', 'src/core/surface/channel.h', - 'src/core/surface/client.h', 'src/core/surface/completion_queue.h', 'src/core/surface/event_string.h', 'src/core/surface/init.h', @@ -342,7 +341,6 @@ Pod::Spec.new do |s| 'src/core/surface/call_log_batch.c', 'src/core/surface/channel.c', 'src/core/surface/channel_create.c', - 'src/core/surface/client.c', 'src/core/surface/completion_queue.c', 'src/core/surface/event_string.c', 'src/core/surface/init.c', @@ -464,7 +462,6 @@ Pod::Spec.new do |s| 'src/core/surface/byte_buffer_queue.h', 'src/core/surface/call.h', 'src/core/surface/channel.h', - 'src/core/surface/client.h', 'src/core/surface/completion_queue.h', 'src/core/surface/event_string.h', 'src/core/surface/init.h', diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 6fad077c62..1d30b073ab 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -61,36 +61,21 @@ typedef struct connected_channel_call_data { void *unused; } call_data; /* Intercept a call operation and either push it directly up or translate it into transport stream operations */ -static void con_start_transport_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void con_start_transport_stream_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_transport_perform_op(chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), op); + grpc_transport_perform_stream_op(chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld), op); } -/* Currently we assume all channel operations should just be pushed up. */ -static void channel_op(grpc_channel_element *elem, - grpc_channel_element *from_elem, grpc_channel_op *op) { +static void con_start_transport_op(grpc_channel_element *elem, + grpc_transport_op *op) { channel_data *chand = elem->channel_data; - GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - - switch (op->type) { - case GRPC_CHANNEL_GOAWAY: - grpc_transport_goaway(chand->transport, op->data.goaway.status, - op->data.goaway.message); - break; - case GRPC_CHANNEL_DISCONNECT: - grpc_transport_close(chand->transport); - break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_UP); - grpc_channel_next_op(elem, op); - break; - } + grpc_transport_perform_op(chand->transport, op); } /* Constructor for call_data */ @@ -136,61 +121,15 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_connected_channel_filter = { - con_start_transport_op, channel_op, sizeof(call_data), init_call_elem, - destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, "connected", -}; - -/* Transport callback to accept a new stream... calls up to handle it */ -static void accept_stream(void *user_data, grpc_transport *transport, - const void *transport_server_data) { - grpc_channel_element *elem = user_data; - channel_data *chand = elem->channel_data; - grpc_channel_op op; - - GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - GPR_ASSERT(chand->transport == transport); - - op.type = GRPC_ACCEPT_CALL; - op.dir = GRPC_CALL_UP; - op.data.accept_call.transport = transport; - op.data.accept_call.transport_server_data = transport_server_data; - channel_op(elem, NULL, &op); -} - -static void transport_goaway(void *user_data, grpc_transport *transport, - grpc_status_code status, gpr_slice debug) { - /* transport got goaway ==> call up and handle it */ - grpc_channel_element *elem = user_data; - channel_data *chand = elem->channel_data; - grpc_channel_op op; - - GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - GPR_ASSERT(chand->transport == transport); - - op.type = GRPC_TRANSPORT_GOAWAY; - op.dir = GRPC_CALL_UP; - op.data.goaway.status = status; - op.data.goaway.message = debug; - channel_op(elem, NULL, &op); -} - -static void transport_closed(void *user_data, grpc_transport *transport) { - /* transport was closed ==> call up and handle it */ - grpc_channel_element *elem = user_data; - channel_data *chand = elem->channel_data; - grpc_channel_op op; - - GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - GPR_ASSERT(chand->transport == transport); - - op.type = GRPC_TRANSPORT_CLOSED; - op.dir = GRPC_CALL_UP; - channel_op(elem, NULL, &op); -} - -const grpc_transport_callbacks connected_channel_transport_callbacks = { - accept_stream, transport_goaway, transport_closed, + con_start_transport_stream_op, + con_start_transport_op, + sizeof(call_data), + init_call_elem, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + "connected", }; grpc_transport_setup_result grpc_connected_channel_bind_transport( @@ -213,6 +152,5 @@ grpc_transport_setup_result grpc_connected_channel_bind_transport( channel_stack->call_stack_size += grpc_transport_stream_size(transport); ret.user_data = elem; - ret.callbacks = &connected_channel_transport_callbacks; return ret; } diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 3f10c4fc88..6928a59c38 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -134,23 +134,6 @@ static void hc_start_transport_op(grpc_call_element *elem, grpc_call_next_op(elem, op); } -/* Called on special channel events, such as disconnection or new incoming - calls on the server */ -static void channel_op(grpc_channel_element *elem, - grpc_channel_element *from_elem, grpc_channel_op *op) { - /* grab pointers to our data from the channel element */ - channel_data *channeld = elem->channel_data; - - ignore_unused(channeld); - - switch (op->type) { - default: - /* pass control up or down the stack depending on op->dir */ - grpc_channel_next_op(elem, op); - break; - } -} - /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, @@ -222,6 +205,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_http_client_filter = { - hc_start_transport_op, channel_op, sizeof(call_data), init_call_elem, - destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, "http-client"}; + hc_start_transport_op, grpc_channel_next_op, sizeof(call_data), + init_call_elem, destroy_call_elem, sizeof(channel_data), + init_channel_elem, destroy_channel_elem, "http-client"}; diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 6434502bdc..dac53e9bf1 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -72,9 +72,6 @@ typedef struct channel_data { grpc_mdctx *mdctx; } channel_data; -/* used to silence 'variable not used' warnings */ -static void ignore_unused(void *ignored) {} - static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; channel_data *channeld = elem->channel_data; @@ -216,23 +213,6 @@ static void hs_start_transport_op(grpc_call_element *elem, grpc_call_next_op(elem, op); } -/* Called on special channel events, such as disconnection or new incoming - calls on the server */ -static void channel_op(grpc_channel_element *elem, - grpc_channel_element *from_elem, grpc_channel_op *op) { - /* grab pointers to our data from the channel element */ - channel_data *channeld = elem->channel_data; - - ignore_unused(channeld); - - switch (op->type) { - default: - /* pass control up or down the stack depending on op->dir */ - grpc_channel_next_op(elem, op); - break; - } -} - /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, @@ -298,6 +278,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_http_server_filter = { - hs_start_transport_op, channel_op, sizeof(call_data), init_call_elem, - destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, "http-server"}; + hs_start_transport_op, grpc_channel_next_op, sizeof(call_data), + init_call_elem, destroy_call_elem, sizeof(channel_data), + init_channel_elem, destroy_channel_elem, "http-server"}; diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c index d472b80744..1478f04a3c 100644 --- a/src/core/channel/noop_filter.c +++ b/src/core/channel/noop_filter.c @@ -62,31 +62,14 @@ static void noop_mutate_op(grpc_call_element *elem, - a network event (or similar) from below, to receive something op contains type and call direction information, in addition to the data that is being sent or received. */ -static void noop_start_transport_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void noop_start_transport_stream_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { noop_mutate_op(elem, op); /* pass control down the stack */ grpc_call_next_op(elem, op); } -/* Called on special channel events, such as disconnection or new incoming - calls on the server */ -static void channel_op(grpc_channel_element *elem, - grpc_channel_element *from_elem, grpc_channel_op *op) { - /* grab pointers to our data from the channel element */ - channel_data *channeld = elem->channel_data; - - ignore_unused(channeld); - - switch (op->type) { - default: - /* pass control up or down the stack depending on op->dir */ - grpc_channel_next_op(elem, op); - break; - } -} - /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, @@ -136,7 +119,12 @@ static void destroy_channel_elem(grpc_channel_element *elem) { ignore_unused(channeld); } -const grpc_channel_filter grpc_no_op_filter = { - noop_start_transport_op, channel_op, sizeof(call_data), init_call_elem, - destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, "no-op"}; +const grpc_channel_filter grpc_no_op_filter = {noop_start_transport_stream_op, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + "no-op"}; diff --git a/src/core/surface/call.c b/src/core/surface/call.c index ddff3efb32..7a8eb8c54f 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -1154,7 +1154,7 @@ static void execute_op(grpc_call *call, grpc_transport_stream_op *op) { elem = CALL_ELEM_FROM_CALL(call, 0); op->context = call->context; - elem->filter->start_transport_op(elem, op); + elem->filter->start_transport_stream_op(elem, op); } grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index a3c4dcebc1..6c4b407a85 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -39,7 +39,6 @@ #include "src/core/iomgr/iomgr.h" #include "src/core/support/string.h" #include "src/core/surface/call.h" -#include "src/core/surface/client.h" #include "src/core/surface/init.h" #include #include @@ -238,22 +237,15 @@ void grpc_channel_internal_unref(grpc_channel *channel) { } } -void grpc_channel_destroy(grpc_channel *channel) { - grpc_channel_op op; - grpc_channel_element *elem; - - elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); - - op.type = GRPC_CHANNEL_GOAWAY; - op.dir = GRPC_CALL_DOWN; - op.data.goaway.status = GRPC_STATUS_OK; - op.data.goaway.message = gpr_slice_from_copied_string("Client disconnect"); - elem->filter->channel_op(elem, NULL, &op); - - op.type = GRPC_CHANNEL_DISCONNECT; - op.dir = GRPC_CALL_DOWN; - elem->filter->channel_op(elem, NULL, &op); +static void execute_op(grpc_channel *channel, grpc_transport_op *op) { + abort(); +} +void grpc_channel_destroy(grpc_channel *channel) { + grpc_transport_op op; + memset(&op, 0, sizeof(op)); + op.disconnect = 1; + execute_op(channel, &op); GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel"); } diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 8efd86b9f6..14ff63a2e3 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -38,9 +38,7 @@ #include "src/core/channel/client_channel.h" #include "src/core/client_config/resolver_registry.h" -#include "src/core/client_config/subchannels/tcp_subchannel.h" #include "src/core/surface/channel.h" -#include "src/core/surface/client.h" /* Create a client channel: Asynchronously: - resolve target @@ -53,7 +51,6 @@ grpc_channel *grpc_channel_create(const char *target, const grpc_channel_filter *filters[MAX_FILTERS]; grpc_resolver *resolver; int n = 0; - filters[n++] = &grpc_client_surface_filter; /* TODO(census) if (grpc_channel_args_is_census_enabled(args)) { filters[n++] = &grpc_client_census_filter; @@ -61,7 +58,8 @@ grpc_channel *grpc_channel_create(const char *target, filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); - resolver = grpc_resolver_create(target, grpc_create_tcp_subchannel_factory()); + GPR_ASSERT(!"NULL should be a subchannel factory creation below"); + resolver = grpc_resolver_create(target, NULL); if (!resolver) { return NULL; } diff --git a/src/core/surface/client.c b/src/core/surface/client.c deleted file mode 100644 index 9c9cba5771..0000000000 --- a/src/core/surface/client.c +++ /dev/null @@ -1,89 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/surface/client.h" - -#include "src/core/surface/call.h" -#include "src/core/surface/channel.h" -#include "src/core/support/string.h" -#include -#include - -typedef struct { void *unused; } call_data; - -typedef struct { void *unused; } channel_data; - -static void client_start_transport_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_call_next_op(elem, op); -} - -static void channel_op(grpc_channel_element *elem, - grpc_channel_element *from_elem, grpc_channel_op *op) { - switch (op->type) { - case GRPC_ACCEPT_CALL: - gpr_log(GPR_ERROR, "Client cannot accept new calls"); - break; - case GRPC_TRANSPORT_CLOSED: - grpc_client_channel_closed(elem); - break; - case GRPC_TRANSPORT_GOAWAY: - gpr_slice_unref(op->data.goaway.message); - break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_DOWN); - grpc_channel_next_op(elem, op); - } -} - -static void init_call_elem(grpc_call_element *elem, - const void *transport_server_data, - grpc_transport_stream_op *initial_op) {} - -static void destroy_call_elem(grpc_call_element *elem) {} - -static void init_channel_elem(grpc_channel_element *elem, - const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last) { - GPR_ASSERT(is_first); - GPR_ASSERT(!is_last); -} - -static void destroy_channel_elem(grpc_channel_element *elem) {} - -const grpc_channel_filter grpc_client_surface_filter = { - client_start_transport_op, channel_op, sizeof(call_data), init_call_elem, - destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, "client", -}; diff --git a/src/core/surface/client.h b/src/core/surface/client.h deleted file mode 100644 index 9db2ccf3d2..0000000000 --- a/src/core/surface/client.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_INTERNAL_CORE_SURFACE_CLIENT_H -#define GRPC_INTERNAL_CORE_SURFACE_CLIENT_H - -#include "src/core/channel/channel_stack.h" - -extern const grpc_channel_filter grpc_client_surface_filter; - -#endif /* GRPC_INTERNAL_CORE_SURFACE_CLIENT_H */ diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 4b55e9dc91..5235d3f7f4 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -49,16 +49,16 @@ typedef struct { typedef struct { grpc_mdctx *mdctx; } channel_data; -static void lame_start_transport_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void lame_start_transport_stream_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - if (op->send_ops) { + if (op->send_ops != NULL) { grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); op->on_done_send->cb(op->on_done_send->cb_arg, 0); } - if (op->recv_ops) { + if (op->recv_ops != NULL) { char tmp[GPR_LTOA_MIN_BUFSIZE]; grpc_metadata_batch mdb; gpr_ltoa(GRPC_STATUS_UNKNOWN, tmp); @@ -77,22 +77,21 @@ static void lame_start_transport_op(grpc_call_element *elem, *op->recv_state = GRPC_STREAM_CLOSED; op->on_done_recv->cb(op->on_done_recv->cb_arg, 1); } - if (op->on_consumed) { + if (op->on_consumed != NULL) { op->on_consumed->cb(op->on_consumed->cb_arg, 0); } } -static void channel_op(grpc_channel_element *elem, - grpc_channel_element *from_elem, grpc_channel_op *op) { - switch (op->type) { - case GRPC_CHANNEL_GOAWAY: - gpr_slice_unref(op->data.goaway.message); - break; - case GRPC_CHANNEL_DISCONNECT: - grpc_client_channel_closed(elem); - break; - default: - break; +static void lame_start_transport_op(grpc_channel_element *elem, + grpc_transport_op *op) { + if (op->on_connectivity_state_change) { + GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE); + *op->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE; + op->on_connectivity_state_change->cb( + op->on_connectivity_state_change->cb_arg, 1); + } + if (op->on_consumed != NULL) { + op->on_consumed->cb(op->on_consumed->cb_arg, 1); } } @@ -118,9 +117,15 @@ static void init_channel_elem(grpc_channel_element *elem, static void destroy_channel_elem(grpc_channel_element *elem) {} static const grpc_channel_filter lame_filter = { - lame_start_transport_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "lame-client", + lame_start_transport_stream_op, + lame_start_transport_op, + sizeof(call_data), + init_call_elem, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + "lame-client", }; grpc_channel *grpc_lame_client_channel_create(void) { diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 607344a7a6..568f7925dd 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -115,6 +115,7 @@ typedef struct channel_registered_method { struct channel_data { grpc_server *server; size_t num_calls; + grpc_connectivity_state connectivity_state; grpc_channel *channel; grpc_mdstr *path_key; grpc_mdstr *authority_key; @@ -125,6 +126,7 @@ struct channel_data { gpr_uint32 registered_method_slots; gpr_uint32 registered_method_max_probes; grpc_iomgr_closure finish_destroy_channel_closure; + grpc_iomgr_closure channel_connectivity_changed; }; typedef struct shutdown_tag { @@ -539,13 +541,41 @@ static void server_mutate_op(grpc_call_element *elem, } } -static void server_start_transport_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void server_start_transport_stream_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); server_mutate_op(elem, op); grpc_call_next_op(elem, op); } +static void accept_stream(void *cd, grpc_transport *transport, + const void *transport_server_data) { + channel_data *chand = cd; + /* create a call */ + grpc_call_create(chand->channel, NULL, transport_server_data, NULL, 0, + gpr_inf_future); +} + +static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) { + channel_data *chand = cd; + grpc_server *server = chand->server; + if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) { + grpc_transport_op op; + memset(&op, 0, sizeof(op)); + op.on_connectivity_state_change = &chand->channel_connectivity_changed, + op.connectivity_state = &chand->connectivity_state; + grpc_channel_next_op(grpc_channel_stack_element( + grpc_channel_get_channel_stack(chand->channel), 0), + &op); + } else { + gpr_mu_lock(&server->mu_global); + destroy_channel(chand); + gpr_mu_unlock(&server->mu_global); + GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity"); + } +} + +#if 0 static void channel_op(grpc_channel_element *elem, grpc_channel_element *from_elem, grpc_channel_op *op) { channel_data *chand = elem->channel_data; @@ -576,39 +606,45 @@ static void channel_op(grpc_channel_element *elem, break; } } +#endif typedef struct { channel_data *chand; int send_goaway; int send_disconnect; grpc_iomgr_closure finish_shutdown_channel_closure; + + /* for use during shutdown: the goaway message to send */ + gpr_slice goaway_message; } shutdown_channel_args; -static void finish_shutdown_channel(void *p, int success) { +static void destroy_shutdown_channel_args(void *p, int success) { shutdown_channel_args *sca = p; - grpc_channel_op op; - - if (sca->send_goaway) { - op.type = GRPC_CHANNEL_GOAWAY; - op.dir = GRPC_CALL_DOWN; - op.data.goaway.status = GRPC_STATUS_OK; - op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown"); - channel_op(grpc_channel_stack_element( - grpc_channel_get_channel_stack(sca->chand->channel), 0), - NULL, &op); - } - if (sca->send_disconnect) { - op.type = GRPC_CHANNEL_DISCONNECT; - op.dir = GRPC_CALL_DOWN; - channel_op(grpc_channel_stack_element( - grpc_channel_get_channel_stack(sca->chand->channel), 0), - NULL, &op); - } GRPC_CHANNEL_INTERNAL_UNREF(sca->chand->channel, "shutdown"); - + gpr_slice_unref(sca->goaway_message); gpr_free(sca); } +static void finish_shutdown_channel(void *p, int success) { + shutdown_channel_args *sca = p; + grpc_transport_op op; + memset(&op, 0, sizeof(op)); + + op.send_goaway = sca->send_goaway; + sca->goaway_message = gpr_slice_from_copied_string("Server shutdown"); + op.goaway_message = &sca->goaway_message; + op.goaway_status = GRPC_STATUS_OK; + op.disconnect = sca->send_disconnect; + grpc_iomgr_closure_init(&sca->finish_shutdown_channel_closure, + destroy_shutdown_channel_args, sca); + op.on_consumed = &sca->finish_shutdown_channel_closure; + + grpc_channel_next_op( + grpc_channel_stack_element( + grpc_channel_get_channel_stack(sca->chand->channel), 0), + &op); +} + static void shutdown_channel(channel_data *chand, int send_goaway, int send_disconnect) { shutdown_channel_args *sca; @@ -687,6 +723,9 @@ static void init_channel_elem(grpc_channel_element *elem, chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority"); chand->next = chand->prev = chand; chand->registered_methods = NULL; + chand->connectivity_state = GRPC_CHANNEL_IDLE; + grpc_iomgr_closure_init(&chand->channel_connectivity_changed, + channel_connectivity_changed, chand); } static void destroy_channel_elem(grpc_channel_element *elem) { @@ -717,8 +756,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } static const grpc_channel_filter server_surface_filter = { - server_start_transport_op, - channel_op, + server_start_transport_stream_op, + grpc_channel_next_op, sizeof(call_data), init_call_elem, destroy_call_elem, @@ -852,6 +891,7 @@ grpc_transport_setup_result grpc_server_setup_transport( gpr_uint32 slots; gpr_uint32 probes; gpr_uint32 max_probes = 0; + grpc_transport_op op; grpc_transport_setup_result result; for (i = 0; i < s->channel_filter_count; i++) { @@ -863,7 +903,9 @@ grpc_transport_setup_result grpc_server_setup_transport( filters[i] = &grpc_connected_channel_filter; for (i = 0; i < s->cq_count; i++) { - grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i])); + memset(&op, 0, sizeof(op)); + op.bind_pollset = grpc_cq_pollset(s->cqs[i]); + grpc_transport_perform_op(transport, &op); } channel = @@ -875,6 +917,14 @@ grpc_transport_setup_result grpc_server_setup_transport( server_ref(s); chand->channel = channel; + GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity"); + memset(&op, 0, sizeof(op)); + op.set_accept_stream = accept_stream; + op.set_accept_stream_user_data = chand; + op.on_connectivity_state_change = &chand->channel_connectivity_changed; + op.connectivity_state = &chand->connectivity_state; + grpc_transport_perform_op(transport, &op); + num_registered_methods = 0; for (rm = s->registered_methods; rm; rm = rm->next) { num_registered_methods++; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 685098bcba..f2568c01e0 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1038,11 +1038,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), init_stream, perform_op, - add_to_pollset, destroy_stream, - goaway, - close_transport, - send_ping, destroy_transport}; void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 40faa27211..a570cba33e 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -38,15 +38,6 @@ size_t grpc_transport_stream_size(grpc_transport *transport) { return transport->vtable->sizeof_stream; } -void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status, - gpr_slice message) { - transport->vtable->goaway(transport, status, message); -} - -void grpc_transport_close(grpc_transport *transport) { - transport->vtable->close(transport); -} - void grpc_transport_destroy(grpc_transport *transport) { transport->vtable->destroy(transport); } @@ -68,20 +59,11 @@ void grpc_transport_perform_op(grpc_transport *transport, transport->vtable->perform_op(transport, op); } -void grpc_transport_add_to_pollset(grpc_transport *transport, - grpc_pollset *pollset) { - transport->vtable->add_to_pollset(transport, pollset); -} - void grpc_transport_destroy_stream(grpc_transport *transport, grpc_stream *stream) { transport->vtable->destroy_stream(transport, stream); } -void grpc_transport_ping(grpc_transport *transport, grpc_iomgr_closure *cb) { - transport->vtable->ping(transport, cb); -} - void grpc_transport_setup_cancel(grpc_transport_setup *setup) { setup->vtable->cancel(setup); } diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 7f6a37d048..1acd665a1d 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -85,6 +85,8 @@ typedef struct grpc_transport_stream_op { /** Transport op: a set of operations to perform on a transport as a whole */ typedef struct grpc_transport_op { + /** called when processing of this op is done */ + grpc_iomgr_closure *on_consumed; /** connectivity monitoring */ grpc_iomgr_closure *on_connectivity_state_change; grpc_connectivity_state *connectivity_state; diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index b65b1d5607..8283939050 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -52,7 +52,7 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_perform_op */ void (*perform_op)(grpc_transport *self, grpc_stream *stream, - grpc_transport_stream_op *op); + grpc_transport_op *op); /* implementation of grpc_transport_destroy_stream */ void (*destroy_stream)(grpc_transport *self, grpc_stream *stream); diff --git a/test/core/end2end/fixtures/chttp2_fullstack.c b/test/core/end2end/fixtures/chttp2_fullstack.c index b83e227a89..e647434509 100644 --- a/test/core/end2end/fixtures/chttp2_fullstack.c +++ b/test/core/end2end/fixtures/chttp2_fullstack.c @@ -39,7 +39,6 @@ #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_server_filter.h" #include "src/core/surface/channel.h" -#include "src/core/surface/client.h" #include "src/core/surface/server.h" #include "src/core/transport/chttp2_transport.h" #include diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 4f00104c02..bc2cfaf6a4 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -845,7 +845,6 @@ src/core/profiling/timers_preciseclock.h \ src/core/surface/byte_buffer_queue.h \ src/core/surface/call.h \ src/core/surface/channel.h \ -src/core/surface/client.h \ src/core/surface/completion_queue.h \ src/core/surface/event_string.h \ src/core/surface/init.h \ @@ -970,7 +969,6 @@ src/core/surface/call_details.c \ src/core/surface/call_log_batch.c \ src/core/surface/channel.c \ src/core/surface/channel_create.c \ -src/core/surface/client.c \ src/core/surface/completion_queue.c \ src/core/surface/event_string.c \ src/core/surface/init.c \ diff --git a/vsprojects/grpc/grpc.vcxproj b/vsprojects/grpc/grpc.vcxproj index 0a7da41dbf..6c55c4f834 100644 --- a/vsprojects/grpc/grpc.vcxproj +++ b/vsprojects/grpc/grpc.vcxproj @@ -234,7 +234,6 @@ - @@ -454,8 +453,6 @@ - - diff --git a/vsprojects/grpc/grpc.vcxproj.filters b/vsprojects/grpc/grpc.vcxproj.filters index 8d1815c751..d2ada43b6d 100644 --- a/vsprojects/grpc/grpc.vcxproj.filters +++ b/vsprojects/grpc/grpc.vcxproj.filters @@ -280,9 +280,6 @@ src\core\surface - - src\core\surface - src\core\surface @@ -650,9 +647,6 @@ src\core\surface - - src\core\surface - src\core\surface diff --git a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj index b96e3092e2..09b3997277 100644 --- a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj @@ -216,7 +216,6 @@ - @@ -392,8 +391,6 @@ - - diff --git a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters index dd2846eef1..3b65c46f2b 100644 --- a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -214,9 +214,6 @@ src\core\surface - - src\core\surface - src\core\surface @@ -533,9 +530,6 @@ src\core\surface - - src\core\surface - src\core\surface -- cgit v1.2.3 From 1064f8b97e059e1db3b7d1795748db335909ee42 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 25 Jun 2015 13:52:57 -0700 Subject: Moving chttp2 to new transport interface --- src/core/channel/client_channel.h | 8 -- src/core/channel/connected_channel.c | 8 +- src/core/channel/connected_channel.h | 4 +- src/core/surface/server.c | 15 +-- src/core/surface/server.h | 8 +- src/core/surface/server_chttp2.c | 18 +-- src/core/transport/chttp2/internal.h | 36 ++---- src/core/transport/chttp2_transport.c | 229 +++++++++++++--------------------- src/core/transport/chttp2_transport.h | 9 +- src/core/transport/transport.c | 18 --- src/core/transport/transport.h | 47 ------- src/core/transport/transport_impl.h | 3 +- 12 files changed, 122 insertions(+), 281 deletions(-) (limited to 'src/core/transport/transport.h') diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h index da02073353..5ab64b9c46 100644 --- a/src/core/channel/client_channel.h +++ b/src/core/channel/client_channel.h @@ -52,12 +52,4 @@ extern const grpc_channel_filter grpc_client_channel_filter; void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, grpc_resolver *resolver); -/* grpc_transport_setup_callback for binding new transports into a client - channel - user_data should be the channel stack containing the client - channel */ -grpc_transport_setup_result grpc_client_channel_transport_setup_complete( - grpc_channel_stack *channel_stack, grpc_transport *transport, - grpc_channel_filter const **channel_filters, size_t num_channel_filters, - grpc_mdctx *mdctx); - #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */ diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 1d30b073ab..80a3100af0 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -132,13 +132,12 @@ const grpc_channel_filter grpc_connected_channel_filter = { "connected", }; -grpc_transport_setup_result grpc_connected_channel_bind_transport( - grpc_channel_stack *channel_stack, grpc_transport *transport) { +void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack, + grpc_transport *transport) { /* Assumes that the connected channel filter is always the last filter in a channel stack */ grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); channel_data *cd = (channel_data *)elem->channel_data; - grpc_transport_setup_result ret; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GPR_ASSERT(cd->transport == NULL); cd->transport = transport; @@ -150,7 +149,4 @@ grpc_transport_setup_result grpc_connected_channel_bind_transport( the last call element, and the last call element MUST be the connected channel. */ channel_stack->call_stack_size += grpc_transport_stream_size(transport); - - ret.user_data = elem; - return ret; } diff --git a/src/core/channel/connected_channel.h b/src/core/channel/connected_channel.h index 8b35f69b26..d1e2c195cb 100644 --- a/src/core/channel/connected_channel.h +++ b/src/core/channel/connected_channel.h @@ -43,7 +43,7 @@ extern const grpc_channel_filter grpc_connected_channel_filter; /* Post construction fixup: set the transport in the connected channel. Must be called before any call stack using this filter is used. */ -grpc_transport_setup_result grpc_connected_channel_bind_transport( - grpc_channel_stack *channel_stack, grpc_transport *transport); +void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack, + grpc_transport *transport); #endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H */ diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 568f7925dd..98e0e81eaa 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -871,10 +871,10 @@ void grpc_server_start(grpc_server *server) { } } -grpc_transport_setup_result grpc_server_setup_transport( - grpc_server *s, grpc_transport *transport, - grpc_channel_filter const **extra_filters, size_t num_extra_filters, - grpc_mdctx *mdctx, const grpc_channel_args *args) { +void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, + grpc_channel_filter const **extra_filters, + size_t num_extra_filters, grpc_mdctx *mdctx, + const grpc_channel_args *args) { size_t num_filters = s->channel_filter_count + num_extra_filters + 1; grpc_channel_filter const **filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_filters); @@ -892,7 +892,6 @@ grpc_transport_setup_result grpc_server_setup_transport( gpr_uint32 probes; gpr_uint32 max_probes = 0; grpc_transport_op op; - grpc_transport_setup_result result; for (i = 0; i < s->channel_filter_count; i++) { filters[i] = s->channel_filters[i]; @@ -954,8 +953,8 @@ grpc_transport_setup_result grpc_server_setup_transport( chand->registered_method_max_probes = max_probes; } - result = grpc_connected_channel_bind_transport( - grpc_channel_get_channel_stack(channel), transport); + grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel), + transport); gpr_mu_lock(&s->mu_global); chand->next = &s->root_channel_data; @@ -964,8 +963,6 @@ grpc_transport_setup_result grpc_server_setup_transport( gpr_mu_unlock(&s->mu_global); gpr_free(filters); - - return result; } void grpc_server_shutdown_and_notify(grpc_server *server, diff --git a/src/core/surface/server.h b/src/core/surface/server.h index 91a1a2a7f6..2899c6dea3 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -55,10 +55,10 @@ void grpc_server_listener_destroy_done(void *server); /* Setup a transport - creates a channel stack, binds the transport to the server */ -grpc_transport_setup_result grpc_server_setup_transport( - grpc_server *server, grpc_transport *transport, - grpc_channel_filter const **extra_filters, size_t num_extra_filters, - grpc_mdctx *mdctx, const grpc_channel_args *args); +void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport, + grpc_channel_filter const **extra_filters, + size_t num_extra_filters, grpc_mdctx *mdctx, + const grpc_channel_args *args); const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server); diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index 7e49a531df..9c02c3ef29 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -42,14 +42,13 @@ #include #include -static grpc_transport_setup_result setup_transport(void *server, - grpc_transport *transport, - grpc_mdctx *mdctx) { +static void setup_transport(void *server, grpc_transport *transport, + grpc_mdctx *mdctx) { static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter}; - return grpc_server_setup_transport(server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, - grpc_server_get_channel_args(server)); + grpc_server_setup_transport(server, transport, extra_filters, + GPR_ARRAY_SIZE(extra_filters), mdctx, + grpc_server_get_channel_args(server)); } static void new_transport(void *server, grpc_endpoint *tcp) { @@ -60,9 +59,10 @@ static void new_transport(void *server, grpc_endpoint *tcp) { * (as in server_secure_chttp2.c) needs to add synchronization to avoid this * case. */ - grpc_create_chttp2_transport(setup_transport, server, - grpc_server_get_channel_args(server), tcp, NULL, - 0, grpc_mdctx_create(), 0); + grpc_mdctx *mdctx = grpc_mdctx_create(); + grpc_transport *transport = grpc_create_chttp2_transport( + grpc_server_get_channel_args(server), tcp, NULL, 0, mdctx, 0); + setup_transport(server, transport, mdctx); } /* Server callback: start listening on our ports */ diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 02c94744ee..93235aef55 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -134,12 +134,6 @@ typedef struct { grpc_chttp2_stream *prev; } grpc_chttp2_stream_link; -typedef enum { - GRPC_CHTTP2_ERROR_STATE_NONE, - GRPC_CHTTP2_ERROR_STATE_SEEN, - GRPC_CHTTP2_ERROR_STATE_NOTIFIED -} grpc_chttp2_error_state; - /* We keep several sets of connection wide parameters */ typedef enum { /* The settings our peer has asked for (and we have acked) */ @@ -174,6 +168,9 @@ typedef struct { /** how much window would we like to have for incoming_window */ gpr_uint32 connection_window_target; + /** have we seen a goaway */ + gpr_uint8 seen_goaway; + /** is this transport a client? */ gpr_uint8 is_client; /** are the local settings dirty and need to be sent? */ @@ -185,10 +182,6 @@ typedef struct { /** settings values */ gpr_uint32 settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; - /** has there been a connection level error, and have we notified - anyone about it? */ - grpc_chttp2_error_state error_state; - /** what is the next stream id to be allocated by this peer? copied to next_stream_id in parsing when parsing commences */ gpr_uint32 next_stream_id; @@ -204,13 +197,6 @@ typedef struct { /** concurrent stream count: updated when not parsing, so this is a strict over-estimation on the client */ gpr_uint32 concurrent_stream_count; - - /** is there a goaway available? (boolean) */ - grpc_chttp2_error_state goaway_state; - /** what is the debug text of the goaway? */ - gpr_slice goaway_text; - /** what is the status code of the goaway? */ - grpc_status_code goaway_error; } grpc_chttp2_transport_global; typedef struct { @@ -343,14 +329,14 @@ struct grpc_chttp2_transport { grpc_chttp2_stream **accepting_stream; struct { - /** is a thread currently performing channel callbacks */ - gpr_uint8 executing; - /** transport channel-level callback */ - const grpc_transport_callbacks *cb; - /** user data for cb calls */ - void *cb_user_data; - /** closure for notifying transport closure */ - grpc_iomgr_closure notify_closed; + /* accept stream callback */ + void (*accept_stream)(void *user_data, grpc_transport *transport, + const void *server_data); + void *accept_stream_user_data; + + /** connectivity tracking */ + grpc_iomgr_closure *on_connectivity_changed; + grpc_connectivity_state *connectivity; } channel_callback; }; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 6e61af6f19..11dd60bbb9 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -87,7 +87,6 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t); /* forward declarations of various callbacks that we'll build closures around */ static void writing_action(void *t, int iomgr_success_ignored); static void reading_action(void *t, int iomgr_success_ignored); -static void notify_closed(void *t, int iomgr_success_ignored); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, @@ -101,9 +100,9 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, static void drop_connection(grpc_chttp2_transport *t); /** Perform a transport_op */ -static void perform_op_locked(grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global, - grpc_transport_stream_op *op); +static void perform_stream_op_locked( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op); /** Cancel a stream: coming from the transport API */ static void cancel_from_api(grpc_chttp2_transport_global *transport_global, @@ -198,13 +197,11 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } #endif static void init_transport(grpc_chttp2_transport *t, - grpc_transport_setup_callback setup, void *arg, const grpc_channel_args *channel_args, grpc_endpoint *ep, gpr_slice *slices, size_t nslices, grpc_mdctx *mdctx, int is_client) { size_t i; int j; - grpc_transport_setup_result sr; GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); @@ -219,7 +216,6 @@ static void init_transport(grpc_chttp2_transport *t, grpc_mdctx_ref(mdctx); t->metadata_context = mdctx; t->endpoint_reading = 1; - t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NONE; t->global.next_stream_id = is_client ? 1 : 2; t->global.is_client = is_client; t->global.outgoing_window = DEFAULT_WINDOW; @@ -245,7 +241,6 @@ static void init_transport(grpc_chttp2_transport *t, grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context); - grpc_iomgr_closure_init(&t->channel_callback.notify_closed, notify_closed, t); if (is_client) { gpr_slice_buffer_add( &t->global.qbuf, @@ -312,23 +307,8 @@ static void init_transport(grpc_chttp2_transport *t, } } - gpr_mu_lock(&t->mu); - t->channel_callback.executing = 1; - REF_TRANSPORT(t, "init"); /* matches unref at end of this function */ - gpr_mu_unlock(&t->mu); - - sr = setup(arg, &t->base, t->metadata_context); - - lock(t); - t->channel_callback.cb = sr.callbacks; - t->channel_callback.cb_user_data = sr.user_data; - t->channel_callback.executing = 0; - unlock(t); - REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */ recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK); - - UNREF_TRANSPORT(t, "init"); } static void destroy_transport(grpc_transport *gt) { @@ -351,23 +331,6 @@ static void close_transport_locked(grpc_chttp2_transport *t) { } } -static void close_transport(grpc_transport *gt) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - gpr_mu_lock(&t->mu); - close_transport_locked(t); - gpr_mu_unlock(&t->mu); -} - -static void goaway(grpc_transport *gt, grpc_status_code status, - gpr_slice debug_data) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - lock(t); - grpc_chttp2_goaway_append(t->global.last_incoming_stream_id, - grpc_chttp2_grpc_status_to_http2_error(status), - debug_data, &t->global.qbuf); - unlock(t); -} - static int init_stream(grpc_transport *gt, grpc_stream *gs, const void *server_data, grpc_transport_stream_op *initial_op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; @@ -399,7 +362,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, s->global.in_stream_map = 1; } - if (initial_op) perform_op_locked(&t->global, &s->global, initial_op); + if (initial_op) perform_stream_op_locked(&t->global, &s->global, initial_op); unlock(t); return 0; @@ -454,8 +417,8 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing); GPR_ASSERT(t->accepting_stream == NULL); t->accepting_stream = &accepting; - t->channel_callback.cb->accept_stream(t->channel_callback.cb_user_data, - &t->base, (void *)(gpr_uintptr)id); + t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data, + &t->base, (void *)(gpr_uintptr)id); t->accepting_stream = NULL; return &accepting->parsing; } @@ -476,7 +439,7 @@ static void unlock(grpc_chttp2_transport *t) { grpc_iomgr_closure *run_closures; unlock_check_read_write_state(t); - if (!t->writing_active && t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE && + if (!t->writing_active && !t->closed && grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) { t->writing_active = 1; REF_TRANSPORT(t, "writing"); @@ -553,14 +516,10 @@ void grpc_chttp2_add_incoming_goaway( grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error, gpr_slice goaway_text) { char *msg = gpr_hexdump((char*)GPR_SLICE_START_PTR(goaway_text), GPR_SLICE_LENGTH(goaway_text), GPR_HEXDUMP_PLAINTEXT); + gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg); gpr_free(msg); - if (transport_global->goaway_state == GRPC_CHTTP2_ERROR_STATE_NONE) { - transport_global->goaway_state = GRPC_CHTTP2_ERROR_STATE_SEEN; - transport_global->goaway_text = goaway_text; - transport_global->goaway_error = goaway_error; - } else { - gpr_slice_unref(goaway_text); - } + gpr_slice_unref(goaway_text); + transport_global->seen_goaway = 1; } static void maybe_start_some_streams( @@ -613,9 +572,9 @@ static void maybe_start_some_streams( } } -static void perform_op_locked(grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global, - grpc_transport_stream_op *op) { +static void perform_stream_op_locked( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) { if (op->cancel_with_status != GRPC_STATUS_OK) { cancel_from_api(transport_global, stream_global, op->cancel_with_status); } @@ -672,21 +631,19 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global, } } -static void perform_op(grpc_transport *gt, grpc_stream *gs, - grpc_transport_stream_op *op) { +static void perform_stream_op(grpc_transport *gt, grpc_stream *gs, + grpc_transport_stream_op *op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; lock(t); - perform_op_locked(&t->global, &s->global, op); + perform_stream_op_locked(&t->global, &s->global, op); unlock(t); } -static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; +static void send_ping_locked(grpc_chttp2_transport *t, + grpc_iomgr_closure *on_recv) { grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p)); - - lock(t); p->next = &t->global.pings; p->prev = p->next->prev; p->prev->next = p->next->prev = p; @@ -700,6 +657,49 @@ static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) { p->id[7] = t->global.ping_counter & 0xff; p->on_recv = on_recv; gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); +} + +static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; + + lock(t); + + if (op->on_consumed) { + grpc_chttp2_schedule_closure(&t->global, op->on_consumed, 1); + } + + if (op->on_connectivity_state_change) { + GPR_ASSERT(t->channel_callback.on_connectivity_changed == NULL); + t->channel_callback.on_connectivity_changed = + op->on_connectivity_state_change; + t->channel_callback.connectivity = op->connectivity_state; + } + + if (op->send_goaway) { + grpc_chttp2_goaway_append( + t->global.last_incoming_stream_id, + grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), + *op->goaway_message, &t->global.qbuf); + } + + if (op->set_accept_stream != NULL) { + t->channel_callback.accept_stream = op->set_accept_stream; + t->channel_callback.accept_stream_user_data = + op->set_accept_stream_user_data; + } + + if (op->bind_pollset) { + add_to_pollset_locked(t, op->bind_pollset); + } + + if (op->send_ping) { + send_ping_locked(t, op->send_ping); + } + + if (op->disconnect) { + close_transport_locked(t); + } + unlock(t); } @@ -839,9 +839,6 @@ static void end_all_the_calls(grpc_chttp2_transport *t) { } static void drop_connection(grpc_chttp2_transport *t) { - if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) { - t->global.error_state = GRPC_CHTTP2_ERROR_STATE_SEEN; - } close_transport_locked(t); end_all_the_calls(t); } @@ -886,7 +883,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, lock(t); i = 0; GPR_ASSERT(!t->parsing_active); - if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) { + if (!t->closed) { t->parsing_active = 1; /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, @@ -931,67 +928,21 @@ static void reading_action(void *pt, int iomgr_success_ignored) { * CALLBACK LOOP */ -typedef struct { - grpc_chttp2_transport *t; - gpr_uint32 error; - gpr_slice text; - grpc_iomgr_closure closure; -} notify_goaways_args; - -static void notify_goaways(void *p, int iomgr_success_ignored) { - notify_goaways_args *a = p; - grpc_chttp2_transport *t = a->t; - - t->channel_callback.cb->goaway(t->channel_callback.cb_user_data, &t->base, - a->error, a->text); - - gpr_free(a); - - lock(t); - t->channel_callback.executing = 0; - unlock(t); - - UNREF_TRANSPORT(t, "notify_goaways"); -} - -static void notify_closed(void *gt, int iomgr_success_ignored) { - grpc_chttp2_transport *t = gt; - t->channel_callback.cb->closed(t->channel_callback.cb_user_data, &t->base); - - lock(t); - t->channel_callback.executing = 0; - unlock(t); - - UNREF_TRANSPORT(t, "notify_closed"); -} - static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) { - if (t->channel_callback.executing) { - return; - } - if (t->global.goaway_state != GRPC_CHTTP2_ERROR_STATE_NONE) { - if (t->global.goaway_state == GRPC_CHTTP2_ERROR_STATE_SEEN && - t->global.error_state != GRPC_CHTTP2_ERROR_STATE_NOTIFIED) { - notify_goaways_args *a = gpr_malloc(sizeof(*a)); - a->t = t; - a->error = t->global.goaway_error; - a->text = t->global.goaway_text; - t->global.goaway_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED; - t->channel_callback.executing = 1; - grpc_iomgr_closure_init(&a->closure, notify_goaways, a); - REF_TRANSPORT(t, "notify_goaways"); - grpc_chttp2_schedule_closure(&t->global, &a->closure, 1); - return; - } else if (t->global.goaway_state != GRPC_CHTTP2_ERROR_STATE_NOTIFIED) { - return; + if (t->channel_callback.on_connectivity_changed != NULL) { + grpc_connectivity_state current; + if (t->closed || t->global.seen_goaway) { + current = GRPC_CHANNEL_FATAL_FAILURE; + } else { + current = GRPC_CHANNEL_READY; + } + if (current != *t->channel_callback.connectivity) { + *t->channel_callback.connectivity = current; + grpc_chttp2_schedule_closure( + &t->global, t->channel_callback.on_connectivity_changed, 1); + t->channel_callback.on_connectivity_changed = NULL; + t->channel_callback.connectivity = NULL; } - } - if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_SEEN) { - t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED; - t->channel_callback.executing = 1; - REF_TRANSPORT(t, "notify_closed"); - grpc_chttp2_schedule_closure(&t->global, &t->channel_callback.notify_closed, - 1); } } @@ -1014,13 +965,6 @@ static void add_to_pollset_locked(grpc_chttp2_transport *t, } } -static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - lock(t); - add_to_pollset_locked(t, pollset); - unlock(t); -} - /* * TRACING */ @@ -1056,19 +1000,14 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, * INTEGRATION GLUE */ -static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), - init_stream, - perform_op, - destroy_stream, - destroy_transport}; - -void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, - void *arg, - const grpc_channel_args *channel_args, - grpc_endpoint *ep, gpr_slice *slices, - size_t nslices, grpc_mdctx *mdctx, - int is_client) { +static const grpc_transport_vtable vtable = { + sizeof(grpc_chttp2_stream), init_stream, perform_stream_op, + perform_transport_op, destroy_stream, destroy_transport}; + +grpc_transport *grpc_create_chttp2_transport( + const grpc_channel_args *channel_args, grpc_endpoint *ep, gpr_slice *slices, + size_t nslices, grpc_mdctx *mdctx, int is_client) { grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport)); - init_transport(t, setup, arg, channel_args, ep, slices, nslices, mdctx, - is_client); + init_transport(t, channel_args, ep, slices, nslices, mdctx, is_client); + return &t->base; } diff --git a/src/core/transport/chttp2_transport.h b/src/core/transport/chttp2_transport.h index 18e19f03af..1747792b95 100644 --- a/src/core/transport/chttp2_transport.h +++ b/src/core/transport/chttp2_transport.h @@ -40,11 +40,8 @@ extern int grpc_http_trace; extern int grpc_flowctl_trace; -void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, - void *arg, - const grpc_channel_args *channel_args, - grpc_endpoint *ep, gpr_slice *slices, - size_t nslices, grpc_mdctx *metadata_context, - int is_client); +grpc_transport *grpc_create_chttp2_transport( + const grpc_channel_args *channel_args, grpc_endpoint *ep, gpr_slice *slices, + size_t nslices, grpc_mdctx *metadata_context, int is_client); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_TRANSPORT_H */ diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index a570cba33e..c29217599e 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -64,24 +64,6 @@ void grpc_transport_destroy_stream(grpc_transport *transport, transport->vtable->destroy_stream(transport, stream); } -void grpc_transport_setup_cancel(grpc_transport_setup *setup) { - setup->vtable->cancel(setup); -} - -void grpc_transport_setup_initiate(grpc_transport_setup *setup) { - setup->vtable->initiate(setup); -} - -void grpc_transport_setup_add_interested_party(grpc_transport_setup *setup, - grpc_pollset *pollset) { - setup->vtable->add_interested_party(setup, pollset); -} - -void grpc_transport_setup_del_interested_party(grpc_transport_setup *setup, - grpc_pollset *pollset) { - setup->vtable->del_interested_party(setup, pollset); -} - void grpc_transport_stream_op_finish_with_failure( grpc_transport_stream_op *op) { if (op->send_ops) { diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 1acd665a1d..24a02132e9 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -174,51 +174,4 @@ void grpc_transport_close(grpc_transport *transport); /* Destroy the transport */ void grpc_transport_destroy(grpc_transport *transport); -/* Return type for grpc_transport_setup_callback */ -typedef struct grpc_transport_setup_result { - void *user_data; -} grpc_transport_setup_result; - -/* Given a transport, return callbacks for that transport. Used to finalize - setup as a transport is being created */ -typedef grpc_transport_setup_result (*grpc_transport_setup_callback)( - void *setup_arg, grpc_transport *transport, grpc_mdctx *mdctx); - -typedef struct grpc_transport_setup grpc_transport_setup; -typedef struct grpc_transport_setup_vtable grpc_transport_setup_vtable; - -struct grpc_transport_setup_vtable { - void (*initiate)(grpc_transport_setup *setup); - void (*add_interested_party)(grpc_transport_setup *setup, - grpc_pollset *pollset); - void (*del_interested_party)(grpc_transport_setup *setup, - grpc_pollset *pollset); - void (*cancel)(grpc_transport_setup *setup); -}; - -/* Transport setup is an asynchronous utility interface for client channels to - establish connections. It's transport agnostic. */ -struct grpc_transport_setup { - const grpc_transport_setup_vtable *vtable; -}; - -/* Initiate transport setup: e.g. for TCP+DNS trigger a resolve of the name - given at transport construction time, create the tcp connection, perform - handshakes, and call some grpc_transport_setup_result function provided at - setup construction time. - This *may* be implemented as a no-op if the setup process monitors something - continuously. */ -void grpc_transport_setup_initiate(grpc_transport_setup *setup); - -void grpc_transport_setup_add_interested_party(grpc_transport_setup *setup, - grpc_pollset *pollset); -void grpc_transport_setup_del_interested_party(grpc_transport_setup *setup, - grpc_pollset *pollset); - -/* Cancel transport setup. After this returns, no new transports should be - created, and all pending transport setup callbacks should be completed. - After this call completes, setup should be considered invalid (this can be - used as a destruction call by setup). */ -void grpc_transport_setup_cancel(grpc_transport_setup *setup); - #endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */ diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index 8283939050..b18f957009 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -51,8 +51,7 @@ typedef struct grpc_transport_vtable { grpc_transport_stream_op *op); /* implementation of grpc_transport_perform_op */ - void (*perform_op)(grpc_transport *self, grpc_stream *stream, - grpc_transport_op *op); + void (*perform_op)(grpc_transport *self, grpc_transport_op *op); /* implementation of grpc_transport_destroy_stream */ void (*destroy_stream)(grpc_transport *self, grpc_stream *stream); -- cgit v1.2.3 From 079a11bb9b253e91c89e625950ea09879d2d6f8e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 30 Jun 2015 10:07:15 -0700 Subject: clang-format affected files --- include/grpc/grpc.h | 4 +- src/core/channel/census_filter.c | 24 ++++++-- src/core/channel/channel_args.c | 4 +- src/core/channel/channel_args.h | 9 +-- src/core/channel/channel_stack.c | 8 ++- src/core/channel/channel_stack.h | 8 +-- src/core/channel/client_channel.c | 67 ++++++++++++++-------- src/core/channel/client_channel.h | 2 +- src/core/channel/connected_channel.c | 2 +- src/core/channel/connected_channel.h | 2 +- src/core/channel/http_client_filter.c | 2 +- src/core/channel/http_server_filter.c | 2 +- src/core/channel/noop_filter.c | 2 +- src/core/client_config/resolver.c | 12 ++-- src/core/client_config/resolver.h | 9 ++- .../client_config/resolvers/unix_resolver_posix.c | 2 +- src/core/client_config/subchannel.c | 58 ++++++++++++------- src/core/client_config/subchannel.h | 27 ++++++--- src/core/client_config/uri_parser.c | 3 +- src/core/iomgr/fd_posix.c | 3 +- src/core/security/client_auth_filter.c | 4 +- src/core/security/server_auth_filter.c | 9 ++- src/core/security/server_secure_chttp2.c | 12 ++-- src/core/surface/call.c | 17 +++--- src/core/surface/channel.c | 4 +- src/core/surface/channel_create.c | 29 ++++++---- src/core/surface/lame_client.c | 2 +- src/core/surface/secure_channel_create.c | 29 ++++++---- src/core/surface/server.c | 23 ++++---- src/core/transport/chttp2_transport.c | 58 ++++++++++--------- src/core/transport/connectivity_state.c | 6 +- src/core/transport/connectivity_state.h | 3 +- src/core/transport/transport.c | 5 +- src/core/transport/transport.h | 11 ++-- src/core/transport/transport_impl.h | 2 +- test/core/bad_client/bad_client.c | 11 ++-- test/core/channel/channel_stack_test.c | 7 ++- test/core/end2end/fixtures/chttp2_socket_pair.c | 24 ++++---- .../chttp2_socket_pair_one_byte_at_a_time.c | 24 ++++---- .../fixtures/chttp2_socket_pair_with_grpc_trace.c | 24 ++++---- test/core/end2end/multiple_server_queues_test.c | 2 +- test/core/iomgr/fd_conservation_posix_test.c | 12 ++-- 42 files changed, 325 insertions(+), 243 deletions(-) (limited to 'src/core/transport/transport.h') diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 147343049b..782923d599 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -441,7 +441,7 @@ void grpc_channel_destroy(grpc_channel *channel); has been made. */ /* Called by clients to cancel an RPC on the server. - Can be called multiple times, from any thread. + Can be called multiple times, from any thread. THREAD-SAFETY grpc_call_cancel and grpc_call_cancel_with_status are thread-safe, and can be called at any point before grpc_call_destroy is called.*/ @@ -457,7 +457,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *call, grpc_status_code status, const char *description); -/* Destroy a call. +/* Destroy a call. THREAD SAFETY: grpc_call_destroy is thread-compatible */ void grpc_call_destroy(grpc_call *call); diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c index ea0bece587..83b7682848 100644 --- a/src/core/channel/census_filter.c +++ b/src/core/channel/census_filter.c @@ -202,11 +202,23 @@ static void destroy_channel_elem(grpc_channel_element* elem) { } const grpc_channel_filter grpc_client_census_filter = { - client_start_transport_op, channel_op, sizeof(call_data), - client_init_call_elem, client_destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "census-client"}; + client_start_transport_op, + channel_op, + sizeof(call_data), + client_init_call_elem, + client_destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + "census-client"}; const grpc_channel_filter grpc_server_census_filter = { - server_start_transport_op, channel_op, sizeof(call_data), - server_init_call_elem, server_destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "census-server"}; + server_start_transport_op, + channel_op, + sizeof(call_data), + server_init_call_elem, + server_destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + "census-server"}; diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c index 371da4210e..140f8bd656 100644 --- a/src/core/channel/channel_args.c +++ b/src/core/channel/channel_args.c @@ -139,8 +139,8 @@ grpc_compression_level grpc_channel_args_get_compression_level( return GRPC_COMPRESS_LEVEL_NONE; } -void grpc_channel_args_set_compression_level( - grpc_channel_args **a, grpc_compression_level level) { +void grpc_channel_args_set_compression_level(grpc_channel_args **a, + grpc_compression_level level) { grpc_arg tmp; tmp.type = GRPC_ARG_INTEGER; tmp.key = GRPC_COMPRESSION_LEVEL_ARG; diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h index 27ad57b3e8..17849b7e59 100644 --- a/src/core/channel/channel_args.h +++ b/src/core/channel/channel_args.h @@ -47,7 +47,8 @@ grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src, size_t num_to_add); /** Copy args from a then args from b into a new channel args */ -grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, const grpc_channel_args *b); +grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, + const grpc_channel_args *b); /** Destroy arguments created by grpc_channel_args_copy */ void grpc_channel_args_destroy(grpc_channel_args *a); @@ -62,7 +63,7 @@ grpc_compression_level grpc_channel_args_get_compression_level( /** Sets the compression level in \a a to \a level. Setting it to * GRPC_COMPRESS_LEVEL_NONE disables compression for the channel. */ -void grpc_channel_args_set_compression_level( - grpc_channel_args **a, grpc_compression_level level); +void grpc_channel_args_set_compression_level(grpc_channel_args **a, + grpc_compression_level level); -#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */ +#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */ diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 0810a61cd0..e38dcb58b7 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -102,7 +102,8 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack, } void grpc_channel_stack_init(const grpc_channel_filter **filters, - size_t filter_count, grpc_channel *master, const grpc_channel_args *args, + size_t filter_count, grpc_channel *master, + const grpc_channel_args *args, grpc_mdctx *metadata_context, grpc_channel_stack *stack) { size_t call_size = @@ -122,8 +123,9 @@ void grpc_channel_stack_init(const grpc_channel_filter **filters, for (i = 0; i < filter_count; i++) { elems[i].filter = filters[i]; elems[i].channel_data = user_data; - elems[i].filter->init_channel_elem(&elems[i], master, args, metadata_context, - i == 0, i == (filter_count - 1)); + elems[i].filter->init_channel_elem(&elems[i], master, args, + metadata_context, i == 0, + i == (filter_count - 1)); user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data); call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data); } diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 6db98815df..785be8925b 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -65,7 +65,7 @@ typedef struct { /* Called to eg. send/receive data on a call. See grpc_call_next_op on how to call the next element in the stack */ void (*start_transport_stream_op)(grpc_call_element *elem, - grpc_transport_stream_op *op); + grpc_transport_stream_op *op); /* Called to handle channel level operations - e.g. new calls, or transport closure. See grpc_channel_next_op on how to call the next element in the stack */ @@ -96,8 +96,7 @@ typedef struct { is_first, is_last designate this elements position in the stack, and are useful for asserting correct configuration by upper layer code. The filter does not need to do any chaining */ - void (*init_channel_elem)(grpc_channel_element *elem, - grpc_channel *master, + void (*init_channel_elem)(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *metadata_context, int is_first, int is_last); @@ -152,7 +151,8 @@ size_t grpc_channel_stack_size(const grpc_channel_filter **filters, size_t filter_count); /* Initialize a channel stack given some filters */ void grpc_channel_stack_init(const grpc_channel_filter **filters, - size_t filter_count, grpc_channel *master,const grpc_channel_args *args, + size_t filter_count, grpc_channel *master, + const grpc_channel_args *args, grpc_mdctx *metadata_context, grpc_channel_stack *stack); /* Destroy a channel stack */ diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index cb36fdc029..6815ec8718 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -138,7 +138,9 @@ typedef struct { grpc_call_element *elem; } waiting_call; -static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation); +static void perform_transport_stream_op(grpc_call_element *elem, + grpc_transport_stream_op *op, + int continuation); static void continue_with_pick(void *arg, int iomgr_success) { waiting_call *wc = arg; @@ -147,7 +149,8 @@ static void continue_with_pick(void *arg, int iomgr_success) { gpr_free(wc); } -static void add_to_lb_policy_wait_queue_locked_state_config(grpc_call_element *elem) { +static void add_to_lb_policy_wait_queue_locked_state_config( + grpc_call_element *elem) { channel_data *chand = elem->channel_data; waiting_call *wc = gpr_malloc(sizeof(*wc)); grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc); @@ -182,7 +185,8 @@ static void started_call(void *arg, int iomgr_success) { calld->state = CALL_ACTIVE; gpr_mu_unlock(&calld->mu_state); if (have_waiting) { - grpc_subchannel_call_process_op(calld->subchannel_call, &calld->waiting_op); + grpc_subchannel_call_process_op(calld->subchannel_call, + &calld->waiting_op); } } else { calld->state = CALL_CANCELLED; @@ -233,17 +237,16 @@ static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) { initial_metadata = &op->send_ops->ops[0].data.metadata; grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld); - grpc_lb_policy_pick(lb_policy, op->bind_pollset, - initial_metadata, &calld->picked_channel, &calld->async_setup_task); + grpc_lb_policy_pick(lb_policy, op->bind_pollset, initial_metadata, + &calld->picked_channel, &calld->async_setup_task); } -static void merge_into_waiting_op(grpc_call_element *elem, grpc_transport_stream_op *new_op) { +static void merge_into_waiting_op(grpc_call_element *elem, + grpc_transport_stream_op *new_op) { call_data *calld = elem->call_data; grpc_transport_stream_op *waiting_op = &calld->waiting_op; - GPR_ASSERT((waiting_op->send_ops == NULL) != - (new_op->send_ops == NULL)); - GPR_ASSERT((waiting_op->recv_ops == NULL) != - (new_op->recv_ops == NULL)); + GPR_ASSERT((waiting_op->send_ops == NULL) != (new_op->send_ops == NULL)); + GPR_ASSERT((waiting_op->recv_ops == NULL) != (new_op->recv_ops == NULL)); if (new_op->send_ops != NULL) { waiting_op->send_ops = new_op->send_ops; waiting_op->is_last_send = new_op->is_last_send; @@ -263,7 +266,9 @@ static void merge_into_waiting_op(grpc_call_element *elem, grpc_transport_stream } } -static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation) { +static void perform_transport_stream_op(grpc_call_element *elem, + grpc_transport_stream_op *op, + int continuation) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_subchannel_call *subchannel_call; @@ -311,7 +316,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_ } break; } - /* fall through */ + /* fall through */ case CALL_CREATED: if (op->cancel_with_status != GRPC_STATUS_OK) { calld->state = CALL_CANCELLED; @@ -343,7 +348,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_ } static void cc_start_transport_stream_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op *op) { perform_transport_stream_op(elem, op, 0); } @@ -382,12 +387,14 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { GRPC_RESOLVER_REF(resolver, "channel-next"); gpr_mu_unlock(&chand->mu_config); GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); - grpc_resolver_next(chand->resolver, &chand->incoming_configuration, &chand->on_config_changed); + grpc_resolver_next(chand->resolver, &chand->incoming_configuration, + &chand->on_config_changed); GRPC_RESOLVER_UNREF(resolver, "channel-next"); } else { old_resolver = chand->resolver; chand->resolver = NULL; - grpc_connectivity_state_set(&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE); + grpc_connectivity_state_set(&chand->state_tracker, + GRPC_CHANNEL_FATAL_FAILURE); gpr_mu_unlock(&chand->mu_config); if (old_resolver != NULL) { grpc_resolver_shutdown(old_resolver); @@ -404,7 +411,8 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver"); } -static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) { +static void cc_start_transport_op(grpc_channel_element *elem, + grpc_transport_op *op) { grpc_lb_policy *lb_policy = NULL; channel_data *chand = elem->channel_data; grpc_resolver *destroy_resolver = NULL; @@ -416,13 +424,16 @@ static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op gpr_mu_lock(&chand->mu_config); if (op->on_connectivity_state_change != NULL) { - grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change); + grpc_connectivity_state_notify_on_state_change( + &chand->state_tracker, op->connectivity_state, + op->on_connectivity_state_change); op->on_connectivity_state_change = NULL; op->connectivity_state = NULL; } if (op->disconnect && chand->resolver != NULL) { - grpc_connectivity_state_set(&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE); + grpc_connectivity_state_set(&chand->state_tracker, + GRPC_CHANNEL_FATAL_FAILURE); destroy_resolver = chand->resolver; chand->resolver = NULL; } @@ -496,7 +507,7 @@ static void destroy_call_elem(grpc_call_element *elem) { } /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master, +static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *metadata_context, int is_first, int is_last) { @@ -510,7 +521,8 @@ static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master, gpr_mu_init(&chand->mu_config); chand->mdctx = metadata_context; chand->master = master; - grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); + grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, + chand); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE); } @@ -530,9 +542,15 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_client_channel_filter = { - cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "client-channel", + cc_start_transport_stream_op, + cc_start_transport_op, + sizeof(call_data), + init_call_elem, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + "client-channel", }; void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, @@ -544,5 +562,6 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, chand->resolver = resolver; GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); GRPC_RESOLVER_REF(resolver, "channel"); - grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed); + grpc_resolver_next(resolver, &chand->incoming_configuration, + &chand->on_config_changed); } diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h index 5ab64b9c46..fd2be46145 100644 --- a/src/core/channel/client_channel.h +++ b/src/core/channel/client_channel.h @@ -52,4 +52,4 @@ extern const grpc_channel_filter grpc_client_channel_filter; void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, grpc_resolver *resolver); -#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */ +#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */ diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 99c8a643f6..34d07de519 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -103,7 +103,7 @@ static void destroy_call_elem(grpc_call_element *elem) { } /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master, +static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { channel_data *cd = (channel_data *)elem->channel_data; diff --git a/src/core/channel/connected_channel.h b/src/core/channel/connected_channel.h index d1e2c195cb..b615b0d350 100644 --- a/src/core/channel/connected_channel.h +++ b/src/core/channel/connected_channel.h @@ -46,4 +46,4 @@ extern const grpc_channel_filter grpc_connected_channel_filter; void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack, grpc_transport *transport); -#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H */ +#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H */ diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 3d1fc6a020..581eb13f58 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -170,7 +170,7 @@ static const char *scheme_from_args(const grpc_channel_args *args) { } /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master, +static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { /* grab pointers to our data from the channel element */ diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 3b1128bef9..db0bf590c6 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -229,7 +229,7 @@ static void init_call_elem(grpc_call_element *elem, static void destroy_call_elem(grpc_call_element *elem) {} /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master, +static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { /* grab pointers to our data from the channel element */ diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c index 0d9c2e82a8..5117723617 100644 --- a/src/core/channel/noop_filter.c +++ b/src/core/channel/noop_filter.c @@ -95,7 +95,7 @@ static void destroy_call_elem(grpc_call_element *elem) { } /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master, +static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { /* grab pointers to our data from the channel element */ diff --git a/src/core/client_config/resolver.c b/src/core/client_config/resolver.c index bbc0ec4e81..91e42bb684 100644 --- a/src/core/client_config/resolver.c +++ b/src/core/client_config/resolver.c @@ -34,16 +34,17 @@ #include "src/core/client_config/resolver.h" void grpc_resolver_init(grpc_resolver *resolver, - const grpc_resolver_vtable *vtable) { + const grpc_resolver_vtable *vtable) { resolver->vtable = vtable; gpr_ref_init(&resolver->refs, 1); } #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG void grpc_resolver_ref(grpc_resolver *resolver, const char *file, int line, - const char *reason) { + const char *reason) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p ref %d -> %d %s", - resolver, (int)resolver->refs.count, (int)resolver->refs.count + 1, reason); + resolver, (int)resolver->refs.count, (int)resolver->refs.count + 1, + reason); #else void grpc_resolver_ref(grpc_resolver *resolver) { #endif @@ -52,9 +53,10 @@ void grpc_resolver_ref(grpc_resolver *resolver) { #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG void grpc_resolver_unref(grpc_resolver *resolver, const char *file, int line, - const char *reason) { + const char *reason) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p unref %d -> %d %s", - resolver, (int)resolver->refs.count, (int)resolver->refs.count - 1, reason); + resolver, (int)resolver->refs.count, (int)resolver->refs.count - 1, + reason); #else void grpc_resolver_unref(grpc_resolver *resolver) { #endif diff --git a/src/core/client_config/resolver.h b/src/core/client_config/resolver.h index 16b5964eb6..8ad87d789b 100644 --- a/src/core/client_config/resolver.h +++ b/src/core/client_config/resolver.h @@ -59,14 +59,13 @@ struct grpc_resolver_vtable { }; #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG -#define GRPC_RESOLVER_REF(p, r) \ - grpc_resolver_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_RESOLVER_REF(p, r) grpc_resolver_ref((p), __FILE__, __LINE__, (r)) #define GRPC_RESOLVER_UNREF(p, r) \ grpc_resolver_unref((p), __FILE__, __LINE__, (r)) void grpc_resolver_ref(grpc_resolver *policy, const char *file, int line, - const char *reason); + const char *reason); void grpc_resolver_unref(grpc_resolver *policy, const char *file, int line, - const char *reason); + const char *reason); #else #define GRPC_RESOLVER_REF(p, r) grpc_resolver_ref((p)) #define GRPC_RESOLVER_UNREF(p, r) grpc_resolver_unref((p)) @@ -75,7 +74,7 @@ void grpc_resolver_unref(grpc_resolver *policy); #endif void grpc_resolver_init(grpc_resolver *resolver, - const grpc_resolver_vtable *vtable); + const grpc_resolver_vtable *vtable); void grpc_resolver_shutdown(grpc_resolver *resolver); diff --git a/src/core/client_config/resolvers/unix_resolver_posix.c b/src/core/client_config/resolvers/unix_resolver_posix.c index 7f2008685c..be515d2689 100644 --- a/src/core/client_config/resolvers/unix_resolver_posix.c +++ b/src/core/client_config/resolvers/unix_resolver_posix.c @@ -136,7 +136,7 @@ static void unix_maybe_finish_next_locked(unix_resolver *r) { } static void unix_destroy(grpc_resolver *gr) { - unix_resolver *r = (unix_resolver*)gr; + unix_resolver *r = (unix_resolver *)gr; gpr_mu_destroy(&r->mu); grpc_subchannel_factory_unref(r->subchannel_factory); gpr_free(r); diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index d16786a7ad..cde14b9222 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -134,21 +134,31 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static void subchannel_connected(void *subchannel, int iomgr_success); -static void subchannel_ref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -static int subchannel_unref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; +static void subchannel_ref_locked( + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +static int subchannel_unref_locked( + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -static grpc_subchannel *connection_unref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) - GRPC_MUST_USE_RESULT; +static grpc_subchannel *connection_unref_locked( + connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; static void subchannel_destroy(grpc_subchannel *c); #ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG -#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p), __FILE__, __LINE__, (r)) -#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p), __FILE__, __LINE__, (r)) -#define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p), __FILE__, __LINE__, (r)) -#define CONNECTION_UNREF_LOCKED(p, r) connection_unref_locked((p), __FILE__, __LINE__, (r)) +#define SUBCHANNEL_REF_LOCKED(p, r) \ + subchannel_ref_locked((p), __FILE__, __LINE__, (r)) +#define SUBCHANNEL_UNREF_LOCKED(p, r) \ + subchannel_unref_locked((p), __FILE__, __LINE__, (r)) +#define CONNECTION_REF_LOCKED(p, r) \ + connection_ref_locked((p), __FILE__, __LINE__, (r)) +#define CONNECTION_UNREF_LOCKED(p, r) \ + connection_unref_locked((p), __FILE__, __LINE__, (r)) #define REF_PASS_ARGS , file, line, reason -#define REF_LOG(name, p) gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", (name), (p), (p)->refs, (p)->refs + 1, reason) -#define UNREF_LOG(name, p) gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", (name), (p), (p)->refs, (p)->refs - 1, reason) +#define REF_LOG(name, p) \ + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \ + (name), (p), (p)->refs, (p)->refs + 1, reason) +#define UNREF_LOG(name, p) \ + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \ + (name), (p), (p)->refs, (p)->refs - 1, reason) #else #define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p)) #define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p)) @@ -174,13 +184,15 @@ static void connection_destroy(connection *c) { gpr_free(c); } -static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +static void connection_ref_locked( + connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { REF_LOG("CONNECTION", c); subchannel_ref_locked(c->subchannel REF_PASS_ARGS); ++c->refs; } -static grpc_subchannel *connection_unref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +static grpc_subchannel *connection_unref_locked( + connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { grpc_subchannel *destroy = NULL; UNREF_LOG("CONNECTION", c); if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) { @@ -196,12 +208,14 @@ static grpc_subchannel *connection_unref_locked(connection *c GRPC_SUBCHANNEL_RE * grpc_subchannel implementation */ -static void subchannel_ref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +static void subchannel_ref_locked( + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { REF_LOG("SUBCHANNEL", c); - ++c->refs; + ++c->refs; } -static int subchannel_unref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +static int subchannel_unref_locked( + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { UNREF_LOG("SUBCHANNEL", c); return --c->refs == 0; } @@ -385,7 +399,8 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, if (con != NULL) { grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); - grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0); + grpc_channel_element *top_elem = + grpc_channel_stack_element(channel_stack, 0); top_elem->filter->start_transport_op(top_elem, op); gpr_mu_lock(&c->mu); @@ -485,7 +500,8 @@ static void publish_transport(grpc_subchannel *c) { stk = (grpc_channel_stack *)(con + 1); con->refs = 0; con->subchannel = c; - grpc_channel_stack_init(filters, num_filters, c->master, c->args, c->mdctx, stk); + grpc_channel_stack_init(filters, num_filters, c->master, c->args, c->mdctx, + stk); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); gpr_free(c->connecting_result.filters); memset(&c->connecting_result, 0, sizeof(c->connecting_result)); @@ -601,11 +617,13 @@ static void connectivity_state_changed_locked(grpc_subchannel *c) { * grpc_subchannel_call implementation */ -void grpc_subchannel_call_ref(grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_ref(&c->refs); +void grpc_subchannel_call_ref( + grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + gpr_ref(&c->refs); } -void grpc_subchannel_call_unref(grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_subchannel_call_unref( + grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { if (gpr_unref(&c->refs)) { gpr_mu *mu = &c->connection->subchannel->mu; grpc_subchannel *destroy; diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 5435ef703b..a23a623277 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -44,11 +44,16 @@ typedef struct grpc_subchannel_call grpc_subchannel_call; typedef struct grpc_subchannel_args grpc_subchannel_args; #ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG -#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p), __FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p), __FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS , const char *file, int line, const char *reason +#define GRPC_SUBCHANNEL_REF(p, r) \ + grpc_subchannel_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_UNREF(p, r) \ + grpc_subchannel_unref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_CALL_REF(p, r) \ + grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) \ + grpc_subchannel_call_unref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \ + , const char *file, int line, const char *reason #else #define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p)) #define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p)) @@ -57,10 +62,14 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS #endif -void grpc_subchannel_ref(grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_subchannel_unref(grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_subchannel_call_ref(grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_subchannel_call_unref(grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_ref( + grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_unref( + grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_call_ref( + grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_call_unref( + grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); /** construct a call (possibly asynchronously) */ void grpc_subchannel_create_call(grpc_subchannel *subchannel, diff --git a/src/core/client_config/uri_parser.c b/src/core/client_config/uri_parser.c index c5faab5eba..776a255923 100644 --- a/src/core/client_config/uri_parser.c +++ b/src/core/client_config/uri_parser.c @@ -39,7 +39,8 @@ #include #include -static grpc_uri *bad_uri(const char *uri_text, int pos, const char *section, int suppress_errors) { +static grpc_uri *bad_uri(const char *uri_text, int pos, const char *section, + int suppress_errors) { char *line_prefix; int pfx_len; diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 446081954d..afecccae17 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -199,7 +199,8 @@ static void wake_all_watchers_locked(grpc_fd *fd) { } static int has_watchers(grpc_fd *fd) { - return fd->read_watcher != NULL || fd->write_watcher != NULL || fd->inactive_watcher_root.next != &fd->inactive_watcher_root; + return fd->read_watcher != NULL || fd->write_watcher != NULL || + fd->inactive_watcher_root.next != &fd->inactive_watcher_root; } void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done, diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index 93bf846978..5d04ec49b2 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -280,7 +280,7 @@ static void destroy_call_elem(grpc_call_element *elem) { } /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master, +static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *metadata_context, int is_first, int is_last) { @@ -326,6 +326,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_client_auth_filter = { - auth_start_transport_op, grpc_channel_next_op, sizeof(call_data), + auth_start_transport_op, grpc_channel_next_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client-auth"}; diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index a92b46c85f..a0dbeb58cd 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -84,8 +84,7 @@ static void init_call_elem(grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element *elem) { -} +static void destroy_call_elem(grpc_call_element *elem) {} /* Constructor for channel_data */ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, @@ -115,6 +114,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_server_auth_filter = { - auth_start_transport_op, grpc_channel_next_op, sizeof(call_data), init_call_elem, - destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, "server-auth"}; + auth_start_transport_op, grpc_channel_next_op, sizeof(call_data), + init_call_elem, destroy_call_elem, sizeof(channel_data), + init_channel_elem, destroy_channel_elem, "server-auth"}; diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 018ec3d1d7..2e49e370f7 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -75,9 +75,8 @@ static void state_unref(grpc_server_secure_state *state) { } } -static void setup_transport(void *statep, - grpc_transport *transport, - grpc_mdctx *mdctx) { +static void setup_transport(void *statep, grpc_transport *transport, + grpc_mdctx *mdctx) { static grpc_channel_filter const *extra_filters[] = { &grpc_server_auth_filter, &grpc_http_server_filter}; grpc_server_secure_state *state = statep; @@ -85,8 +84,7 @@ static void setup_transport(void *statep, grpc_channel_args *args_copy = grpc_channel_args_copy_and_add( grpc_server_get_channel_args(state->server), &connector_arg, 1); grpc_server_setup_transport(state->server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, - args_copy); + GPR_ARRAY_SIZE(extra_filters), mdctx, args_copy); grpc_channel_args_destroy(args_copy); } @@ -101,8 +99,8 @@ static void on_secure_transport_setup_done(void *statep, if (!state->is_shutdown) { mdctx = grpc_mdctx_create(); transport = grpc_create_chttp2_transport( - grpc_server_get_channel_args(state->server), - secure_endpoint, NULL, 0, mdctx, 0); + grpc_server_get_channel_args(state->server), secure_endpoint, NULL, 0, + mdctx, 0); setup_transport(state, transport, mdctx); } else { /* We need to consume this here, because the server may already have gone diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 84ae038e46..8550056bcb 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -462,8 +462,7 @@ static int need_more_data(grpc_call *call) { (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && grpc_bbq_empty(&call->incoming_queue)) || (call->write_state == WRITE_STATE_INITIAL && !call->is_client) || - (call->cancel_with_status != GRPC_STATUS_OK) || - call->destroy_called; + (call->cancel_with_status != GRPC_STATUS_OK) || call->destroy_called; } static void unlock(grpc_call *call) { @@ -1151,7 +1150,8 @@ static void execute_op(grpc_call *call, grpc_transport_stream_op *op) { } else { finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args)); args->call = call; - grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, args); + grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, + args); op->on_consumed = &args->closure; } } @@ -1223,13 +1223,13 @@ static gpr_uint32 decode_compression(grpc_mdelem *md) { } else { gpr_uint32 parsed_clevel_bytes; if (gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), - GPR_SLICE_LENGTH(md->value->slice), - &parsed_clevel_bytes)) { + GPR_SLICE_LENGTH(md->value->slice), + &parsed_clevel_bytes)) { /* the following cast is safe, as a gpr_uint32 should be able to hold all * possible values of the grpc_compression_level enum */ - clevel = (grpc_compression_level) parsed_clevel_bytes; + clevel = (grpc_compression_level)parsed_clevel_bytes; } else { - clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */ + clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */ } grpc_mdelem_set_user_data(md, destroy_compression, (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET)); @@ -1252,7 +1252,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); } else if (key == grpc_channel_get_message_string(call->channel)) { set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); - } else if (key == grpc_channel_get_compresssion_level_string(call->channel)) { + } else if (key == + grpc_channel_get_compresssion_level_string(call->channel)) { set_decode_compression_level(call, decode_compression(md)); } else { dest = &call->buffered_metadata[is_trailing]; diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index e85eaf2c05..f8151c121c 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -129,7 +129,8 @@ grpc_channel *grpc_channel_create_from_filters( } } - grpc_channel_stack_init(filters, num_filters, channel, args, channel->metadata_context, + grpc_channel_stack_init(filters, num_filters, channel, args, + channel->metadata_context, CHANNEL_STACK_FROM_CHANNEL(channel)); return channel; @@ -264,7 +265,6 @@ grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) { return channel->grpc_compression_level_string; } - grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) { return grpc_mdelem_ref(channel->grpc_status_elem[i]); diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 20da830388..280927834b 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -71,10 +71,10 @@ static void connected(void *arg, grpc_endpoint *tcp) { connector *c = arg; grpc_iomgr_closure *notify; if (tcp != NULL) { - c->result->transport = - grpc_create_chttp2_transport(c->args.channel_args, tcp, NULL, 0, c->args.metadata_context, 1); + c->result->transport = grpc_create_chttp2_transport( + c->args.channel_args, tcp, NULL, 0, c->args.metadata_context, 1); GPR_ASSERT(c->result->transport); - c->result->filters = gpr_malloc(sizeof(grpc_channel_filter*)); + c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *)); c->result->filters[0] = &grpc_http_client_filter; c->result->num_filters = 1; } else { @@ -85,19 +85,22 @@ static void connected(void *arg, grpc_endpoint *tcp) { grpc_iomgr_add_callback(notify); } -static void connector_connect( - grpc_connector *con, const grpc_connect_in_args *args, - grpc_connect_out_args *result, grpc_iomgr_closure *notify) { +static void connector_connect(grpc_connector *con, + const grpc_connect_in_args *args, + grpc_connect_out_args *result, + grpc_iomgr_closure *notify) { connector *c = (connector *)con; GPR_ASSERT(c->notify == NULL); GPR_ASSERT(notify->cb); c->notify = notify; c->args = *args; c->result = result; - grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr, args->addr_len, args->deadline); + grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr, + args->addr_len, args->deadline); } -static const grpc_connector_vtable connector_vtable = {connector_ref, connector_unref, connector_connect}; +static const grpc_connector_vtable connector_vtable = { + connector_ref, connector_unref, connector_connect}; typedef struct { grpc_subchannel_factory base; @@ -119,7 +122,8 @@ static void subchannel_factory_unref(grpc_subchannel_factory *scf) { } } -static grpc_subchannel *subchannel_factory_create_subchannel(grpc_subchannel_factory *scf, grpc_subchannel_args *args) { +static grpc_subchannel *subchannel_factory_create_subchannel( + grpc_subchannel_factory *scf, grpc_subchannel_args *args) { subchannel_factory *f = (subchannel_factory *)scf; connector *c = gpr_malloc(sizeof(*c)); grpc_channel_args *final_args = @@ -136,7 +140,9 @@ static grpc_subchannel *subchannel_factory_create_subchannel(grpc_subchannel_fac return s; } -static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {subchannel_factory_ref, subchannel_factory_unref, subchannel_factory_create_subchannel}; +static const grpc_subchannel_factory_vtable subchannel_factory_vtable = { + subchannel_factory_ref, subchannel_factory_unref, + subchannel_factory_create_subchannel}; /* Create a client channel: Asynchronously: - resolve target @@ -170,7 +176,8 @@ grpc_channel *grpc_channel_create(const char *target, } channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1); - grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver); + grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), + resolver); GRPC_RESOLVER_UNREF(resolver, "create"); grpc_subchannel_factory_unref(&f->base); diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index c6ac679871..3dd56fe5a9 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -105,7 +105,7 @@ static void init_call_elem(grpc_call_element *elem, static void destroy_call_elem(grpc_call_element *elem) {} -static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master, +static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { channel_data *chand = elem->channel_data; diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index d355079260..998acfc8cf 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -82,9 +82,9 @@ static void on_secure_transport_setup_done(void *arg, gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status); memset(c->result, 0, sizeof(*c->result)); } else { - c->result->transport = grpc_create_chttp2_transport( - c->args.channel_args, secure_endpoint, - NULL, 0, c->args.metadata_context, 1); + c->result->transport = + grpc_create_chttp2_transport(c->args.channel_args, secure_endpoint, + NULL, 0, c->args.metadata_context, 1); c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2); c->result->filters[0] = &grpc_client_auth_filter; c->result->filters[1] = &grpc_http_client_filter; @@ -109,19 +109,22 @@ static void connected(void *arg, grpc_endpoint *tcp) { } } -static void connector_connect( - grpc_connector *con, const grpc_connect_in_args *args, - grpc_connect_out_args *result, grpc_iomgr_closure *notify) { +static void connector_connect(grpc_connector *con, + const grpc_connect_in_args *args, + grpc_connect_out_args *result, + grpc_iomgr_closure *notify) { connector *c = (connector *)con; GPR_ASSERT(c->notify == NULL); GPR_ASSERT(notify->cb); c->notify = notify; c->args = *args; c->result = result; - grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr, args->addr_len, args->deadline); + grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr, + args->addr_len, args->deadline); } -static const grpc_connector_vtable connector_vtable = {connector_ref, connector_unref, connector_connect}; +static const grpc_connector_vtable connector_vtable = { + connector_ref, connector_unref, connector_connect}; typedef struct { grpc_subchannel_factory base; @@ -144,7 +147,8 @@ static void subchannel_factory_unref(grpc_subchannel_factory *scf) { } } -static grpc_subchannel *subchannel_factory_create_subchannel(grpc_subchannel_factory *scf, grpc_subchannel_args *args) { +static grpc_subchannel *subchannel_factory_create_subchannel( + grpc_subchannel_factory *scf, grpc_subchannel_args *args) { subchannel_factory *f = (subchannel_factory *)scf; connector *c = gpr_malloc(sizeof(*c)); grpc_channel_args *final_args = @@ -162,7 +166,9 @@ static grpc_subchannel *subchannel_factory_create_subchannel(grpc_subchannel_fac return s; } -static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {subchannel_factory_ref, subchannel_factory_unref, subchannel_factory_create_subchannel}; +static const grpc_subchannel_factory_vtable subchannel_factory_vtable = { + subchannel_factory_ref, subchannel_factory_unref, + subchannel_factory_create_subchannel}; /* Create a secure client channel: Asynchronously: - resolve target @@ -219,7 +225,8 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, } channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1); - grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver); + grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), + resolver); GRPC_RESOLVER_UNREF(resolver, "create"); grpc_channel_args_destroy(args_copy); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 703eeaf2bd..c883d08a02 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -151,7 +151,7 @@ struct grpc_server { before mu_call. This is currently used in shutdown processing (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */ gpr_mu mu_global; /* mutex for server and channel state */ - gpr_mu mu_call; /* mutex for call-specific state */ + gpr_mu mu_call; /* mutex for call-specific state */ registered_method *registered_methods; requested_call_array requested_calls; @@ -226,11 +226,10 @@ static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) { channel_data *c; size_t count = 0; size_t dc_count = 0; - for (c = s->root_channel_data.next; c != &s->root_channel_data; - c = c->next) { - count ++; + for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { + count++; if (c->num_calls == 0) { - dc_count ++; + dc_count++; } } cb->num_channels = count; @@ -239,8 +238,7 @@ static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) { cb->disconnects = gpr_malloc(sizeof(*cb->channels) * cb->num_disconnects); count = 0; dc_count = 0; - for (c = s->root_channel_data.next; c != &s->root_channel_data; - c = c->next) { + for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { cb->channels[count++] = c->channel; GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast"); if (c->num_calls == 0) { @@ -261,7 +259,8 @@ static void shutdown_cleanup(void *arg, int iomgr_status_ignored) { gpr_free(a); } -static void send_shutdown(grpc_channel *channel, int send_goaway, int send_disconnect) { +static void send_shutdown(grpc_channel *channel, int send_goaway, + int send_disconnect) { grpc_transport_op op; struct shutdown_cleanup_args *sc; grpc_channel_element *elem; @@ -277,12 +276,12 @@ static void send_shutdown(grpc_channel *channel, int send_goaway, int send_disco grpc_iomgr_closure_init(&sc->closure, shutdown_cleanup, sc); op.on_consumed = &sc->closure; - elem = grpc_channel_stack_element( - grpc_channel_get_channel_stack(channel), 0); + elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); elem->filter->start_transport_op(elem, &op); } -static void channel_broadcaster_shutdown(channel_broadcaster *cb, int send_goaway, int send_disconnect) { +static void channel_broadcaster_shutdown(channel_broadcaster *cb, + int send_goaway, int send_disconnect) { size_t i; for (i = 0; i < cb->num_channels; i++) { @@ -721,7 +720,7 @@ static void destroy_call_elem(grpc_call_element *elem) { server_unref(chand->server); } -static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master, +static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *metadata_context, int is_first, int is_last) { diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index e32071e692..d8001d6c32 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -117,7 +117,9 @@ static void add_to_pollset_locked(grpc_chttp2_transport *t, static void maybe_start_some_streams( grpc_chttp2_transport_global *transport_global); -static void connectivity_state_set(grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state); +static void connectivity_state_set( + grpc_chttp2_transport_global *transport_global, + grpc_connectivity_state state); /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING @@ -330,8 +332,7 @@ static void destroy_transport(grpc_transport *gt) { static void close_transport_locked(grpc_chttp2_transport *t) { if (!t->closed) { t->closed = 1; - connectivity_state_set(&t->global, - GRPC_CHANNEL_FATAL_FAILURE); + connectivity_state_set(&t->global, GRPC_CHANNEL_FATAL_FAILURE); if (t->ep) { grpc_endpoint_shutdown(t->ep); } @@ -339,7 +340,8 @@ static void close_transport_locked(grpc_chttp2_transport *t) { } static int init_stream(grpc_transport *gt, grpc_stream *gs, - const void *server_data, grpc_transport_stream_op *initial_op) { + const void *server_data, + grpc_transport_stream_op *initial_op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; @@ -521,14 +523,13 @@ static void writing_action(void *gt, int iomgr_success_ignored) { void grpc_chttp2_add_incoming_goaway( grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error, gpr_slice goaway_text) { - char *msg = gpr_hexdump((char*)GPR_SLICE_START_PTR(goaway_text), GPR_SLICE_LENGTH(goaway_text), GPR_HEXDUMP_PLAINTEXT); + char *msg = gpr_hexdump((char *)GPR_SLICE_START_PTR(goaway_text), + GPR_SLICE_LENGTH(goaway_text), GPR_HEXDUMP_PLAINTEXT); gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg); gpr_free(msg); gpr_slice_unref(goaway_text); transport_global->seen_goaway = 1; - connectivity_state_set( - transport_global, - GRPC_CHANNEL_FATAL_FAILURE); + connectivity_state_set(transport_global, GRPC_CHANNEL_FATAL_FAILURE); } static void maybe_start_some_streams( @@ -735,9 +736,8 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) { grpc_chttp2_parsing_become_skip_parser(&t->parsing); } - new_stream_count = - grpc_chttp2_stream_map_size(&t->parsing_stream_map) + - grpc_chttp2_stream_map_size(&t->new_stream_map); + new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) + + grpc_chttp2_stream_map_size(&t->new_stream_map); if (new_stream_count != t->global.concurrent_stream_count) { t->global.concurrent_stream_count = new_stream_count; maybe_start_some_streams(&t->global); @@ -772,11 +772,12 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { if (!stream_global->published_cancelled) { char buffer[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(stream_global->cancelled_status, buffer); - grpc_chttp2_incoming_metadata_buffer_add(&stream_global->incoming_metadata, - grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer)); + grpc_chttp2_incoming_metadata_buffer_add( + &stream_global->incoming_metadata, + grpc_mdelem_from_strings(t->metadata_context, "grpc-status", + buffer)); grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into( - &stream_global->incoming_metadata, - &stream_global->incoming_sopb); + &stream_global->incoming_metadata, &stream_global->incoming_sopb); stream_global->published_cancelled = 1; } } @@ -825,10 +826,10 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global, stream_global->cancelled = 1; stream_global->cancelled_status = status; if (stream_global->id != 0) { - gpr_slice_buffer_add(&transport_global->qbuf, - grpc_chttp2_rst_stream_create( - stream_global->id, - grpc_chttp2_grpc_status_to_http2_error(status))); + gpr_slice_buffer_add( + &transport_global->qbuf, + grpc_chttp2_rst_stream_create( + stream_global->id, grpc_chttp2_grpc_status_to_http2_error(status))); } grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); @@ -907,7 +908,8 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); - t->global.concurrent_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map); + t->global.concurrent_stream_count = + grpc_chttp2_stream_map_size(&t->parsing_stream_map); if (t->parsing.initial_window_update != 0) { grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, update_global_window, t); @@ -934,17 +936,19 @@ static void reading_action(void *pt, int iomgr_success_ignored) { * CALLBACK LOOP */ -static void schedule_closure_for_connectivity(void *a, grpc_iomgr_closure *closure) { +static void schedule_closure_for_connectivity(void *a, + grpc_iomgr_closure *closure) { grpc_chttp2_schedule_closure(a, closure, 1); } -static void connectivity_state_set(grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state) { - GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "set connectivity_state=%d", state)); +static void connectivity_state_set( + grpc_chttp2_transport_global *transport_global, + grpc_connectivity_state state) { + GRPC_CHTTP2_IF_TRACING( + gpr_log(GPR_DEBUG, "set connectivity_state=%d", state)); grpc_connectivity_state_set_with_scheduler( - &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker, - state, - schedule_closure_for_connectivity, - transport_global); + &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker, + state, schedule_closure_for_connectivity, transport_global); } void grpc_chttp2_schedule_closure( diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index 9a956a5a58..dabe46badd 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -81,8 +81,7 @@ int grpc_connectivity_state_notify_on_state_change( void grpc_connectivity_state_set_with_scheduler( grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, - void (*scheduler)(void *arg, grpc_iomgr_closure *closure), - void *arg) { + void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg) { grpc_connectivity_state_watcher *new = NULL; grpc_connectivity_state_watcher *w; /*gpr_log(GPR_DEBUG, "CS:%p:set:%d", tracker, state);*/ @@ -111,5 +110,6 @@ static void default_scheduler(void *ignored, grpc_iomgr_closure *closure) { void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state) { - grpc_connectivity_state_set_with_scheduler(tracker, state, default_scheduler, NULL); + grpc_connectivity_state_set_with_scheduler(tracker, state, default_scheduler, + NULL); } diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h index c6f903a1ea..bbdcbcb069 100644 --- a/src/core/transport/connectivity_state.h +++ b/src/core/transport/connectivity_state.h @@ -61,8 +61,7 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state); void grpc_connectivity_state_set_with_scheduler( grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, - void (*scheduler)(void *arg, grpc_iomgr_closure *closure), - void *arg); + void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg); grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker); diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index c29217599e..1397e21933 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -49,8 +49,9 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, initial_op); } -void grpc_transport_perform_stream_op(grpc_transport *transport, grpc_stream *stream, - grpc_transport_stream_op *op) { +void grpc_transport_perform_stream_op(grpc_transport *transport, + grpc_stream *stream, + grpc_transport_stream_op *op) { transport->vtable->perform_stream_op(transport, stream, op); } diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 24a02132e9..599edc871f 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -99,7 +99,8 @@ typedef struct grpc_transport_op { gpr_slice *goaway_message; /** set the callback for accepting new streams; this is a permanent callback, unlike the other one-shot closures */ - void (*set_accept_stream)(void *user_data, grpc_transport *transport, const void *server_data); + void (*set_accept_stream)(void *user_data, grpc_transport *transport, + const void *server_data); void *set_accept_stream_user_data; /** add this transport to a pollset */ grpc_pollset *bind_pollset; @@ -154,10 +155,12 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op); stream - the stream on which to send the operations. This must be non-NULL and previously initialized by the same transport. op - a grpc_transport_stream_op specifying the op to perform */ -void grpc_transport_perform_stream_op(grpc_transport *transport, grpc_stream *stream, - grpc_transport_stream_op *op); +void grpc_transport_perform_stream_op(grpc_transport *transport, + grpc_stream *stream, + grpc_transport_stream_op *op); -void grpc_transport_perform_op(grpc_transport *transport, grpc_transport_op *op); +void grpc_transport_perform_op(grpc_transport *transport, + grpc_transport_op *op); /* Send a ping on a transport diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index b18f957009..515721dfb6 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -48,7 +48,7 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_perform_stream_op */ void (*perform_stream_op)(grpc_transport *self, grpc_stream *stream, - grpc_transport_stream_op *op); + grpc_transport_stream_op *op); /* implementation of grpc_transport_perform_op */ void (*perform_op)(grpc_transport *self, grpc_transport_op *op); diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index 4b889995a1..0b0a682607 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -63,14 +63,14 @@ static void done_write(void *arg, grpc_endpoint_cb_status status) { gpr_event_set(&a->done_write, (void *)1); } -static void server_setup_transport( - void *ts, grpc_transport *transport, grpc_mdctx *mdctx) { +static void server_setup_transport(void *ts, grpc_transport *transport, + grpc_mdctx *mdctx) { thd_args *a = ts; static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter}; grpc_server_setup_transport(a->server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, - grpc_server_get_channel_args(a->server)); + GPR_ARRAY_SIZE(extra_filters), mdctx, + grpc_server_get_channel_args(a->server)); } void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator, @@ -105,8 +105,7 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator, a.validator = validator; grpc_server_register_completion_queue(a.server, a.cq); grpc_server_start(a.server); - transport = grpc_create_chttp2_transport(NULL, sfd.server, - NULL, 0, mdctx, 0); + transport = grpc_create_chttp2_transport(NULL, sfd.server, NULL, 0, mdctx, 0); server_setup_transport(&a, transport, mdctx); /* Bind everything into the same pollset */ diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index 55a15a1a54..eca2a40c97 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -39,7 +39,7 @@ #include #include "test/core/util/test_config.h" -static void channel_init_func(grpc_channel_element *elem,grpc_channel *master, +static void channel_init_func(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *metadata_context, int is_first, int is_last) { @@ -75,8 +75,9 @@ static void channel_func(grpc_channel_element *elem, grpc_transport_op *op) { static void test_create_channel_stack(void) { const grpc_channel_filter filter = { - call_func, channel_func, sizeof(int), call_init_func, call_destroy_func, - sizeof(int), channel_init_func, channel_destroy_func, "some_test_filter"}; + call_func, channel_func, sizeof(int), + call_init_func, call_destroy_func, sizeof(int), + channel_init_func, channel_destroy_func, "some_test_filter"}; const grpc_channel_filter *filters = &filter; grpc_channel_stack *channel_stack; grpc_call_stack *call_stack; diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c index 9f3b36d5e9..f42b9831c8 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair.c @@ -55,14 +55,14 @@ /* chttp2 transport that is immediately available (used for testing connected_channel without a client_channel */ -static void server_setup_transport( - void *ts, grpc_transport *transport, grpc_mdctx *mdctx) { +static void server_setup_transport(void *ts, grpc_transport *transport, + grpc_mdctx *mdctx) { grpc_end2end_test_fixture *f = ts; static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter}; grpc_server_setup_transport(f->server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, - grpc_server_get_channel_args(f->server)); + GPR_ARRAY_SIZE(extra_filters), mdctx, + grpc_server_get_channel_args(f->server)); } typedef struct { @@ -70,8 +70,8 @@ typedef struct { grpc_channel_args *client_args; } sp_client_setup; -static void client_setup_transport( - void *ts, grpc_transport *transport, grpc_mdctx *mdctx) { +static void client_setup_transport(void *ts, grpc_transport *transport, + grpc_mdctx *mdctx) { sp_client_setup *cs = ts; const grpc_channel_filter *filters[] = {&grpc_http_client_filter, @@ -82,8 +82,8 @@ static void client_setup_transport( cs->f->client = channel; - grpc_connected_channel_bind_transport( - grpc_channel_get_channel_stack(channel), transport); + grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel), + transport); } static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( @@ -108,8 +108,8 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f, sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = grpc_create_chttp2_transport(client_args, - sfd->client, NULL, 0, mdctx, 1); + transport = + grpc_create_chttp2_transport(client_args, sfd->client, NULL, 0, mdctx, 1); client_setup_transport(&cs, transport, mdctx); GPR_ASSERT(f->client); } @@ -123,8 +123,8 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, f->server = grpc_server_create_from_filters(NULL, 0, server_args); grpc_server_register_completion_queue(f->server, f->cq); grpc_server_start(f->server); - transport = grpc_create_chttp2_transport(server_args, - sfd->server, NULL, 0, mdctx, 0); + transport = + grpc_create_chttp2_transport(server_args, sfd->server, NULL, 0, mdctx, 0); server_setup_transport(f, transport, mdctx); } diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c index d53a49edbf..be520380a7 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c @@ -55,14 +55,14 @@ /* chttp2 transport that is immediately available (used for testing connected_channel without a client_channel */ -static void server_setup_transport( - void *ts, grpc_transport *transport, grpc_mdctx *mdctx) { +static void server_setup_transport(void *ts, grpc_transport *transport, + grpc_mdctx *mdctx) { grpc_end2end_test_fixture *f = ts; static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter}; grpc_server_setup_transport(f->server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, - grpc_server_get_channel_args(f->server)); + GPR_ARRAY_SIZE(extra_filters), mdctx, + grpc_server_get_channel_args(f->server)); } typedef struct { @@ -70,8 +70,8 @@ typedef struct { grpc_channel_args *client_args; } sp_client_setup; -static void client_setup_transport( - void *ts, grpc_transport *transport, grpc_mdctx *mdctx) { +static void client_setup_transport(void *ts, grpc_transport *transport, + grpc_mdctx *mdctx) { sp_client_setup *cs = ts; const grpc_channel_filter *filters[] = {&grpc_http_client_filter, @@ -82,8 +82,8 @@ static void client_setup_transport( cs->f->client = channel; - grpc_connected_channel_bind_transport( - grpc_channel_get_channel_stack(channel), transport); + grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel), + transport); } static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( @@ -108,8 +108,8 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f, sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = grpc_create_chttp2_transport(client_args, - sfd->client, NULL, 0, mdctx, 1); + transport = + grpc_create_chttp2_transport(client_args, sfd->client, NULL, 0, mdctx, 1); client_setup_transport(&cs, transport, mdctx); GPR_ASSERT(f->client); } @@ -123,8 +123,8 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, f->server = grpc_server_create_from_filters(NULL, 0, server_args); grpc_server_register_completion_queue(f->server, f->cq); grpc_server_start(f->server); - transport = grpc_create_chttp2_transport(server_args, - sfd->server, NULL, 0, mdctx, 0); + transport = + grpc_create_chttp2_transport(server_args, sfd->server, NULL, 0, mdctx, 0); server_setup_transport(f, transport, mdctx); } diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c b/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c index c63d8f3fe9..037281c5ad 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c @@ -56,14 +56,14 @@ /* chttp2 transport that is immediately available (used for testing connected_channel without a client_channel */ -static void server_setup_transport( - void *ts, grpc_transport *transport, grpc_mdctx *mdctx) { +static void server_setup_transport(void *ts, grpc_transport *transport, + grpc_mdctx *mdctx) { grpc_end2end_test_fixture *f = ts; static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter}; grpc_server_setup_transport(f->server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, - grpc_server_get_channel_args(f->server)); + GPR_ARRAY_SIZE(extra_filters), mdctx, + grpc_server_get_channel_args(f->server)); } typedef struct { @@ -71,8 +71,8 @@ typedef struct { grpc_channel_args *client_args; } sp_client_setup; -static void client_setup_transport( - void *ts, grpc_transport *transport, grpc_mdctx *mdctx) { +static void client_setup_transport(void *ts, grpc_transport *transport, + grpc_mdctx *mdctx) { sp_client_setup *cs = ts; const grpc_channel_filter *filters[] = {&grpc_http_client_filter, @@ -83,8 +83,8 @@ static void client_setup_transport( cs->f->client = channel; - grpc_connected_channel_bind_transport( - grpc_channel_get_channel_stack(channel), transport); + grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel), + transport); } static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( @@ -109,8 +109,8 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f, sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = grpc_create_chttp2_transport(client_args, - sfd->client, NULL, 0, mdctx, 1); + transport = + grpc_create_chttp2_transport(client_args, sfd->client, NULL, 0, mdctx, 1); client_setup_transport(&cs, transport, mdctx); GPR_ASSERT(f->client); } @@ -124,8 +124,8 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, f->server = grpc_server_create_from_filters(NULL, 0, server_args); grpc_server_register_completion_queue(f->server, f->cq); grpc_server_start(f->server); - transport = grpc_create_chttp2_transport(server_args, - sfd->server, NULL, 0, mdctx, 0); + transport = + grpc_create_chttp2_transport(server_args, sfd->server, NULL, 0, mdctx, 0); server_setup_transport(f, transport, mdctx); } diff --git a/test/core/end2end/multiple_server_queues_test.c b/test/core/end2end/multiple_server_queues_test.c index 2d79f5adbd..32f37b0973 100644 --- a/test/core/end2end/multiple_server_queues_test.c +++ b/test/core/end2end/multiple_server_queues_test.c @@ -49,7 +49,7 @@ int main(int argc, char **argv) { grpc_server_register_completion_queue(server, cq2); grpc_server_start(server); grpc_server_shutdown_and_notify(server, cq2, NULL); - grpc_completion_queue_next(cq2, gpr_inf_future); /* cue queue hang */ + grpc_completion_queue_next(cq2, gpr_inf_future); /* cue queue hang */ grpc_completion_queue_shutdown(cq1); grpc_completion_queue_shutdown(cq2); grpc_completion_queue_next(cq1, gpr_inf_future); diff --git a/test/core/iomgr/fd_conservation_posix_test.c b/test/core/iomgr/fd_conservation_posix_test.c index aa4551f2f1..8327c681b8 100644 --- a/test/core/iomgr/fd_conservation_posix_test.c +++ b/test/core/iomgr/fd_conservation_posix_test.c @@ -40,9 +40,9 @@ #include "src/core/iomgr/iomgr.h" int main(int argc, char **argv) { - int i; - struct rlimit rlim; - grpc_endpoint_pair p; + int i; + struct rlimit rlim; + grpc_endpoint_pair p; grpc_test_init(argc, argv); grpc_iomgr_init(); @@ -53,9 +53,9 @@ int main(int argc, char **argv) { GPR_ASSERT(0 == setrlimit(RLIMIT_NOFILE, &rlim)); for (i = 0; i < 100; i++) { - p = grpc_iomgr_create_endpoint_pair("test", 1); - grpc_endpoint_destroy(p.client); - grpc_endpoint_destroy(p.server); + p = grpc_iomgr_create_endpoint_pair("test", 1); + grpc_endpoint_destroy(p.client); + grpc_endpoint_destroy(p.server); } grpc_iomgr_shutdown(); -- cgit v1.2.3 From 0d4836d2c5de2cef30015d3f62a9cca511daacda Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 30 Jun 2015 15:15:43 -0700 Subject: Remove the ability to specify a string on cancel: it was broken and unused; will restore later --- src/core/transport/metadata.c | 2 +- src/core/transport/transport.c | 4 ++-- src/core/transport/transport.h | 1 - src/core/transport/transport_op_string.c | 5 ----- 4 files changed, 3 insertions(+), 9 deletions(-) (limited to 'src/core/transport/transport.h') diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index e75b449e12..c80d67823f 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -120,7 +120,7 @@ static void unlock(grpc_mdctx *ctx) { if (ctx->refs == 0) { /* uncomment if you're having trouble diagnosing an mdelem leak to make things clearer (slows down destruction a lot, however) */ - /* gc_mdtab(ctx); */ + gc_mdtab(ctx); if (ctx->mdtab_count && ctx->mdtab_count == ctx->mdtab_free) { discard_metadata(ctx); } diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 2f85a8557d..39d7b701f2 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -112,8 +112,8 @@ void grpc_transport_op_add_cancellation(grpc_transport_op *op, grpc_mdstr *message) { if (op->cancel_with_status == GRPC_STATUS_OK) { op->cancel_with_status = status; - op->cancel_message = message; - } else if (message) { + } + if (message) { grpc_mdstr_unref(message); } } diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 0a5b31a60a..a2c41c47af 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -77,7 +77,6 @@ typedef struct grpc_transport_op { grpc_pollset *bind_pollset; grpc_status_code cancel_with_status; - grpc_mdstr *cancel_message; /* Indexes correspond to grpc_context_index enum values */ grpc_call_context_element *context; diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c index 5c4edb006a..a408b75790 100644 --- a/src/core/transport/transport_op_string.c +++ b/src/core/transport/transport_op_string.c @@ -144,11 +144,6 @@ char *grpc_transport_op_string(grpc_transport_op *op) { first = 0; gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status); gpr_strvec_add(&b, tmp); - if (op->cancel_message) { - gpr_asprintf(&tmp, ";msg='%s'", - grpc_mdstr_as_c_string(op->cancel_message)); - gpr_strvec_add(&b, tmp); - } } out = gpr_strvec_flatten(&b, NULL); -- cgit v1.2.3 From 9188d7a2df8217aa9414113a215668118a0dd700 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sun, 5 Jul 2015 12:44:37 -0700 Subject: Change transport contract to automatically disconnect after sending a goaway iff there are no calls left - lets us remove this tracking from the server where it required a server-wide lock, and instead do the processing under the transport lock which parallelizes much more cleanly. --- src/core/surface/server.c | 86 +------------------------------- src/core/transport/chttp2/internal.h | 8 ++- src/core/transport/chttp2/stream_lists.c | 9 +++- src/core/transport/chttp2_transport.c | 11 +++- src/core/transport/transport.h | 4 +- 5 files changed, 27 insertions(+), 91 deletions(-) (limited to 'src/core/transport/transport.h') diff --git a/src/core/surface/server.c b/src/core/surface/server.c index ee394bb33a..341ca2942c 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -114,7 +114,6 @@ typedef struct channel_registered_method { struct channel_data { grpc_server *server; - size_t num_calls; grpc_connectivity_state connectivity_state; grpc_channel *channel; grpc_mdstr *path_key; @@ -183,10 +182,6 @@ typedef enum { struct call_data { grpc_call *call; - /** is this call counted towards the channels total - number of calls? */ - gpr_uint8 active; - call_state state; grpc_mdstr *path; grpc_mdstr *host; @@ -208,9 +203,7 @@ struct call_data { typedef struct { grpc_channel **channels; - grpc_channel **disconnects; size_t num_channels; - size_t num_disconnects; } channel_broadcaster; #define SERVER_FROM_CALL_ELEM(elem) \ @@ -229,26 +222,15 @@ static void maybe_finish_shutdown(grpc_server *server); static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) { channel_data *c; size_t count = 0; - size_t dc_count = 0; for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { count++; - if (c->num_calls == 0) { - dc_count++; - } } cb->num_channels = count; - cb->num_disconnects = dc_count; cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels); - cb->disconnects = gpr_malloc(sizeof(*cb->channels) * cb->num_disconnects); count = 0; - dc_count = 0; for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { cb->channels[count++] = c->channel; GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast"); - if (c->num_calls == 0) { - cb->disconnects[dc_count++] = c->channel; - GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast-disconnect"); - } } } @@ -287,28 +269,11 @@ static void channel_broadcaster_shutdown(channel_broadcaster *cb, int send_goaway, int force_disconnect) { size_t i; - if (send_goaway) { - for (i = 0; i < cb->num_channels; i++) { - send_shutdown(cb->channels[i], 1, 0); - } - } - if (force_disconnect) { - for (i = 0; i < cb->num_channels; i++) { - send_shutdown(cb->channels[i], 0, 1); - } - } else { - for (i = 0; i < cb->num_disconnects; i++) { - send_shutdown(cb->disconnects[i], 0, 1); - } - } for (i = 0; i < cb->num_channels; i++) { + send_shutdown(cb->channels[i], send_goaway, force_disconnect); GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast"); } - for (i = 0; i < cb->num_disconnects; i++) { - GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast-disconnect"); - } gpr_free(cb->channels); - gpr_free(cb->disconnects); } /* call list */ @@ -548,22 +513,10 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { return md; } -static int decrement_call_count(channel_data *chand) { - int disconnect = 0; - chand->num_calls--; - if (0 == chand->num_calls && chand->server->shutdown) { - disconnect = 1; - } - maybe_finish_shutdown(chand->server); - return disconnect; -} - static void server_on_recv(void *ptr, int success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - int remove_res; - int disconnect = 0; if (success && !calld->got_initial_metadata) { size_t i; @@ -608,21 +561,7 @@ static void server_on_recv(void *ptr, int success) { grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_add_callback(&calld->kill_zombie_closure); } - remove_res = calld->active; - calld->active = 0; gpr_mu_unlock(&chand->server->mu_call); - gpr_mu_lock(&chand->server->mu_global); - if (remove_res) { - disconnect = decrement_call_count(chand); - if (disconnect) { - GRPC_CHANNEL_INTERNAL_REF(chand->channel, "send-disconnect"); - } - } - gpr_mu_unlock(&chand->server->mu_global); - if (disconnect) { - send_shutdown(chand->channel, 0, 1); - GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "send-disconnect"); - } break; } @@ -684,14 +623,9 @@ static void init_call_elem(grpc_call_element *elem, memset(calld, 0, sizeof(call_data)); calld->deadline = gpr_inf_future; calld->call = grpc_call_from_top_element(elem); - calld->active = 1; grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem); - gpr_mu_lock(&chand->server->mu_global); - chand->num_calls++; - gpr_mu_unlock(&chand->server->mu_global); - server_ref(chand->server); if (initial_op) server_mutate_op(elem, initial_op); @@ -700,30 +634,13 @@ static void init_call_elem(grpc_call_element *elem, static void destroy_call_elem(grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - int disconnect = 0; - int active; size_t i; gpr_mu_lock(&chand->server->mu_call); for (i = 0; i < CALL_LIST_COUNT; i++) { call_list_remove(elem->call_data, i); } - active = calld->active; - calld->active = 0; gpr_mu_unlock(&chand->server->mu_call); - if (active) { - gpr_mu_lock(&chand->server->mu_global); - disconnect = decrement_call_count(chand); - if (disconnect) { - GRPC_CHANNEL_INTERNAL_REF(chand->channel, "send-disconnect"); - } - gpr_mu_unlock(&chand->server->mu_global); - } - - if (disconnect) { - send_shutdown(chand->channel, 0, 1); - GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "send-disconnect"); - } if (calld->host) { grpc_mdstr_unref(calld->host); @@ -743,7 +660,6 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, GPR_ASSERT(is_first); GPR_ASSERT(!is_last); chand->server = NULL; - chand->num_calls = 0; chand->channel = NULL; chand->path_key = grpc_mdstr_from_string(metadata_context, ":path"); chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority"); diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 7f98a5bd71..bdd4b432eb 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -173,6 +173,8 @@ typedef struct { /** have we seen a goaway */ gpr_uint8 seen_goaway; + /** have we sent a goaway */ + gpr_uint8 sent_goaway; /** is this transport a client? */ gpr_uint8 is_client; @@ -557,8 +559,10 @@ void grpc_chttp2_add_incoming_goaway( void grpc_chttp2_register_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s); -void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, - grpc_chttp2_stream *s); +/* returns 1 if this is the last stream, 0 otherwise */ +int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream *s) GRPC_MUST_USE_RESULT; +int grpc_chttp2_has_streams(grpc_chttp2_transport *t); void grpc_chttp2_for_all_streams( grpc_chttp2_transport_global *transport_global, void *user_data, void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 85691b32d2..4fea058c19 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -354,9 +354,14 @@ void grpc_chttp2_register_stream(grpc_chttp2_transport *t, stream_list_add_tail(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); } -void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, +int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - stream_list_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); + stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); + return stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS); +} + +int grpc_chttp2_has_streams(grpc_chttp2_transport *t) { + return !stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS); } void grpc_chttp2_for_all_streams( diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 0a7b8f5bf9..d955de8865 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -385,7 +385,9 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED || s->global.id == 0); GPR_ASSERT(!s->global.in_stream_map); - grpc_chttp2_unregister_stream(t, s); + if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { + close_transport_locked(t); + } if (!t->parsing_active && s->global.id) { GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map, s->global.id) == NULL); @@ -684,10 +686,14 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { } if (op->send_goaway) { + t->global.sent_goaway = 1; grpc_chttp2_goaway_append( t->global.last_incoming_stream_id, grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), *op->goaway_message, &t->global.qbuf); + if (!grpc_chttp2_has_streams(t)) { + close_transport_locked(t); + } } if (op->set_accept_stream != NULL) { @@ -736,6 +742,9 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) { t->parsing.incoming_stream = NULL; grpc_chttp2_parsing_become_skip_parser(&t->parsing); } + if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { + close_transport_locked(t); + } new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) + grpc_chttp2_stream_map_size(&t->new_stream_map); diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 579bcc943f..1429737721 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -91,7 +91,9 @@ typedef struct grpc_transport_op { grpc_connectivity_state *connectivity_state; /** should the transport be disconnected */ int disconnect; - /** should we send a goaway? */ + /** should we send a goaway? + after a goaway is sent, once there are no more active calls on + the transport, the transport should disconnect */ int send_goaway; /** what should the goaway contain? */ grpc_status_code goaway_status; -- cgit v1.2.3