diff options
Diffstat (limited to 'src/core/channel')
-rw-r--r-- | src/core/channel/channel_stack.c | 17 | ||||
-rw-r--r-- | src/core/channel/channel_stack.h | 41 | ||||
-rw-r--r-- | src/core/channel/client_channel.c | 152 | ||||
-rw-r--r-- | src/core/channel/client_channel.h | 12 | ||||
-rw-r--r-- | src/core/channel/client_uchannel.c | 82 | ||||
-rw-r--r-- | src/core/channel/client_uchannel.h | 16 | ||||
-rw-r--r-- | src/core/channel/compress_filter.c | 3 | ||||
-rw-r--r-- | src/core/channel/connected_channel.c | 6 | ||||
-rw-r--r-- | src/core/channel/http_server_filter.c | 3 | ||||
-rw-r--r-- | src/core/channel/subchannel_call_holder.c | 80 | ||||
-rw-r--r-- | src/core/channel/subchannel_call_holder.h | 14 |
11 files changed, 187 insertions, 239 deletions
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 7f7fbf420f..5e09a050ee 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -101,11 +101,12 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack, return CALL_ELEMS_FROM_STACK(call_stack) + index; } -void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, +void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, + grpc_iomgr_cb_func destroy, void *destroy_arg, const grpc_channel_filter **filters, - size_t filter_count, grpc_channel *master, + size_t filter_count, const grpc_channel_args *channel_args, - grpc_channel_stack *stack) { + const char *name, grpc_channel_stack *stack) { size_t call_size = ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) + ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element)); @@ -115,6 +116,8 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, size_t i; stack->count = filter_count; + GRPC_STREAM_REF_INIT(&stack->refcount, initial_refs, destroy, destroy_arg, + name); elems = CHANNEL_ELEMS_FROM_STACK(stack); user_data = ((char *)elems) + @@ -122,7 +125,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, /* init per-filter data */ for (i = 0; i < filter_count; i++) { - args.master = master; + args.channel_stack = stack; args.channel_args = channel_args; args.is_first = i == 0; args.is_last = i == (filter_count - 1); @@ -166,15 +169,15 @@ void grpc_call_stack_init(grpc_exec_ctx *exec_ctx, size_t i; call_stack->count = count; - gpr_ref_init(&call_stack->refcount.refs, initial_refs); - grpc_closure_init(&call_stack->refcount.destroy, destroy, destroy_arg); + GRPC_STREAM_REF_INIT(&call_stack->refcount, initial_refs, destroy, + destroy_arg, "CALL_STACK"); call_elems = CALL_ELEMS_FROM_STACK(call_stack); user_data = ((char *)call_elems) + ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element)); /* init per-filter data */ for (i = 0; i < count; i++) { - args.refcount = &call_stack->refcount; + args.call_stack = call_stack; args.server_transport_data = transport_server_data; args.context = context; call_elems[i].filter = channel_elems[i].filter; diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 1db12ead7e..c01050e717 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -51,15 +51,18 @@ typedef struct grpc_channel_element grpc_channel_element; typedef struct grpc_call_element grpc_call_element; +typedef struct grpc_channel_stack grpc_channel_stack; +typedef struct grpc_call_stack grpc_call_stack; + typedef struct { - grpc_channel *master; + grpc_channel_stack *channel_stack; const grpc_channel_args *channel_args; int is_first; int is_last; } grpc_channel_element_args; typedef struct { - grpc_stream_refcount *refcount; + grpc_call_stack *call_stack; const void *server_transport_data; grpc_call_context_element *context; } grpc_call_element_args; @@ -144,23 +147,24 @@ struct grpc_call_element { /* A channel stack tracks a set of related filters for one channel, and guarantees they live within a single malloc() allocation */ -typedef struct { +struct grpc_channel_stack { + grpc_stream_refcount refcount; size_t count; /* Memory required for a call stack (computed at channel stack initialization) */ size_t call_stack_size; -} grpc_channel_stack; +}; /* A call stack tracks a set of related filters for one call, and guarantees they live within a single malloc() allocation */ -typedef struct { +struct grpc_call_stack { /* shared refcount for this channel stack. MUST be the first element: the underlying code calls destroy with the address of the refcount, but higher layers prefer to think about the address of the call stack itself. */ grpc_stream_refcount refcount; size_t count; -} grpc_call_stack; +}; /* Get a channel element given a channel stack and its index */ grpc_channel_element *grpc_channel_stack_element(grpc_channel_stack *stack, @@ -175,11 +179,11 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *stack, size_t i); size_t grpc_channel_stack_size(const grpc_channel_filter **filters, size_t filter_count); /* Initialize a channel stack given some filters */ -void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, +void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, + grpc_iomgr_cb_func destroy, void *destroy_arg, const grpc_channel_filter **filters, - size_t filter_count, grpc_channel *master, - const grpc_channel_args *args, - grpc_channel_stack *stack); + size_t filter_count, const grpc_channel_args *args, + const char *name, grpc_channel_stack *stack); /* Destroy a channel stack */ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_channel_stack *stack); @@ -199,14 +203,23 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); #ifdef GRPC_STREAM_REFCOUNT_DEBUG -#define grpc_call_stack_ref(call_stack, reason) \ +#define GRPC_CALL_STACK_REF(call_stack, reason) \ grpc_stream_ref(&(call_stack)->refcount, reason) -#define grpc_call_stack_unref(exec_ctx, call_stack, reason) \ +#define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \ grpc_stream_unref(exec_ctx, &(call_stack)->refcount, reason) +#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \ + grpc_stream_ref(&(channel_stack)->refcount, reason) +#define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \ + grpc_stream_unref(exec_ctx, &(channel_stack)->refcount, reason) #else -#define grpc_call_stack_ref(call_stack) grpc_stream_ref(&(call_stack)->refcount) -#define grpc_call_stack_unref(exec_ctx, call_stack) \ +#define GRPC_CALL_STACK_REF(call_stack, reason) \ + grpc_stream_ref(&(call_stack)->refcount) +#define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \ grpc_stream_unref(exec_ctx, &(call_stack)->refcount) +#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \ + grpc_stream_ref(&(channel_stack)->refcount) +#define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \ + grpc_stream_unref(exec_ctx, &(channel_stack)->refcount) #endif /* Destroy a call stack */ diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 020138bf15..9f993b39d6 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -59,11 +59,6 @@ typedef struct client_channel_channel_data { grpc_resolver *resolver; /** have we started resolving this channel */ int started_resolving; - /** master channel - the grpc_channel instance that ultimately owns - this channel_data via its channel stack. - We occasionally use this to bump the refcount on the master channel - to keep ourselves alive through an asynchronous operation. */ - grpc_channel *master; /** mutex protecting client configuration, including all variables below in this data structure */ @@ -81,8 +76,10 @@ typedef struct client_channel_channel_data { grpc_connectivity_state_tracker state_tracker; /** when an lb_policy arrives, should we try to exit idle */ int exit_idle_when_lb_policy_arrives; - /** pollset_set of interested parties in a new connection */ - grpc_pollset_set pollset_set; + /** owning stack */ + grpc_channel_stack *owning_stack; + /** interested parties */ + grpc_pollset_set interested_parties; } channel_data; /** We create one watcher for each new lb_policy that is returned from a @@ -103,9 +100,7 @@ typedef struct { } waiting_call; static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - channel_data *chand = elem->channel_data; - return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data, - chand->master); + return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data); } static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, @@ -121,10 +116,18 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, static void on_lb_policy_state_changed_locked( grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) { + grpc_connectivity_state publish_state = w->state; /* check if the notification is for a stale policy */ if (w->lb_policy != w->chand->lb_policy) return; - grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, w->state, + if (publish_state == GRPC_CHANNEL_FATAL_FAILURE && + w->chand->resolver != NULL) { + publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; + grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); + GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel"); + w->chand->lb_policy = NULL; + } + grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, publish_state, "lb_changed"); if (w->state != GRPC_CHANNEL_FATAL_FAILURE) { watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); @@ -139,7 +142,7 @@ static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg, on_lb_policy_state_changed_locked(exec_ctx, w); gpr_mu_unlock(&w->chand->mu_config); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->chand->master, "watch_lb_policy"); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy"); gpr_free(w); } @@ -147,7 +150,7 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state) { lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); - GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); w->chand = chand; grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w); @@ -179,6 +182,11 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->incoming_configuration = NULL; + if (lb_policy != NULL) { + grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties, + &chand->interested_parties); + } + gpr_mu_lock(&chand->mu_config); old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; @@ -200,7 +208,7 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, watch_lb_policy(exec_ctx, chand, lb_policy, state); } gpr_mu_unlock(&chand->mu_config); - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed); GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel-next"); @@ -222,7 +230,9 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, } if (old_lb_policy != NULL) { - grpc_lb_policy_shutdown(exec_ctx, old_lb_policy); + grpc_pollset_set_del_pollset_set(exec_ctx, + &old_lb_policy->interested_parties, + &chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); } @@ -230,13 +240,12 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change"); } - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->master, "resolver"); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver"); } static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_transport_op *op) { - grpc_lb_policy *lb_policy = NULL; channel_data *chand = elem->channel_data; grpc_resolver *destroy_resolver = NULL; @@ -254,18 +263,15 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, op->connectivity_state = NULL; } - lb_policy = chand->lb_policy; - if (lb_policy) { - GRPC_LB_POLICY_REF(lb_policy, "broadcast"); - } - if (op->disconnect && chand->resolver != NULL) { grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); destroy_resolver = chand->resolver; chand->resolver = NULL; if (chand->lb_policy != NULL) { - grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy); + grpc_pollset_set_del_pollset_set(exec_ctx, + &chand->lb_policy->interested_parties, + &chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); chand->lb_policy = NULL; } @@ -276,16 +282,11 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_resolver_shutdown(exec_ctx, destroy_resolver); GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel"); } - - if (lb_policy) { - grpc_lb_policy_broadcast(exec_ctx, lb_policy, op); - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast"); - } } typedef struct { grpc_metadata_batch *initial_metadata; - grpc_subchannel **subchannel; + grpc_connected_subchannel **connected_subchannel; grpc_closure *on_ready; grpc_call_element *elem; grpc_closure closure; @@ -293,17 +294,17 @@ typedef struct { static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata, - grpc_subchannel **subchannel, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready); static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) { continue_picking_args *cpa = arg; if (!success) { grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0); - } else if (cpa->subchannel == NULL) { + } else if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, - cpa->subchannel, cpa->on_ready)) { + cpa->connected_subchannel, cpa->on_ready)) { grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 1); } gpr_free(cpa); @@ -311,7 +312,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) { static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, grpc_metadata_batch *initial_metadata, - grpc_subchannel **subchannel, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready) { grpc_call_element *elem = elemp; channel_data *chand = elem->channel_data; @@ -319,18 +320,19 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, continue_picking_args *cpa; grpc_closure *closure; - GPR_ASSERT(subchannel); + GPR_ASSERT(connected_subchannel); gpr_mu_lock(&chand->mu_config); if (initial_metadata == NULL) { if (chand->lb_policy != NULL) { - grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, subchannel); + grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, + connected_subchannel); } for (closure = chand->waiting_for_config_closures.head; closure != NULL; closure = grpc_closure_next(closure)) { cpa = closure->cb_arg; - if (cpa->subchannel == subchannel) { - cpa->subchannel = NULL; + if (cpa->connected_subchannel == connected_subchannel) { + cpa->connected_subchannel = NULL; grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0); } } @@ -338,21 +340,22 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, return 1; } if (chand->lb_policy != NULL) { - int r = grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset, - initial_metadata, subchannel, on_ready); + int r = + grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset, + initial_metadata, connected_subchannel, on_ready); gpr_mu_unlock(&chand->mu_config); return r; } if (chand->resolver != NULL && !chand->started_resolving) { chand->started_resolving = 1; - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next(exec_ctx, chand->resolver, &chand->incoming_configuration, &chand->on_config_changed); } cpa = gpr_malloc(sizeof(*cpa)); cpa->initial_metadata = initial_metadata; - cpa->subchannel = subchannel; + cpa->connected_subchannel = connected_subchannel; cpa->on_ready = on_ready; cpa->elem = elem; grpc_closure_init(&cpa->closure, continue_picking, cpa); @@ -364,7 +367,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, /* Constructor for call_data */ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { - grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem); + grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem, + args->call_stack); } /* Destructor for call_data */ @@ -385,12 +389,12 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(elem->filter == &grpc_client_channel_filter); gpr_mu_init(&chand->mu_config); - chand->master = args->master; - grpc_pollset_set_init(&chand->pollset_set); grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); + chand->owning_stack = args->channel_stack; grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); + grpc_pollset_set_init(&chand->interested_parties); } /* Destructor for channel_data */ @@ -403,10 +407,13 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); } if (chand->lb_policy != NULL) { + grpc_pollset_set_del_pollset_set(exec_ctx, + &chand->lb_policy->interested_parties, + &chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); - grpc_pollset_set_destroy(&chand->pollset_set); + grpc_pollset_set_destroy(&chand->interested_parties); gpr_mu_destroy(&chand->mu_config); } @@ -435,7 +442,7 @@ void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, if (!grpc_closure_list_empty(chand->waiting_for_config_closures) || chand->exit_idle_when_lb_policy_arrives) { chand->started_resolving = 1; - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed); } @@ -454,7 +461,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( } else { chand->exit_idle_when_lb_policy_arrives = 1; if (!chand->started_resolving && chand->resolver != NULL) { - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); chand->started_resolving = 1; grpc_resolver_next(exec_ctx, chand->resolver, &chand->incoming_configuration, @@ -466,32 +473,39 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( return out; } +typedef struct { + channel_data *chand; + grpc_pollset *pollset; + grpc_closure *on_complete; + grpc_closure my_closure; +} external_connectivity_watcher; + +static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { + external_connectivity_watcher *w = arg; + grpc_closure *follow_up = w->on_complete; + grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, + w->pollset); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, + "external_connectivity_watcher"); + gpr_free(w); + follow_up->cb(exec_ctx, follow_up->cb_arg, iomgr_success); +} + void grpc_client_channel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, grpc_connectivity_state *state, grpc_closure *on_complete) { channel_data *chand = elem->channel_data; + external_connectivity_watcher *w = gpr_malloc(sizeof(*w)); + w->chand = chand; + w->pollset = pollset; + w->on_complete = on_complete; + grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset); + grpc_closure_init(&w->my_closure, on_external_watch_complete, w); + GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, + "external_connectivity_watcher"); gpr_mu_lock(&chand->mu_config); grpc_connectivity_state_notify_on_state_change( - exec_ctx, &chand->state_tracker, state, on_complete); + exec_ctx, &chand->state_tracker, state, &w->my_closure); gpr_mu_unlock(&chand->mu_config); } - -grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( - grpc_channel_element *elem) { - channel_data *chand = elem->channel_data; - return &chand->pollset_set; -} - -void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_pollset *pollset) { - channel_data *chand = elem->channel_data; - grpc_pollset_set_add_pollset(exec_ctx, &chand->pollset_set, pollset); -} - -void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_pollset *pollset) { - channel_data *chand = elem->channel_data; - grpc_pollset_set_del_pollset(exec_ctx, &chand->pollset_set, pollset); -} diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h index 5103f07a43..d9bc4971f1 100644 --- a/src/core/channel/client_channel.h +++ b/src/core/channel/client_channel.h @@ -57,17 +57,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect); void grpc_client_channel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, grpc_connectivity_state *state, grpc_closure *on_complete); -grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( - grpc_channel_element *elem); - -void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *channel, - grpc_pollset *pollset); -void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *channel, - grpc_pollset *pollset); - #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */ diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index 456ffb7371..2c0b07d8bf 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -58,13 +58,13 @@ typedef struct client_uchannel_channel_data { this channel_data via its channel stack. We occasionally use this to bump the refcount on the master channel to keep ourselves alive through an asynchronous operation. */ - grpc_channel *master; + grpc_channel_stack *owning_stack; /** connectivity state being tracked */ grpc_connectivity_state_tracker state_tracker; /** the subchannel wrapped by the microchannel */ - grpc_subchannel *subchannel; + grpc_connected_subchannel *connected_subchannel; /** the callback used to stay subscribed to subchannel connectivity * notifications */ @@ -84,15 +84,13 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, chand->subchannel_connectivity, "uchannel_monitor_subchannel"); - grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel, - &chand->subchannel_connectivity, - &chand->connectivity_cb); + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, chand->connected_subchannel, NULL, + &chand->subchannel_connectivity, &chand->connectivity_cb); } static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - channel_data *chand = elem->channel_data; - return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data, - chand->master); + return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data); } static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, @@ -128,11 +126,11 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx, static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata, - grpc_subchannel **subchannel, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready) { channel_data *chand = arg; GPR_ASSERT(initial_metadata != NULL); - *subchannel = chand->subchannel; + *connected_subchannel = chand->connected_subchannel; return 1; } @@ -140,7 +138,7 @@ static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { grpc_subchannel_call_holder_init(elem->call_data, cuc_pick_subchannel, - elem->channel_data); + elem->channel_data, args->call_stack); } /* Destructor for call_data */ @@ -158,7 +156,7 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand); GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); - chand->master = args->master; + chand->owning_stack = args->channel_stack; grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_uchannel"); gpr_mu_init(&chand->mu_state); @@ -168,10 +166,14 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx, static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *chand = elem->channel_data; - grpc_subchannel_state_change_unsubscribe(exec_ctx, chand->subchannel, - &chand->connectivity_cb); + /* cancel subscription */ + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, chand->connected_subchannel, NULL, NULL, + &chand->connectivity_cb); grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); gpr_mu_destroy(&chand->mu_state); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, chand->connected_subchannel, + "uchannel"); } static void cuc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, @@ -191,23 +193,14 @@ grpc_connectivity_state grpc_client_uchannel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) { channel_data *chand = elem->channel_data; grpc_connectivity_state out; - out = grpc_connectivity_state_check(&chand->state_tracker); gpr_mu_lock(&chand->mu_state); - if (out == GRPC_CHANNEL_IDLE && try_to_connect) { - grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, - GRPC_CHANNEL_CONNECTING, - "uchannel_connecting_changed"); - chand->subchannel_connectivity = out; - grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel, - &chand->subchannel_connectivity, - &chand->connectivity_cb); - } + out = grpc_connectivity_state_check(&chand->state_tracker); gpr_mu_unlock(&chand->mu_state); return out; } void grpc_client_uchannel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, grpc_connectivity_state *state, grpc_closure *on_complete) { channel_data *chand = elem->channel_data; gpr_mu_lock(&chand->mu_state); @@ -216,40 +209,11 @@ void grpc_client_uchannel_watch_connectivity_state( gpr_mu_unlock(&chand->mu_state); } -grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set( - grpc_channel_element *elem) { - channel_data *chand = elem->channel_data; - grpc_channel_element *parent_elem; - gpr_mu_lock(&chand->mu_state); - parent_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack( - grpc_subchannel_get_master(chand->subchannel))); - gpr_mu_unlock(&chand->mu_state); - return grpc_client_channel_get_connecting_pollset_set(parent_elem); -} - -void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_pollset *pollset) { - grpc_pollset_set *master_pollset_set = - grpc_client_uchannel_get_connecting_pollset_set(elem); - grpc_pollset_set_add_pollset(exec_ctx, master_pollset_set, pollset); -} - -void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_pollset *pollset) { - grpc_pollset_set *master_pollset_set = - grpc_client_uchannel_get_connecting_pollset_set(elem); - grpc_pollset_set_del_pollset(exec_ctx, master_pollset_set, pollset); -} - grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, grpc_channel_args *args) { grpc_channel *channel = NULL; #define MAX_FILTERS 3 const grpc_channel_filter *filters[MAX_FILTERS]; - grpc_channel *master = grpc_subchannel_get_master(subchannel); - char *target = grpc_channel_get_target(master); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; size_t n = 0; @@ -261,19 +225,19 @@ grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, GPR_ASSERT(n <= MAX_FILTERS); channel = - grpc_channel_create_from_filters(&exec_ctx, target, filters, n, args, 1); + grpc_channel_create_from_filters(&exec_ctx, NULL, filters, n, args, 1); - gpr_free(target); return channel; } -void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel, - grpc_subchannel *subchannel) { +void grpc_client_uchannel_set_connected_subchannel( + grpc_channel *uchannel, grpc_connected_subchannel *connected_subchannel) { grpc_channel_element *elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(uchannel)); channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); gpr_mu_lock(&chand->mu_state); - chand->subchannel = subchannel; + chand->connected_subchannel = connected_subchannel; + GRPC_CONNECTED_SUBCHANNEL_REF(connected_subchannel, "uchannel"); gpr_mu_unlock(&chand->mu_state); } diff --git a/src/core/channel/client_uchannel.h b/src/core/channel/client_uchannel.h index dfe6695ae3..92a831493c 100644 --- a/src/core/channel/client_uchannel.h +++ b/src/core/channel/client_uchannel.h @@ -48,23 +48,13 @@ grpc_connectivity_state grpc_client_uchannel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect); void grpc_client_uchannel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, grpc_connectivity_state *state, grpc_closure *on_complete); -grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set( - grpc_channel_element *elem); - -void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *channel, - grpc_pollset *pollset); -void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *channel, - grpc_pollset *pollset); - grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, grpc_channel_args *args); -void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel, - grpc_subchannel *subchannel); +void grpc_client_uchannel_set_connected_subchannel( + grpc_channel *uchannel, grpc_connected_subchannel *connected_subchannel); #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_MICROCHANNEL_H */ diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index d7d1c189fe..cc8e191628 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -288,8 +288,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, /* Destructor for channel data */ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem) { -} + grpc_channel_element *elem) {} const grpc_channel_filter grpc_compress_filter = { compress_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data), diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 0e1efd965a..e8eb9dcfc5 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -89,9 +89,9 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, int r; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - r = grpc_transport_init_stream(exec_ctx, chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), - args->refcount, args->server_transport_data); + r = grpc_transport_init_stream( + exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), + &args->call_stack->refcount, args->server_transport_data); GPR_ASSERT(r == 0); } diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index bc3a56cbf0..ae8660da92 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -224,8 +224,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, /* Destructor for channel data */ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem) { -} + grpc_channel_element *elem) {} const grpc_channel_filter grpc_http_server_filter = { hs_start_transport_op, grpc_channel_next_op, sizeof(call_data), diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index 7251714519..f5da41f3cd 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -44,7 +44,6 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder, int success); -static void call_ready(grpc_exec_ctx *exec_ctx, void *holder, int success); static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args, int success); @@ -58,16 +57,17 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, void grpc_subchannel_call_holder_init( grpc_subchannel_call_holder *holder, grpc_subchannel_call_holder_pick_subchannel pick_subchannel, - void *pick_subchannel_arg) { + void *pick_subchannel_arg, grpc_call_stack *owning_call) { gpr_atm_rel_store(&holder->subchannel_call, 0); holder->pick_subchannel = pick_subchannel; holder->pick_subchannel_arg = pick_subchannel_arg; gpr_mu_init(&holder->mu); - holder->subchannel = NULL; + holder->connected_subchannel = NULL; holder->waiting_ops = NULL; holder->waiting_ops_count = 0; holder->waiting_ops_capacity = 0; holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + holder->owning_call = owning_call; } void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx, @@ -125,13 +125,9 @@ retry: case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: fail_locked(exec_ctx, holder); break; - case GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL: - grpc_subchannel_cancel_create_call(exec_ctx, holder->subchannel, - &holder->subchannel_call); - break; case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL, - &holder->subchannel, NULL); + &holder->connected_subchannel, NULL); break; } gpr_mu_unlock(&holder->mu); @@ -142,28 +138,27 @@ retry: } /* if we don't have a subchannel, try to get one */ if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && - holder->subchannel == NULL && op->send_initial_metadata != NULL) { + holder->connected_subchannel == NULL && + op->send_initial_metadata != NULL) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; grpc_closure_init(&holder->next_step, subchannel_ready, holder); - if (holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, - op->send_initial_metadata, &holder->subchannel, - &holder->next_step)) { + GRPC_CALL_STACK_REF(holder->owning_call, "pick_subchannel"); + if (holder->pick_subchannel( + exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata, + &holder->connected_subchannel, &holder->next_step)) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel"); } } /* if we've got a subchannel, then let's ask it to create a call */ if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && - holder->subchannel != NULL) { - holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL; - grpc_closure_init(&holder->next_step, call_ready, holder); - if (grpc_subchannel_create_call(exec_ctx, holder->subchannel, - holder->pollset, &holder->subchannel_call, - &holder->next_step)) { - /* got one immediately - continue the op (and any waiting ops) */ - holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; - retry_waiting_locked(exec_ctx, holder); - goto retry; - } + holder->connected_subchannel != NULL) { + gpr_atm_rel_store( + &holder->subchannel_call, + (gpr_atm)(gpr_uintptr)grpc_connected_subchannel_create_call( + exec_ctx, holder->connected_subchannel, holder->pollset)); + retry_waiting_locked(exec_ctx, holder); + goto retry; } /* nothing to be done but wait */ add_waiting_locked(holder, op); @@ -179,36 +174,18 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) { GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); call = GET_CALL(holder); GPR_ASSERT(call == NULL || call == CANCELLED_CALL); - if (holder->subchannel == NULL) { - holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + if (holder->connected_subchannel == NULL) { fail_locked(exec_ctx, holder); } else { - grpc_closure_init(&holder->next_step, call_ready, holder); - if (grpc_subchannel_create_call(exec_ctx, holder->subchannel, - holder->pollset, &holder->subchannel_call, - &holder->next_step)) { - holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; - /* got one immediately - continue the op (and any waiting ops) */ - retry_waiting_locked(exec_ctx, holder); - } - } - gpr_mu_unlock(&holder->mu); -} - -static void call_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) { - grpc_subchannel_call_holder *holder = arg; - GPR_TIMER_BEGIN("call_ready", 0); - gpr_mu_lock(&holder->mu); - GPR_ASSERT(holder->creation_phase == - GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL); - holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; - if (GET_CALL(holder) != NULL) { + gpr_atm_rel_store( + &holder->subchannel_call, + (gpr_atm)(gpr_uintptr)grpc_connected_subchannel_create_call( + exec_ctx, holder->connected_subchannel, holder->pollset)); retry_waiting_locked(exec_ctx, holder); - } else { - fail_locked(exec_ctx, holder); } gpr_mu_unlock(&holder->mu); - GPR_TIMER_END("call_ready", 0); + GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel"); } typedef struct { @@ -270,14 +247,13 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, holder->waiting_ops_count = 0; } -char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx, - grpc_subchannel_call_holder *holder, - grpc_channel *master) { +char *grpc_subchannel_call_holder_get_peer( + grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder) { grpc_subchannel_call *subchannel_call = GET_CALL(holder); if (subchannel_call) { return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); } else { - return grpc_channel_get_target(master); + return NULL; } } diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h index bda051c566..9cf72c6cf7 100644 --- a/src/core/channel/subchannel_call_holder.h +++ b/src/core/channel/subchannel_call_holder.h @@ -42,12 +42,11 @@ called when the subchannel is available) */ typedef int (*grpc_subchannel_call_holder_pick_subchannel)( grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata, - grpc_subchannel **subchannel, grpc_closure *on_ready); + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready); typedef enum { GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING, - GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL, - GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL + GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL } grpc_subchannel_call_holder_creation_phase; /** Wrapper for holding a pointer to grpc_subchannel_call, and the @@ -71,7 +70,7 @@ typedef struct grpc_subchannel_call_holder { gpr_mu mu; grpc_subchannel_call_holder_creation_phase creation_phase; - grpc_subchannel *subchannel; + grpc_connected_subchannel *connected_subchannel; grpc_pollset *pollset; grpc_transport_stream_op *waiting_ops; @@ -79,12 +78,14 @@ typedef struct grpc_subchannel_call_holder { size_t waiting_ops_capacity; grpc_closure next_step; + + grpc_call_stack *owning_call; } grpc_subchannel_call_holder; void grpc_subchannel_call_holder_init( grpc_subchannel_call_holder *holder, grpc_subchannel_call_holder_pick_subchannel pick_subchannel, - void *pick_subchannel_arg); + void *pick_subchannel_arg, grpc_call_stack *owning_call); void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder); @@ -92,7 +93,6 @@ void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder, grpc_transport_stream_op *op); char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx, - grpc_subchannel_call_holder *holder, - grpc_channel *master); + grpc_subchannel_call_holder *holder); #endif |