diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-11-17 07:18:31 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-11-17 07:18:31 -0800 |
commit | b5585d4f723003e4c900f02c2f4ce25e6fb26d5e (patch) | |
tree | ec65b2d8caa4b4f35c2d9b8e644542478e163125 /src/core | |
parent | 860f484f43841d14df61cfeda6ca5e637ed99d19 (diff) |
Initial pass through to make subchannels single connect
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/client_channel.c | 22 | ||||
-rw-r--r-- | src/core/channel/client_uchannel.c | 20 | ||||
-rw-r--r-- | src/core/channel/client_uchannel.h | 4 | ||||
-rw-r--r-- | src/core/channel/subchannel_call_holder.c | 54 | ||||
-rw-r--r-- | src/core/channel/subchannel_call_holder.h | 7 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 37 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/round_robin.c | 18 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.c | 4 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 8 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 481 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 51 | ||||
-rw-r--r-- | src/core/transport/transport.h | 2 |
12 files changed, 232 insertions, 476 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 16d91d4277..5517507423 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -287,7 +287,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, typedef struct { grpc_metadata_batch *initial_metadata; - grpc_subchannel **subchannel; + grpc_connected_subchannel **connected_subchannel; grpc_closure *on_ready; grpc_call_element *elem; grpc_closure closure; @@ -295,17 +295,17 @@ typedef struct { static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata, - grpc_subchannel **subchannel, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready); static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) { continue_picking_args *cpa = arg; if (!success) { grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0); - } else if (cpa->subchannel == NULL) { + } else if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, - cpa->subchannel, cpa->on_ready)) { + cpa->connected_subchannel, cpa->on_ready)) { grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 1); } gpr_free(cpa); @@ -313,7 +313,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) { static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, grpc_metadata_batch *initial_metadata, - grpc_subchannel **subchannel, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready) { grpc_call_element *elem = elemp; channel_data *chand = elem->channel_data; @@ -321,18 +321,18 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, continue_picking_args *cpa; grpc_closure *closure; - GPR_ASSERT(subchannel); + GPR_ASSERT(connected_subchannel); gpr_mu_lock(&chand->mu_config); if (initial_metadata == NULL) { if (chand->lb_policy != NULL) { - grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, subchannel); + grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, connected_subchannel); } for (closure = chand->waiting_for_config_closures.head; closure != NULL; closure = grpc_closure_next(closure)) { cpa = closure->cb_arg; - if (cpa->subchannel == subchannel) { - cpa->subchannel = NULL; + if (cpa->connected_subchannel == connected_subchannel) { + cpa->connected_subchannel = NULL; grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0); } } @@ -341,7 +341,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, } if (chand->lb_policy != NULL) { int r = grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset, - initial_metadata, subchannel, on_ready); + initial_metadata, connected_subchannel, on_ready); gpr_mu_unlock(&chand->mu_config); return r; } @@ -354,7 +354,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, } cpa = gpr_malloc(sizeof(*cpa)); cpa->initial_metadata = initial_metadata; - cpa->subchannel = subchannel; + cpa->connected_subchannel = connected_subchannel; cpa->on_ready = on_ready; cpa->elem = elem; grpc_closure_init(&cpa->closure, continue_picking, cpa); diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index ec6b02381a..b27284e8a7 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -67,7 +67,7 @@ typedef struct client_uchannel_channel_data { grpc_connectivity_state_tracker state_tracker; /** the subchannel wrapped by the microchannel */ - grpc_subchannel *subchannel; + grpc_connected_subchannel *connected_subchannel; /** the callback used to stay subscribed to subchannel connectivity * notifications */ @@ -87,7 +87,7 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, chand->subchannel_connectivity, "uchannel_monitor_subchannel"); - grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel, + grpc_connected_subchannel_notify_on_state_change(exec_ctx, chand->connected_subchannel, &chand->subchannel_connectivity, &chand->connectivity_cb); } @@ -131,11 +131,11 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx, static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata, - grpc_subchannel **subchannel, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready) { channel_data *chand = arg; GPR_ASSERT(initial_metadata != NULL); - *subchannel = chand->subchannel; + *connected_subchannel = chand->connected_subchannel; return 1; } @@ -172,7 +172,7 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx, static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *chand = elem->channel_data; - grpc_subchannel_state_change_unsubscribe(exec_ctx, chand->subchannel, + grpc_connected_subchannel_state_change_unsubscribe(exec_ctx, chand->connected_subchannel, &chand->connectivity_cb); grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); gpr_mu_destroy(&chand->mu_state); @@ -202,7 +202,7 @@ grpc_connectivity_state grpc_client_uchannel_check_connectivity_state( GRPC_CHANNEL_CONNECTING, "uchannel_connecting_changed"); chand->subchannel_connectivity = out; - grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel, + grpc_connected_subchannel_notify_on_state_change(exec_ctx, chand->connected_subchannel, &chand->subchannel_connectivity, &chand->connectivity_cb); } @@ -226,7 +226,7 @@ grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set( grpc_channel_element *parent_elem; gpr_mu_lock(&chand->mu_state); parent_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack( - grpc_subchannel_get_master(chand->subchannel))); + chand->master)); gpr_mu_unlock(&chand->mu_state); return grpc_client_channel_get_connecting_pollset_set(parent_elem); } @@ -273,13 +273,13 @@ grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, return channel; } -void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel, - grpc_subchannel *subchannel) { +void grpc_client_uchannel_set_connected_subchannel(grpc_channel *uchannel, + grpc_connected_subchannel *connected_subchannel) { grpc_channel_element *elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(uchannel)); channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); gpr_mu_lock(&chand->mu_state); - chand->subchannel = subchannel; + chand->connected_subchannel = connected_subchannel; gpr_mu_unlock(&chand->mu_state); } diff --git a/src/core/channel/client_uchannel.h b/src/core/channel/client_uchannel.h index dfe6695ae3..1acf9bfd69 100644 --- a/src/core/channel/client_uchannel.h +++ b/src/core/channel/client_uchannel.h @@ -64,7 +64,7 @@ void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx, grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, grpc_channel_args *args); -void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel, - grpc_subchannel *subchannel); +void grpc_client_uchannel_set_connected_subchannel(grpc_channel *uchannel, + grpc_connected_subchannel *connected_subchannel); #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_MICROCHANNEL_H */ diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index 7251714519..d1a7f86348 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -44,7 +44,6 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder, int success); -static void call_ready(grpc_exec_ctx *exec_ctx, void *holder, int success); static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args, int success); @@ -63,7 +62,7 @@ void grpc_subchannel_call_holder_init( holder->pick_subchannel = pick_subchannel; holder->pick_subchannel_arg = pick_subchannel_arg; gpr_mu_init(&holder->mu); - holder->subchannel = NULL; + holder->connected_subchannel = NULL; holder->waiting_ops = NULL; holder->waiting_ops_count = 0; holder->waiting_ops_capacity = 0; @@ -125,13 +124,9 @@ retry: case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: fail_locked(exec_ctx, holder); break; - case GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL: - grpc_subchannel_cancel_create_call(exec_ctx, holder->subchannel, - &holder->subchannel_call); - break; case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL, - &holder->subchannel, NULL); + &holder->connected_subchannel, NULL); break; } gpr_mu_unlock(&holder->mu); @@ -142,28 +137,21 @@ retry: } /* if we don't have a subchannel, try to get one */ if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && - holder->subchannel == NULL && op->send_initial_metadata != NULL) { + holder->connected_subchannel == NULL && op->send_initial_metadata != NULL) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; grpc_closure_init(&holder->next_step, subchannel_ready, holder); if (holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, - op->send_initial_metadata, &holder->subchannel, + op->send_initial_metadata, &holder->connected_subchannel, &holder->next_step)) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; } } /* if we've got a subchannel, then let's ask it to create a call */ if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && - holder->subchannel != NULL) { - holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL; - grpc_closure_init(&holder->next_step, call_ready, holder); - if (grpc_subchannel_create_call(exec_ctx, holder->subchannel, - holder->pollset, &holder->subchannel_call, - &holder->next_step)) { - /* got one immediately - continue the op (and any waiting ops) */ - holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; - retry_waiting_locked(exec_ctx, holder); - goto retry; - } + holder->connected_subchannel != NULL) { + gpr_atm_rel_store(&holder->subchannel_call, grpc_connected_subchannel_create_call(exec_ctx, holder->connected_subchannel, holder->pollset)); + retry_waiting_locked(exec_ctx, holder); + goto retry; } /* nothing to be done but wait */ add_waiting_locked(holder, op); @@ -179,36 +167,14 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) { GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); call = GET_CALL(holder); GPR_ASSERT(call == NULL || call == CANCELLED_CALL); - if (holder->subchannel == NULL) { + if (holder->connected_subchannel == NULL) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; fail_locked(exec_ctx, holder); } else { - grpc_closure_init(&holder->next_step, call_ready, holder); - if (grpc_subchannel_create_call(exec_ctx, holder->subchannel, - holder->pollset, &holder->subchannel_call, - &holder->next_step)) { - holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; - /* got one immediately - continue the op (and any waiting ops) */ - retry_waiting_locked(exec_ctx, holder); - } - } - gpr_mu_unlock(&holder->mu); -} - -static void call_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) { - grpc_subchannel_call_holder *holder = arg; - GPR_TIMER_BEGIN("call_ready", 0); - gpr_mu_lock(&holder->mu); - GPR_ASSERT(holder->creation_phase == - GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL); - holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; - if (GET_CALL(holder) != NULL) { + gpr_atm_rel_store(&holder->subchannel_call, grpc_connected_subchannel_create_call(exec_ctx, holder->connected_subchannel, holder->pollset)); retry_waiting_locked(exec_ctx, holder); - } else { - fail_locked(exec_ctx, holder); } gpr_mu_unlock(&holder->mu); - GPR_TIMER_END("call_ready", 0); } typedef struct { diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h index bda051c566..6328f35344 100644 --- a/src/core/channel/subchannel_call_holder.h +++ b/src/core/channel/subchannel_call_holder.h @@ -42,12 +42,11 @@ called when the subchannel is available) */ typedef int (*grpc_subchannel_call_holder_pick_subchannel)( grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata, - grpc_subchannel **subchannel, grpc_closure *on_ready); + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready); typedef enum { GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING, - GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL, - GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL + GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL } grpc_subchannel_call_holder_creation_phase; /** Wrapper for holding a pointer to grpc_subchannel_call, and the @@ -71,7 +70,7 @@ typedef struct grpc_subchannel_call_holder { gpr_mu mu; grpc_subchannel_call_holder_creation_phase creation_phase; - grpc_subchannel *subchannel; + grpc_connected_subchannel *connected_subchannel; grpc_pollset *pollset; grpc_transport_stream_op *waiting_ops; diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 93312abb00..6d9e6af4a6 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -42,7 +42,7 @@ typedef struct pending_pick { struct pending_pick *next; grpc_pollset *pollset; - grpc_subchannel **target; + grpc_connected_subchannel **target; grpc_closure *on_complete; } pending_pick; @@ -60,7 +60,7 @@ typedef struct { /** the selected channel TODO(ctiller): this should be atomically set so we don't need to take a mutex in the common case */ - grpc_subchannel *selected; + grpc_connected_subchannel *selected; /** have we started picking? */ int started_picking; /** are we shut down? */ @@ -102,7 +102,7 @@ void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first"); } if (p->selected) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first"); } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); gpr_free(p->subchannels); @@ -131,7 +131,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_subchannel **target) { + grpc_connected_subchannel **target) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -174,7 +174,7 @@ void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, grpc_subchannel **target, + grpc_metadata_batch *initial_metadata, grpc_connected_subchannel **target, grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; @@ -207,7 +207,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, grpc_transport_op op; size_t num_subchannels = p->num_subchannels; grpc_subchannel **subchannels; - grpc_subchannel *exclude_subchannel; + grpc_connected_subchannel *exclude_subchannel; gpr_mu_lock(&p->mu); subchannels = p->subchannels; @@ -218,7 +218,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels"); for (i = 0; i < num_subchannels; i++) { - if (subchannels[i] != exclude_subchannel) { + if (grpc_subchannel_get_connected_subchannel(subchannels[i]) != exclude_subchannel) { memset(&op, 0, sizeof(op)); op.disconnect = 1; grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op); @@ -232,6 +232,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { pick_first_lb_policy *p = arg; + grpc_subchannel *selected_subchannel; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -244,7 +245,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, p->checking_connectivity, "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { - grpc_subchannel_notify_on_state_change(exec_ctx, p->selected, + grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected, &p->checking_connectivity, &p->connectivity_changed); } else { @@ -256,8 +257,10 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, case GRPC_CHANNEL_READY: grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready"); - p->selected = p->subchannels[p->checking_subchannel]; - GRPC_SUBCHANNEL_REF(p->selected, "picked_first"); + selected_subchannel = p->subchannels[p->checking_subchannel]; + p->selected = grpc_subchannel_get_connected_subchannel(selected_subchannel); + GPR_ASSERT(p->selected); + GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked_first"); /* drop the pick list: we are connected now */ GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels"); grpc_exec_ctx_enqueue(exec_ctx, @@ -266,12 +269,12 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; - grpc_subchannel_del_interested_party(exec_ctx, p->selected, + grpc_subchannel_del_interested_party(exec_ctx, selected_subchannel, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } - grpc_subchannel_notify_on_state_change(exec_ctx, p->selected, + grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected, &p->checking_connectivity, &p->connectivity_changed); break; @@ -342,14 +345,14 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, size_t i; size_t n; grpc_subchannel **subchannels; - grpc_subchannel *selected; + grpc_connected_subchannel *selected; gpr_mu_lock(&p->mu); n = p->num_subchannels; subchannels = gpr_malloc(n * sizeof(*subchannels)); selected = p->selected; if (selected) { - GRPC_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected"); + GRPC_CONNECTED_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected"); } for (i = 0; i < n; i++) { subchannels[i] = p->subchannels[i]; @@ -358,13 +361,13 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_unlock(&p->mu); for (i = 0; i < n; i++) { - if (selected == subchannels[i]) continue; + if (selected == grpc_subchannel_get_connected_subchannel(subchannels[i])) continue; grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast"); } if (p->selected) { - grpc_subchannel_process_transport_op(exec_ctx, selected, op); - GRPC_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected"); + grpc_connected_subchannel_process_transport_op(exec_ctx, selected, op); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected"); } gpr_free(subchannels); } diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index 1ffe32fff2..08592b79e1 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -46,7 +46,7 @@ int grpc_lb_round_robin_trace = 0; typedef struct pending_pick { struct pending_pick *next; grpc_pollset *pollset; - grpc_subchannel **target; + grpc_connected_subchannel **target; grpc_closure *on_complete; } pending_pick; @@ -144,9 +144,9 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) { /** Prepends (relative to the root at p->ready_list) the connected subchannel \a * csc to the list of ready subchannels. */ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, - grpc_subchannel *csc) { + grpc_subchannel *sc) { ready_list *new_elem = gpr_malloc(sizeof(ready_list)); - new_elem->subchannel = csc; + new_elem->subchannel = sc; if (p->ready_list.prev == NULL) { /* first element */ new_elem->next = &p->ready_list; @@ -160,7 +160,7 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, p->ready_list.prev = new_elem; } if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, csc); + gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, sc); } return new_elem; } @@ -265,7 +265,7 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_subchannel **target) { + grpc_connected_subchannel **target) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; size_t i; @@ -314,7 +314,7 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, grpc_subchannel **target, + grpc_metadata_batch *initial_metadata, grpc_connected_subchannel **target, grpc_closure *on_complete) { size_t i; round_robin_lb_policy *p = (round_robin_lb_policy *)pol; @@ -323,9 +323,9 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, gpr_mu_lock(&p->mu); if ((selected = peek_next_connected_locked(p))) { gpr_mu_unlock(&p->mu); - *target = selected->subchannel; + *target = grpc_subchannel_get_connected_subchannel(selected->subchannel); if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- SUBCHANNEL %p (NODE %p)", + gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } /* only advance the last picked pointer if the selection was used */ @@ -390,7 +390,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - *pp->target = selected->subchannel; + *pp->target = grpc_subchannel_get_connected_subchannel(selected->subchannel); if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 36a2454309..6fa3c1b423 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -71,13 +71,13 @@ void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, grpc_closure *on_complete) { + grpc_connected_subchannel **target, grpc_closure *on_complete) { return policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata, target, on_complete); } void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_subchannel **target) { + grpc_connected_subchannel **target) { policy->vtable->cancel_pick(exec_ctx, policy, target); } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index a696c3ce64..b1fb64c06c 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -58,9 +58,9 @@ struct grpc_lb_policy_vtable { /** implement grpc_lb_policy_pick */ int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, grpc_closure *on_complete); + grpc_connected_subchannel **target, grpc_closure *on_complete); void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_subchannel **target); + grpc_connected_subchannel **target); /** try to enter a READY connectivity state */ void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); @@ -111,10 +111,10 @@ void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, grpc_closure *on_complete); + grpc_connected_subchannel **target, grpc_closure *on_complete); void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_subchannel **target); + grpc_connected_subchannel **target); void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_transport_op *op); diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 49c2cf9a19..e2644be253 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -52,33 +52,29 @@ #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 -typedef struct { - /* all fields protected by subchannel->mu */ +#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ + ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load(&(subchannel)->connected_subchannel))) + +struct grpc_connected_subchannel { /** refcount */ - int refs; - /** parent subchannel */ - grpc_subchannel *subchannel; -} connection; + gpr_refcount refs; +}; typedef struct { grpc_closure closure; - size_t version; - grpc_subchannel *subchannel; + union { + grpc_subchannel *subchannel; + grpc_connected_subchannel *connected_subchannel; + } whom; grpc_connectivity_state connectivity_state; } state_watcher; -typedef struct waiting_for_connect { - struct waiting_for_connect *next; - grpc_closure *notify; - grpc_pollset *pollset; - gpr_atm *target; - grpc_subchannel *subchannel; - grpc_closure continuation; -} waiting_for_connect; - struct grpc_subchannel { grpc_connector *connector; + /** refcount */ + gpr_refcount refs; + /** non-transport related channel filters */ const grpc_channel_filter **filters; size_t num_filters; @@ -94,8 +90,6 @@ struct grpc_subchannel { We occasionally use this to bump the refcount on the master channel to keep ourselves alive through an asynchronous operation. */ grpc_channel *master; - /** have we seen a disconnection? */ - int disconnected; /** set during connection */ grpc_connect_out_args connecting_result; @@ -109,19 +103,16 @@ struct grpc_subchannel { filter there-in) */ grpc_pollset_set *pollset_set; + /** active connection, or null; of type grpc_connected_subchannel */ + gpr_atm connected_subchannel; + /** mutex protecting remaining elements */ gpr_mu mu; - /** active connection */ - connection *active; - /** version number for the active connection */ - size_t active_version; - /** refcount */ - int refs; + /** have we seen a disconnection? */ + int disconnected; /** are we connecting */ int connecting; - /** things waiting for a connection */ - waiting_for_connect *waiting; /** connectivity state tracking */ grpc_connectivity_state_tracker state_tracker; @@ -138,7 +129,7 @@ struct grpc_subchannel { }; struct grpc_subchannel_call { - connection *connection; + grpc_connected_subchannel *connection; }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) @@ -146,27 +137,10 @@ struct grpc_subchannel_call { #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ (((grpc_subchannel_call *)(callstack)) - 1) -static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx, - connection *con, - grpc_pollset *pollset); -static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - const char *reason); -static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, int iomgr_success); -static void subchannel_ref_locked(grpc_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -static int subchannel_unref_locked( - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; -static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -static grpc_subchannel *connection_unref_locked( - grpc_exec_ctx *exec_ctx, - connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; -static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c); - #ifdef GRPC_STREAM_REFCOUNT_DEBUG #define SUBCHANNEL_REF_LOCKED(p, r) \ subchannel_ref_locked((p), __FILE__, __LINE__, (r)) @@ -203,66 +177,35 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c); * connection implementation */ -static void connection_destroy(grpc_exec_ctx *exec_ctx, connection *c) { - GPR_ASSERT(c->refs == 0); +static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success) { + grpc_connected_subchannel *c = arg; grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c)); gpr_free(c); } -static void connection_ref_locked(connection *c +void grpc_connected_subchannel_ref(grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { REF_LOG("CONNECTION", c); - subchannel_ref_locked(c->subchannel REF_PASS_ARGS); - ++c->refs; + gpr_ref(&c->refs); } -static grpc_subchannel *connection_unref_locked( - grpc_exec_ctx *exec_ctx, connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - grpc_subchannel *destroy = NULL; +void grpc_connected_subchannel_unref( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { UNREF_LOG("CONNECTION", c); - if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) { - destroy = c->subchannel; - } - if (--c->refs == 0 && c->subchannel->active != c) { - connection_destroy(exec_ctx, c); + if (gpr_unref(&c->refs)) { + grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(connection_destroy, c), 1); } - return destroy; } /* * grpc_subchannel implementation */ -static void subchannel_ref_locked(grpc_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - REF_LOG("SUBCHANNEL", c); - ++c->refs; -} - -static int subchannel_unref_locked(grpc_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - UNREF_LOG("SUBCHANNEL", c); - return --c->refs == 0; -} - -void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_mu_lock(&c->mu); - subchannel_ref_locked(c REF_PASS_ARGS); - gpr_mu_unlock(&c->mu); -} - -void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - int destroy; - gpr_mu_lock(&c->mu); - destroy = subchannel_unref_locked(c REF_PASS_ARGS); - gpr_mu_unlock(&c->mu); - if (destroy) subchannel_destroy(exec_ctx, c); -} - -static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { - if (c->active != NULL) { - connection_destroy(exec_ctx, c->active); +static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success) { + grpc_subchannel *c = arg; + grpc_connected_subchannel *con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); + if (con != NULL) { + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection"); } gpr_free((void *)c->filters); grpc_channel_args_destroy(c->args); @@ -273,6 +216,17 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { gpr_free(c); } +void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + gpr_ref(&c->refs); +} + +void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + if (gpr_unref(&c->refs)) { + grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c), 1); + } +} + void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset) { @@ -295,7 +249,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_channel_element *parent_elem = grpc_channel_stack_last_element( grpc_channel_get_channel_stack(args->master)); memset(c, 0, sizeof(*c)); - c->refs = 1; + gpr_ref_init(&c->refs, 1); c->connector = connector; grpc_connector_ref(c->connector); c->num_filters = args->filter_count; @@ -318,60 +272,6 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, return c; } -static void cancel_waiting_calls(grpc_exec_ctx *exec_ctx, - grpc_subchannel *subchannel, - int iomgr_success) { - waiting_for_connect *w4c; - gpr_mu_lock(&subchannel->mu); - w4c = subchannel->waiting; - subchannel->waiting = NULL; - gpr_mu_unlock(&subchannel->mu); - while (w4c != NULL) { - waiting_for_connect *next = w4c->next; - grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, - w4c->pollset); - if (w4c->notify) { - w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success); - } - - GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect"); - gpr_free(w4c); - - w4c = next; - } -} - -void grpc_subchannel_cancel_create_call(grpc_exec_ctx *exec_ctx, - grpc_subchannel *subchannel, - gpr_atm *target) { - waiting_for_connect *w4c; - int unref_count = 0; - gpr_mu_lock(&subchannel->mu); - w4c = subchannel->waiting; - subchannel->waiting = NULL; - while (w4c != NULL) { - waiting_for_connect *next = w4c->next; - if (w4c->target == target) { - grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, - w4c->pollset); - grpc_exec_ctx_enqueue(exec_ctx, w4c->notify, 0); - - unref_count++; - gpr_free(w4c); - } else { - w4c->next = subchannel->waiting; - subchannel->waiting = w4c; - } - - w4c = next; - } - gpr_mu_unlock(&subchannel->mu); - - while (unref_count-- > 0) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "waiting_for_connect"); - } -} - static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connect_in_args args; @@ -381,6 +281,7 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { args.deadline = compute_connect_deadline(c); args.channel_args = c->args; + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_CONNECTING, "state_change"); grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result, &c->connected); } @@ -393,66 +294,6 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { continue_connect(exec_ctx, c); } -static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { - int call_creation_finished_ok; - waiting_for_connect *w4c = arg; - grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset); - call_creation_finished_ok = grpc_subchannel_create_call( - exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify); - GPR_ASSERT(call_creation_finished_ok == 1); - w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success); - GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect"); - gpr_free(w4c); -} - -int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, - grpc_pollset *pollset, gpr_atm *target, - grpc_closure *notify) { - connection *con; - grpc_subchannel_call *call; - GPR_TIMER_BEGIN("grpc_subchannel_create_call", 0); - gpr_mu_lock(&c->mu); - if (c->active != NULL) { - con = c->active; - CONNECTION_REF_LOCKED(con, "call"); - gpr_mu_unlock(&c->mu); - - call = create_call(exec_ctx, con, pollset); - if (!gpr_atm_rel_cas(target, 0, (gpr_atm)(gpr_uintptr)call)) { - GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "failed to set"); - } - GPR_TIMER_END("grpc_subchannel_create_call", 0); - return 1; - } else { - waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c)); - w4c->next = c->waiting; - w4c->notify = notify; - w4c->pollset = pollset; - w4c->target = target; - w4c->subchannel = c; - /* released when clearing w4c */ - SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect"); - grpc_closure_init(&w4c->continuation, continue_creating_call, w4c); - c->waiting = w4c; - grpc_subchannel_add_interested_party(exec_ctx, c, pollset); - if (!c->connecting) { - c->connecting = 1; - connectivity_state_changed_locked(exec_ctx, c, "create_call"); - /* released by connection */ - SUBCHANNEL_REF_LOCKED(c, "connecting"); - GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); - gpr_mu_unlock(&c->mu); - - start_connect(exec_ctx, c); - } else { - gpr_mu_unlock(&c->mu); - } - GPR_TIMER_END("grpc_subchannel_create_call", 0); - return 0; - } -} - grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { grpc_connectivity_state state; gpr_mu_lock(&c->mu); @@ -472,9 +313,8 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, do_connect = 1; c->connecting = 1; /* released by connection */ - SUBCHANNEL_REF_LOCKED(c, "connecting"); + GRPC_SUBCHANNEL_REF(c, "connecting"); GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); - connectivity_state_changed_locked(exec_ctx, c, "state_change"); } gpr_mu_unlock(&c->mu); @@ -483,31 +323,28 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, } } -int grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, +void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_closure *subscribed_notify) { - int success; gpr_mu_lock(&c->mu); - success = grpc_connectivity_state_change_unsubscribe( + grpc_connectivity_state_change_unsubscribe( exec_ctx, &c->state_tracker, subscribed_notify); gpr_mu_unlock(&c->mu); - return success; } void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_transport_op *op) { - connection *con = NULL; - grpc_subchannel *destroy; + grpc_connected_subchannel *con; int cancel_alarm = 0; gpr_mu_lock(&c->mu); - if (c->active != NULL) { - con = c->active; - CONNECTION_REF_LOCKED(con, "transport-op"); + con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); + if (con != NULL) { + GRPC_CONNECTED_SUBCHANNEL_REF(con, "transport-op"); } if (op->disconnect) { c->disconnected = 1; - connectivity_state_changed_locked(exec_ctx, c, "disconnect"); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); if (c->have_alarm) { cancel_alarm = 1; } @@ -515,17 +352,8 @@ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&c->mu); if (con != NULL) { - grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); - grpc_channel_element *top_elem = - grpc_channel_stack_element(channel_stack, 0); - top_elem->filter->start_transport_op(exec_ctx, top_elem, op); - - gpr_mu_lock(&c->mu); - destroy = CONNECTION_UNREF_LOCKED(exec_ctx, con, "transport-op"); - gpr_mu_unlock(&c->mu); - if (destroy) { - subchannel_destroy(exec_ctx, destroy); - } + grpc_connected_subchannel_process_transport_op(exec_ctx, con, op); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "transport-op"); } if (cancel_alarm) { @@ -537,77 +365,62 @@ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, } } -static void on_state_changed(grpc_exec_ctx *exec_ctx, void *p, +void grpc_connected_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_transport_op *op) { + grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); + grpc_channel_element *top_elem = + grpc_channel_stack_element(channel_stack, 0); + top_elem->filter->start_transport_op(exec_ctx, top_elem, op); +} + +static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, int iomgr_success) { state_watcher *sw = p; - grpc_subchannel *c = sw->subchannel; + grpc_subchannel *c = sw->whom.subchannel; gpr_mu *mu = &c->mu; - int destroy; - grpc_transport_op op; - grpc_channel_element *elem; - connection *destroy_connection = NULL; gpr_mu_lock(mu); - /* if we failed or there is a version number mismatch, just leave - this closure */ - if (!iomgr_success || sw->subchannel->active_version != sw->version) { - goto done; - } - - switch (sw->connectivity_state) { - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_READY: - case GRPC_CHANNEL_IDLE: - /* all is still good: keep watching */ - memset(&op, 0, sizeof(op)); - op.connectivity_state = &sw->connectivity_state; - op.on_connectivity_state_change = &sw->closure; - elem = grpc_channel_stack_element( - CHANNEL_STACK_FROM_CONNECTION(c->active), 0); - elem->filter->start_transport_op(exec_ctx, elem, &op); - /* early out */ - gpr_mu_unlock(mu); - return; - case GRPC_CHANNEL_FATAL_FAILURE: - case GRPC_CHANNEL_TRANSIENT_FAILURE: - /* things have gone wrong, deactivate and enter idle */ - if (sw->subchannel->active->refs == 0) { - destroy_connection = sw->subchannel->active; - } - sw->subchannel->active = NULL; - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, - c->disconnected - ? GRPC_CHANNEL_FATAL_FAILURE - : GRPC_CHANNEL_TRANSIENT_FAILURE, - "connection_failed"); - break; + /* if we failed just leave this closure */ + if (iomgr_success) { + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, sw->connectivity_state, "reflect_child"); + if (sw->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) { + grpc_connected_subchannel_notify_on_state_change(exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), &sw->connectivity_state, &sw->closure); + GRPC_SUBCHANNEL_REF(c, "state_watcher"); + sw = NULL; + } } -done: - connectivity_state_changed_locked(exec_ctx, c, "transport_state_changed"); - destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher"); - gpr_free(sw); gpr_mu_unlock(mu); - if (destroy) { - subchannel_destroy(exec_ctx, c); - } - if (destroy_connection != NULL) { - connection_destroy(exec_ctx, destroy_connection); - } + GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "state_watcher"); + gpr_free(sw); +} + +static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_connectivity_state *state, grpc_closure *closure) { + grpc_transport_op op; + grpc_channel_element *elem; + memset(&op, 0, sizeof(op)); + op.connectivity_state = state; + op.on_connectivity_state_change = closure; + elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); + elem->filter->start_transport_op(exec_ctx, elem, &op); +} + +void grpc_connected_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_connectivity_state *state, grpc_closure *closure) { + GPR_ASSERT(state != NULL); + connected_subchannel_state_op(exec_ctx, con, state, closure); +} + +void grpc_connected_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_closure *closure) { + connected_subchannel_state_op(exec_ctx, con, NULL, closure); } static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { size_t channel_stack_size; - connection *con; + grpc_connected_subchannel *con; grpc_channel_stack *stk; size_t num_filters; const grpc_channel_filter **filters; - waiting_for_connect *w4c; - grpc_transport_op op; - state_watcher *sw; - connection *destroy_connection = NULL; - grpc_channel_element *elem; + state_watcher *sw_subchannel; /* build final filter list */ num_filters = c->num_filters + c->connecting_result.num_filters + 1; @@ -619,10 +432,9 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { /* construct channel stack */ channel_stack_size = grpc_channel_stack_size(filters, num_filters); - con = gpr_malloc(sizeof(connection) + channel_stack_size); + con = gpr_malloc(sizeof(grpc_connected_subchannel) + channel_stack_size); stk = (grpc_channel_stack *)(con + 1); - con->refs = 0; - con->subchannel = c; + gpr_ref_init(&c->refs, 1); grpc_channel_stack_init(exec_ctx, filters, num_filters, c->master, c->args, c->mdctx, stk); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); @@ -630,16 +442,16 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { memset(&c->connecting_result, 0, sizeof(c->connecting_result)); /* initialize state watcher */ - sw = gpr_malloc(sizeof(*sw)); - grpc_closure_init(&sw->closure, on_state_changed, sw); - sw->subchannel = c; - sw->connectivity_state = GRPC_CHANNEL_READY; + sw_subchannel = gpr_malloc(sizeof(*sw_subchannel)); + sw_subchannel->whom.subchannel = c; + sw_subchannel->connectivity_state = GRPC_CHANNEL_READY; + grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, sw_subchannel); gpr_mu_lock(&c->mu); if (c->disconnected) { gpr_mu_unlock(&c->mu); - gpr_free(sw); + gpr_free(sw_subchannel); gpr_free((void *)filters); grpc_channel_stack_destroy(exec_ctx, stk); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); @@ -648,45 +460,35 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { } /* publish */ - if (c->active != NULL && c->active->refs == 0) { - destroy_connection = c->active; - } - c->active = con; - c->active_version++; - sw->version = c->active_version; + GPR_ASSERT(gpr_atm_no_barrier_cas(&c->connected_subchannel, 0, (gpr_atm)con)); c->connecting = 0; - /* watch for changes; subchannel ref for connecting is donated + /* setup subchannel watching connected subchannel for changes; subchannel ref for connecting is donated to the state watcher */ + GRPC_SUBCHANNEL_REF(c, "state_watcher"); + GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); + grpc_connected_subchannel_notify_on_state_change(exec_ctx, con, &sw_subchannel->connectivity_state, &sw_subchannel->closure); + +#if 0 + grpc_transport_op op; + grpc_channel_element *elem; + + /* setup connected subchannel watching transport for changes */ memset(&op, 0, sizeof(op)); - op.connectivity_state = &sw->connectivity_state; - op.on_connectivity_state_change = &sw->closure; + op.connectivity_state = &sw_connected_subchannel->connectivity_state; + op.on_connectivity_state_change = &sw_connected_subchannel->closure; op.bind_pollset_set = c->pollset_set; - SUBCHANNEL_REF_LOCKED(c, "state_watcher"); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); - GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting")); elem = - grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0); + grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); elem->filter->start_transport_op(exec_ctx, elem, &op); +#endif /* signal completion */ - connectivity_state_changed_locked(exec_ctx, c, "connected"); - w4c = c->waiting; - c->waiting = NULL; + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY, "connected"); gpr_mu_unlock(&c->mu); - - while (w4c != NULL) { - waiting_for_connect *next = w4c->next; - grpc_exec_ctx_enqueue(exec_ctx, &w4c->continuation, 1); - w4c = next; - } - gpr_free((void *)filters); - - if (destroy_connection != NULL) { - connection_destroy(exec_ctx, destroy_connection); - } + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); } /* Generate a random number between 0 and 1. */ @@ -725,13 +527,11 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { if (c->disconnected) { iomgr_success = 0; } - connectivity_state_changed_locked(exec_ctx, c, "alarm"); gpr_mu_unlock(&c->mu); if (iomgr_success) { update_reconnect_parameters(c); continue_connect(exec_ctx, c); } else { - cancel_waiting_calls(exec_ctx, c, iomgr_success); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); } @@ -742,12 +542,14 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_subchannel *c = arg; if (c->connecting_result.transport != NULL) { publish_transport(exec_ctx, c); + } else if (c->disconnected) { + /* do nothing */ } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_mu_lock(&c->mu); GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; - connectivity_state_changed_locked(exec_ctx, c, "connect_failed"); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connect_failed"); grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); gpr_mu_unlock(&c->mu); } @@ -764,29 +566,6 @@ static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { : min_deadline; } -static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { - if (c->disconnected) { - return GRPC_CHANNEL_FATAL_FAILURE; - } - if (c->connecting) { - if (c->have_alarm) { - return GRPC_CHANNEL_TRANSIENT_FAILURE; - } - return GRPC_CHANNEL_CONNECTING; - } - if (c->active) { - return GRPC_CHANNEL_READY; - } - return GRPC_CHANNEL_IDLE; -} - -static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - const char *reason) { - grpc_connectivity_state current = compute_connectivity_locked(c); - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, current, reason); -} - /* * grpc_subchannel_call implementation */ @@ -794,17 +573,9 @@ static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx, static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, int success) { grpc_subchannel_call *c = call; - gpr_mu *mu = &c->connection->subchannel->mu; - grpc_subchannel *destroy; GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); - gpr_mu_lock(mu); - destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call"); - gpr_mu_unlock(mu); gpr_free(c); - if (destroy != NULL) { - subchannel_destroy(exec_ctx, destroy); - } GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } @@ -842,8 +613,12 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op); } -static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx, - connection *con, +grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(grpc_subchannel *c) { + return GET_CONNECTED_SUBCHANNEL(c, acq); +} + +grpc_subchannel_call *grpc_connected_subchannel_create_call(grpc_exec_ctx *exec_ctx, + grpc_connected_subchannel *con, grpc_pollset *pollset) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); grpc_subchannel_call *call = diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 1fefa1888a..d5305f1a50 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -41,6 +41,7 @@ /** A (sub-)channel that knows how to connect to exactly one target address. Provides a target for load balancing. */ typedef struct grpc_subchannel grpc_subchannel; +typedef struct grpc_connected_subchannel grpc_connected_subchannel; typedef struct grpc_subchannel_call grpc_subchannel_call; typedef struct grpc_subchannel_args grpc_subchannel_args; @@ -49,6 +50,10 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; grpc_subchannel_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_UNREF(cl, p, r) \ grpc_subchannel_unref((cl), (p), __FILE__, __LINE__, (r)) +#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) \ + grpc_connected_subchannel_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \ + grpc_connected_subchannel_unref((cl), (p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_CALL_REF(p, r) \ grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_CALL_UNREF(cl, p, r) \ @@ -58,6 +63,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; #else #define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p)) #define GRPC_SUBCHANNEL_UNREF(cl, p, r) grpc_subchannel_unref((cl), (p)) +#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p)) +#define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) grpc_connected_subchannel_unref((cl), (p)) #define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p)) #define GRPC_SUBCHANNEL_CALL_UNREF(cl, p, r) \ grpc_subchannel_call_unref((cl), (p)) @@ -69,33 +76,29 @@ void grpc_subchannel_ref(grpc_subchannel *channel void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_connected_subchannel_ref(grpc_connected_subchannel *channel + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, + grpc_connected_subchannel *channel + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_ref(grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -/** construct a subchannel call (possibly asynchronously). - * - * If the returned status is 1, the call will return immediately and \a target - * will point to a connected \a subchannel_call instance. Note that \a notify - * will \em not be invoked in this case. - * Otherwise, if the returned status is 0, the subchannel call will be created - * asynchronously, invoking the \a notify callback upon completion. */ -int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, - grpc_subchannel *subchannel, - grpc_pollset *pollset, gpr_atm *target, - grpc_closure *notify); - -/** cancel \a call in the waiting state. */ -void grpc_subchannel_cancel_create_call(grpc_exec_ctx *exec_ctx, - grpc_subchannel *subchannel, - gpr_atm *target); +/** construct a subchannel call */ +grpc_subchannel_call *grpc_connected_subchannel_create_call(grpc_exec_ctx *exec_ctx, + grpc_connected_subchannel *connected_subchannel, + grpc_pollset *pollset); /** process a transport level op */ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_transport_op *op); +void grpc_connected_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, + grpc_connected_subchannel *subchannel, + grpc_transport_op *op); /** poll the current connectivity state of a channel */ grpc_connectivity_state grpc_subchannel_check_connectivity( @@ -107,13 +110,19 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel, grpc_connectivity_state *state, grpc_closure *notify); +void grpc_connected_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, + grpc_connected_subchannel *channel, + grpc_connectivity_state *state, + grpc_closure *notify); /** Remove \a subscribed_notify from the list of closures to be called on a - * state change if present, returning 1. Otherwise, nothing is done and return - * 0. */ -int grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, + * state change if present. */ +void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel, grpc_closure *subscribed_notify); +void grpc_connected_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, + grpc_connected_subchannel *channel, + grpc_closure *subscribed_notify); /** express interest in \a channel's activities through \a pollset. */ void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx, @@ -124,6 +133,10 @@ void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel, grpc_pollset *pollset); +/** retrieve the grpc_connected_subchannel - or NULL if called before + the subchannel becomes connected */ +grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(grpc_subchannel *subchannel); + /** continue processing a transport op */ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *subchannel_call, diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index f296ce8251..f1059801d4 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -96,7 +96,7 @@ typedef struct grpc_transport_stream_op { typedef struct grpc_transport_op { /** called when processing of this op is done */ grpc_closure *on_consumed; - /** connectivity monitoring */ + /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */ grpc_closure *on_connectivity_state_change; grpc_connectivity_state *connectivity_state; /** should the transport be disconnected */ |