From b5585d4f723003e4c900f02c2f4ce25e6fb26d5e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 17 Nov 2015 07:18:31 -0800 Subject: Initial pass through to make subchannels single connect --- src/core/channel/subchannel_call_holder.h | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'src/core/channel/subchannel_call_holder.h') diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h index bda051c566..6328f35344 100644 --- a/src/core/channel/subchannel_call_holder.h +++ b/src/core/channel/subchannel_call_holder.h @@ -42,12 +42,11 @@ called when the subchannel is available) */ typedef int (*grpc_subchannel_call_holder_pick_subchannel)( grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata, - grpc_subchannel **subchannel, grpc_closure *on_ready); + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready); typedef enum { GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING, - GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL, - GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL + GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL } grpc_subchannel_call_holder_creation_phase; /** Wrapper for holding a pointer to grpc_subchannel_call, and the @@ -71,7 +70,7 @@ typedef struct grpc_subchannel_call_holder { gpr_mu mu; grpc_subchannel_call_holder_creation_phase creation_phase; - grpc_subchannel *subchannel; + grpc_connected_subchannel *connected_subchannel; grpc_pollset *pollset; grpc_transport_stream_op *waiting_ops; -- cgit v1.2.3 From 906e3bcfb5a32a25af2f00dd4679052f8a608b3f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 24 Nov 2015 07:31:31 -0800 Subject: Stripping out master channel as a concept --- src/core/channel/channel_stack.c | 11 ++++-- src/core/channel/channel_stack.h | 36 +++++++++++------ src/core/channel/client_channel.c | 52 ++++++------------------- src/core/channel/client_channel.h | 12 +----- src/core/channel/client_uchannel.c | 35 ++--------------- src/core/channel/client_uchannel.h | 10 ----- src/core/channel/connected_channel.c | 2 +- src/core/channel/subchannel_call_holder.c | 5 +-- src/core/channel/subchannel_call_holder.h | 3 +- src/core/client_config/subchannel.c | 64 ++++++++----------------------- src/core/client_config/subchannel.h | 5 --- src/core/surface/channel.c | 1 - src/core/surface/secure_channel_create.c | 1 - test/core/end2end/fixtures/h2_uchannel.c | 1 - 14 files changed, 67 insertions(+), 171 deletions(-) (limited to 'src/core/channel/subchannel_call_holder.h') diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 7f7fbf420f..53433a0923 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -101,9 +101,10 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack, return CALL_ELEMS_FROM_STACK(call_stack) + index; } -void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, +void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,int initial_refs, + grpc_iomgr_cb_func destroy, void *destroy_arg, const grpc_channel_filter **filters, - size_t filter_count, grpc_channel *master, + size_t filter_count, const grpc_channel_args *channel_args, grpc_channel_stack *stack) { size_t call_size = @@ -115,6 +116,8 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, size_t i; stack->count = filter_count; + gpr_ref_init(&stack->refcount.refs, initial_refs); + grpc_closure_init(&stack->refcount.destroy, destroy, destroy_arg); elems = CHANNEL_ELEMS_FROM_STACK(stack); user_data = ((char *)elems) + @@ -122,7 +125,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, /* init per-filter data */ for (i = 0; i < filter_count; i++) { - args.master = master; + args.channel_stack = stack; args.channel_args = channel_args; args.is_first = i == 0; args.is_last = i == (filter_count - 1); @@ -174,7 +177,7 @@ void grpc_call_stack_init(grpc_exec_ctx *exec_ctx, /* init per-filter data */ for (i = 0; i < count; i++) { - args.refcount = &call_stack->refcount; + args.call_stack = call_stack; args.server_transport_data = transport_server_data; args.context = context; call_elems[i].filter = channel_elems[i].filter; diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 1db12ead7e..593adcd7b5 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -51,15 +51,18 @@ typedef struct grpc_channel_element grpc_channel_element; typedef struct grpc_call_element grpc_call_element; +typedef struct grpc_channel_stack grpc_channel_stack; +typedef struct grpc_call_stack grpc_call_stack; + typedef struct { - grpc_channel *master; + grpc_channel_stack *channel_stack; const grpc_channel_args *channel_args; int is_first; int is_last; } grpc_channel_element_args; typedef struct { - grpc_stream_refcount *refcount; + grpc_call_stack *call_stack; const void *server_transport_data; grpc_call_context_element *context; } grpc_call_element_args; @@ -144,23 +147,24 @@ struct grpc_call_element { /* A channel stack tracks a set of related filters for one channel, and guarantees they live within a single malloc() allocation */ -typedef struct { +struct grpc_channel_stack { + grpc_stream_refcount refcount; size_t count; /* Memory required for a call stack (computed at channel stack initialization) */ size_t call_stack_size; -} grpc_channel_stack; +}; /* A call stack tracks a set of related filters for one call, and guarantees they live within a single malloc() allocation */ -typedef struct { +struct grpc_call_stack { /* shared refcount for this channel stack. MUST be the first element: the underlying code calls destroy with the address of the refcount, but higher layers prefer to think about the address of the call stack itself. */ grpc_stream_refcount refcount; size_t count; -} grpc_call_stack; +}; /* Get a channel element given a channel stack and its index */ grpc_channel_element *grpc_channel_stack_element(grpc_channel_stack *stack, @@ -175,9 +179,10 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *stack, size_t i); size_t grpc_channel_stack_size(const grpc_channel_filter **filters, size_t filter_count); /* Initialize a channel stack given some filters */ -void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, +void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,int initial_refs, + grpc_iomgr_cb_func destroy, void *destroy_arg, const grpc_channel_filter **filters, - size_t filter_count, grpc_channel *master, + size_t filter_count, const grpc_channel_args *args, grpc_channel_stack *stack); /* Destroy a channel stack */ @@ -199,14 +204,21 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); #ifdef GRPC_STREAM_REFCOUNT_DEBUG -#define grpc_call_stack_ref(call_stack, reason) \ +#define GRPC_CALL_STACK_REF(call_stack, reason) \ grpc_stream_ref(&(call_stack)->refcount, reason) -#define grpc_call_stack_unref(exec_ctx, call_stack, reason) \ +#define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \ grpc_stream_unref(exec_ctx, &(call_stack)->refcount, reason) +#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \ + grpc_stream_ref(&(channel_stack)->refcount, reason) +#define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \ + grpc_stream_unref(exec_ctx, &(channel_stack)->refcount, reason) #else -#define grpc_call_stack_ref(call_stack) grpc_stream_ref(&(call_stack)->refcount) -#define grpc_call_stack_unref(exec_ctx, call_stack) \ +#define GRPC_CALL_STACK_REF(call_stack, reason) grpc_stream_ref(&(call_stack)->refcount) +#define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \ grpc_stream_unref(exec_ctx, &(call_stack)->refcount) +#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) grpc_stream_ref(&(channel_stack)->refcount) +#define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \ + grpc_stream_unref(exec_ctx, &(channel_stack)->refcount) #endif /* Destroy a call stack */ diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index f026d32265..fe43c50018 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -59,11 +59,6 @@ typedef struct client_channel_channel_data { grpc_resolver *resolver; /** have we started resolving this channel */ int started_resolving; - /** master channel - the grpc_channel instance that ultimately owns - this channel_data via its channel stack. - We occasionally use this to bump the refcount on the master channel - to keep ourselves alive through an asynchronous operation. */ - grpc_channel *master; /** mutex protecting client configuration, including all variables below in this data structure */ @@ -81,8 +76,8 @@ typedef struct client_channel_channel_data { grpc_connectivity_state_tracker state_tracker; /** when an lb_policy arrives, should we try to exit idle */ int exit_idle_when_lb_policy_arrives; - /** pollset_set of interested parties in a new connection */ - grpc_pollset_set pollset_set; + /** owning stack */ + grpc_channel_stack *owning_stack; } channel_data; /** We create one watcher for each new lb_policy that is returned from a @@ -103,9 +98,7 @@ typedef struct { } waiting_call; static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - channel_data *chand = elem->channel_data; - return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data, - chand->master); + return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data); } static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, @@ -139,7 +132,7 @@ static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg, on_lb_policy_state_changed_locked(exec_ctx, w); gpr_mu_unlock(&w->chand->mu_config); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->chand->master, "watch_lb_policy"); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy"); gpr_free(w); } @@ -147,7 +140,7 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state) { lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); - GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); w->chand = chand; grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w); @@ -200,7 +193,7 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, watch_lb_policy(exec_ctx, chand, lb_policy, state); } gpr_mu_unlock(&chand->mu_config); - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed); GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel-next"); @@ -230,7 +223,7 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change"); } - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->master, "resolver"); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver"); } static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, @@ -347,7 +340,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, } if (chand->resolver != NULL && !chand->started_resolving) { chand->started_resolving = 1; - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next(exec_ctx, chand->resolver, &chand->incoming_configuration, &chand->on_config_changed); @@ -387,8 +380,6 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(elem->filter == &grpc_client_channel_filter); gpr_mu_init(&chand->mu_config); - chand->master = args->master; - grpc_pollset_set_init(&chand->pollset_set); grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, @@ -408,7 +399,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); - grpc_pollset_set_destroy(&chand->pollset_set); gpr_mu_destroy(&chand->mu_config); } @@ -437,7 +427,7 @@ void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, if (!grpc_closure_list_empty(chand->waiting_for_config_closures) || chand->exit_idle_when_lb_policy_arrives) { chand->started_resolving = 1; - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed); } @@ -456,7 +446,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( } else { chand->exit_idle_when_lb_policy_arrives = 1; if (!chand->started_resolving && chand->resolver != NULL) { - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); chand->started_resolving = 1; grpc_resolver_next(exec_ctx, chand->resolver, &chand->incoming_configuration, @@ -469,7 +459,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( } void grpc_client_channel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, grpc_connectivity_state *state, grpc_closure *on_complete) { channel_data *chand = elem->channel_data; gpr_mu_lock(&chand->mu_config); @@ -477,23 +467,3 @@ void grpc_client_channel_watch_connectivity_state( exec_ctx, &chand->state_tracker, state, on_complete); gpr_mu_unlock(&chand->mu_config); } - -grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( - grpc_channel_element *elem) { - channel_data *chand = elem->channel_data; - return &chand->pollset_set; -} - -void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_pollset *pollset) { - channel_data *chand = elem->channel_data; - grpc_pollset_set_add_pollset(exec_ctx, &chand->pollset_set, pollset); -} - -void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_pollset *pollset) { - channel_data *chand = elem->channel_data; - grpc_pollset_set_del_pollset(exec_ctx, &chand->pollset_set, pollset); -} diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h index 5103f07a43..d9bc4971f1 100644 --- a/src/core/channel/client_channel.h +++ b/src/core/channel/client_channel.h @@ -57,17 +57,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect); void grpc_client_channel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, grpc_connectivity_state *state, grpc_closure *on_complete); -grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( - grpc_channel_element *elem); - -void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *channel, - grpc_pollset *pollset); -void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *channel, - grpc_pollset *pollset); - #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */ diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index 926bbde838..b30d030197 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -58,7 +58,7 @@ typedef struct client_uchannel_channel_data { this channel_data via its channel stack. We occasionally use this to bump the refcount on the master channel to keep ourselves alive through an asynchronous operation. */ - grpc_channel *master; + grpc_channel_stack *owning_stack; /** connectivity state being tracked */ grpc_connectivity_state_tracker state_tracker; @@ -90,9 +90,7 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg, } static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - channel_data *chand = elem->channel_data; - return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data, - chand->master); + return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data); } static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, @@ -158,7 +156,7 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand); GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); - chand->master = args->master; + chand->owning_stack = args->channel_stack; grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_uchannel"); gpr_mu_init(&chand->mu_state); @@ -216,33 +214,6 @@ void grpc_client_uchannel_watch_connectivity_state( gpr_mu_unlock(&chand->mu_state); } -grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set( - grpc_channel_element *elem) { - channel_data *chand = elem->channel_data; - grpc_channel_element *parent_elem; - gpr_mu_lock(&chand->mu_state); - parent_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(chand->master)); - gpr_mu_unlock(&chand->mu_state); - return grpc_client_channel_get_connecting_pollset_set(parent_elem); -} - -void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_pollset *pollset) { - grpc_pollset_set *master_pollset_set = - grpc_client_uchannel_get_connecting_pollset_set(elem); - grpc_pollset_set_add_pollset(exec_ctx, master_pollset_set, pollset); -} - -void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_pollset *pollset) { - grpc_pollset_set *master_pollset_set = - grpc_client_uchannel_get_connecting_pollset_set(elem); - grpc_pollset_set_del_pollset(exec_ctx, master_pollset_set, pollset); -} - grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, grpc_channel_args *args) { grpc_channel *channel = NULL; diff --git a/src/core/channel/client_uchannel.h b/src/core/channel/client_uchannel.h index 120a3daf3d..a5cf271042 100644 --- a/src/core/channel/client_uchannel.h +++ b/src/core/channel/client_uchannel.h @@ -51,16 +51,6 @@ void grpc_client_uchannel_watch_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_connectivity_state *state, grpc_closure *on_complete); -grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set( - grpc_channel_element *elem); - -void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *channel, - grpc_pollset *pollset); -void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *channel, - grpc_pollset *pollset); - grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, grpc_channel_args *args); diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 0e1efd965a..73a7dcc81f 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -91,7 +91,7 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); r = grpc_transport_init_stream(exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), - args->refcount, args->server_transport_data); + &args->call_stack->refcount, args->server_transport_data); GPR_ASSERT(r == 0); } diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index 2e3d49e806..63069baa84 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -244,13 +244,12 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, } char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx, - grpc_subchannel_call_holder *holder, - grpc_channel *master) { + grpc_subchannel_call_holder *holder) { grpc_subchannel_call *subchannel_call = GET_CALL(holder); if (subchannel_call) { return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); } else { - return grpc_channel_get_target(master); + return NULL; } } diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h index 6328f35344..692c5e5316 100644 --- a/src/core/channel/subchannel_call_holder.h +++ b/src/core/channel/subchannel_call_holder.h @@ -91,7 +91,6 @@ void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder, grpc_transport_stream_op *op); char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx, - grpc_subchannel_call_holder *holder, - grpc_channel *master); + grpc_subchannel_call_holder *holder); #endif diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 07a74e250f..6cc2364eaa 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -56,11 +56,6 @@ ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load( \ &(subchannel)->connected_subchannel))) -struct grpc_connected_subchannel { - /** refcount */ - gpr_refcount refs; -}; - typedef struct { grpc_closure closure; union { @@ -84,11 +79,6 @@ struct grpc_subchannel { /** address to connect to */ struct sockaddr *addr; size_t addr_len; - /** master channel - the grpc_channel instance that ultimately owns - this channel_data via its channel stack. - We occasionally use this to bump the refcount on the master channel - to keep ourselves alive through an asynchronous operation. */ - grpc_channel *master; /** set during connection */ grpc_connect_out_args connecting_result; @@ -97,10 +87,8 @@ struct grpc_subchannel { grpc_closure connected; /** pollset_set tracking who's interested in a connection - being setup - owned by the master channel (in particular the - client_channel - filter there-in) */ - grpc_pollset_set *pollset_set; + being setup */ + grpc_pollset_set pollset_set; /** active connection, or null; of type grpc_connected_subchannel */ gpr_atm connected_subchannel; @@ -132,7 +120,7 @@ struct grpc_subchannel_call { }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) -#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1)) +#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)(con)) #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ (((grpc_subchannel_call *)(callstack)) - 1) @@ -151,6 +139,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, connection_unref_locked((cl), (p), __FILE__, __LINE__, (r)) #define REF_PASS_ARGS , file, line, reason #define REF_PASS_REASON , reason +#define REF_REASON reason #define REF_LOG(name, p) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \ (name), (p), (p)->refs.count, (p)->refs.count + 1, reason) @@ -164,6 +153,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, #define CONNECTION_UNREF_LOCKED(cl, p, r) connection_unref_locked((cl), (p)) #define REF_PASS_ARGS #define REF_PASS_REASON +#define REF_REASON "" #define REF_LOG(name, p) \ do { \ } while (0) @@ -185,18 +175,13 @@ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, void grpc_connected_subchannel_ref( grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - REF_LOG("CONNECTION", c); - gpr_ref(&c->refs); + GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); } void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - UNREF_LOG("CONNECTION", c); - if (gpr_unref(&c->refs)) { - grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(connection_destroy, c), - 1); - } + GRPC_CHANNEL_STACK_UNREF(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); } /* @@ -215,6 +200,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(c->addr); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); + grpc_pollset_set_destroy(&c->pollset_set); gpr_free(c); } @@ -235,13 +221,13 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset) { - grpc_pollset_set_add_pollset(exec_ctx, c->pollset_set, pollset); + grpc_pollset_set_add_pollset(exec_ctx, &c->pollset_set, pollset); } void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset) { - grpc_pollset_set_del_pollset(exec_ctx, c->pollset_set, pollset); + grpc_pollset_set_del_pollset(exec_ctx, &c->pollset_set, pollset); } static gpr_uint32 random_seed() { @@ -251,8 +237,6 @@ static gpr_uint32 random_seed() { grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_subchannel_args *args) { grpc_subchannel *c = gpr_malloc(sizeof(*c)); - grpc_channel_element *parent_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(args->master)); memset(c, 0, sizeof(*c)); gpr_ref_init(&c->refs, 1); c->connector = connector; @@ -263,10 +247,9 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, sizeof(grpc_channel_filter *) * c->num_filters); c->addr = gpr_malloc(args->addr_len); memcpy(c->addr, args->addr, args->addr_len); + grpc_pollset_set_init(&c->pollset_set); c->addr_len = args->addr_len; c->args = grpc_channel_args_copy(args->args); - c->master = args->master; - c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem); c->random = random_seed(); grpc_closure_init(&c->connected, subchannel_connected, c); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, @@ -278,7 +261,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connect_in_args args; - args.interested_parties = c->pollset_set; + args.interested_parties = &c->pollset_set; args.addr = c->addr; args.addr_len = c->addr_len; args.deadline = compute_connect_deadline(c); @@ -318,7 +301,6 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, c->connecting = 1; /* released by connection */ GRPC_SUBCHANNEL_REF(c, "connecting"); - GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); } gpr_mu_unlock(&c->mu); @@ -448,10 +430,9 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { /* construct channel stack */ channel_stack_size = grpc_channel_stack_size(filters, num_filters); - con = gpr_malloc(sizeof(grpc_connected_subchannel) + channel_stack_size); - stk = (grpc_channel_stack *)(con + 1); - gpr_ref_init(&con->refs, 1); - grpc_channel_stack_init(exec_ctx, filters, num_filters, c->master, c->args, + con = gpr_malloc(channel_stack_size); + stk = CHANNEL_STACK_FROM_CONNECTION(con); + grpc_channel_stack_init(exec_ctx, 1, connection_destroy, con, filters, num_filters, c->args, stk); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); gpr_free((void *)c->connecting_result.filters); @@ -471,7 +452,6 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { gpr_free(sw_subchannel); gpr_free((void *)filters); grpc_channel_stack_destroy(exec_ctx, stk); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); return; } @@ -495,7 +475,6 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { gpr_mu_unlock(&c->mu); gpr_free((void *)filters); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); } /* Generate a random number between 0 and 1. */ @@ -554,7 +533,6 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { update_reconnect_parameters(c); continue_connect(exec_ctx, c); } else { - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); } } @@ -605,21 +583,13 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, void grpc_subchannel_call_ref(grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { -#ifdef GRPC_STREAM_REFCOUNT_DEBUG - grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c), reason); -#else - grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c)); -#endif + GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); } void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { -#ifdef GRPC_STREAM_REFCOUNT_DEBUG - grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), reason); -#else - grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); -#endif + GRPC_CALL_STACK_UNREF(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); } char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 14eb4baa1f..b7db363866 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -161,15 +161,10 @@ struct grpc_subchannel_args { /** Address to connect to */ struct sockaddr *addr; size_t addr_len; - /** master channel */ - grpc_channel *master; }; /** create a subchannel given a connector */ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_subchannel_args *args); -/** Return the master channel associated with the subchannel */ -grpc_channel *grpc_subchannel_get_master(grpc_subchannel *subchannel); - #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_H */ diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 859197412b..1632b79d0e 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -63,7 +63,6 @@ typedef struct registered_call { struct grpc_channel { int is_client; - gpr_refcount refs; gpr_uint32 max_message_length; grpc_mdelem *default_authority; diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 471b5a71e7..344b4d79f7 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -207,7 +207,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel( gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); args->args = final_args; - args->master = f->master; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(exec_ctx, &c->base); grpc_channel_args_destroy(final_args); diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 3add8e8007..56d7d295fb 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -159,7 +159,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel( c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); args->args = final_args; - args->master = f->master; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(exec_ctx, &c->base); grpc_channel_args_destroy(final_args); -- cgit v1.2.3 From 11beb9a55c34fdac8398ce01381ca030c8d6223f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 24 Nov 2015 10:29:32 -0800 Subject: Fixes --- src/core/channel/client_channel.c | 4 +++- src/core/channel/client_uchannel.c | 2 +- src/core/channel/subchannel_call_holder.c | 8 ++++++-- src/core/channel/subchannel_call_holder.h | 4 +++- test/core/end2end/fixtures/h2_uchannel.c | 3 ++- 5 files changed, 15 insertions(+), 6 deletions(-) (limited to 'src/core/channel/subchannel_call_holder.h') diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index fe43c50018..955b390dbf 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -359,7 +359,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, /* Constructor for call_data */ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { - grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem); + grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem, + args->call_stack); } /* Destructor for call_data */ @@ -381,6 +382,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, gpr_mu_init(&chand->mu_config); grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); + chand->owning_stack = args->channel_stack; grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index ff50ebfb60..f8c621b8eb 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -138,7 +138,7 @@ static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { grpc_subchannel_call_holder_init(elem->call_data, cuc_pick_subchannel, - elem->channel_data); + elem->channel_data, args->call_stack); } /* Destructor for call_data */ diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index 9875dd8080..1455cc0a24 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -57,7 +57,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, void grpc_subchannel_call_holder_init( grpc_subchannel_call_holder *holder, grpc_subchannel_call_holder_pick_subchannel pick_subchannel, - void *pick_subchannel_arg) { + void *pick_subchannel_arg, grpc_call_stack *owning_call) { gpr_atm_rel_store(&holder->subchannel_call, 0); holder->pick_subchannel = pick_subchannel; holder->pick_subchannel_arg = pick_subchannel_arg; @@ -67,6 +67,7 @@ void grpc_subchannel_call_holder_init( holder->waiting_ops_count = 0; holder->waiting_ops_capacity = 0; holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + holder->owning_call = owning_call; } void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx, @@ -141,10 +142,12 @@ retry: op->send_initial_metadata != NULL) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; grpc_closure_init(&holder->next_step, subchannel_ready, holder); + GRPC_CALL_STACK_REF(holder->owning_call, "pick_subchannel"); if (holder->pick_subchannel( exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata, &holder->connected_subchannel, &holder->next_step)) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel"); } } /* if we've got a subchannel, then let's ask it to create a call */ @@ -171,8 +174,8 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) { GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); call = GET_CALL(holder); GPR_ASSERT(call == NULL || call == CANCELLED_CALL); + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; if (holder->connected_subchannel == NULL) { - holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; fail_locked(exec_ctx, holder); } else { gpr_atm_rel_store( @@ -182,6 +185,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) { retry_waiting_locked(exec_ctx, holder); } gpr_mu_unlock(&holder->mu); + GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel"); } typedef struct { diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h index 692c5e5316..9cf72c6cf7 100644 --- a/src/core/channel/subchannel_call_holder.h +++ b/src/core/channel/subchannel_call_holder.h @@ -78,12 +78,14 @@ typedef struct grpc_subchannel_call_holder { size_t waiting_ops_capacity; grpc_closure next_step; + + grpc_call_stack *owning_call; } grpc_subchannel_call_holder; void grpc_subchannel_call_holder_init( grpc_subchannel_call_holder *holder, grpc_subchannel_call_holder_pick_subchannel pick_subchannel, - void *pick_subchannel_arg); + void *pick_subchannel_arg, grpc_call_stack *owning_call); void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder); diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 56d7d295fb..6c1638590f 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -252,7 +252,8 @@ static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { gpr_mu_lock(GRPC_POLLSET_MU(&pollset)); while (g_state != GRPC_CHANNEL_READY) { grpc_pollset_worker worker; - grpc_pollset_work(&exec_ctx, &pollset, &worker, gpr_now(GPR_CLOCK_REALTIME), + grpc_pollset_work(&exec_ctx, &pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); gpr_mu_unlock(GRPC_POLLSET_MU(&pollset)); grpc_exec_ctx_flush(&exec_ctx); -- cgit v1.2.3