diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-17 15:27:13 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-17 15:27:13 -0700 |
commit | 5795da7c96642f3e19c8cfe105de7d1cc7cf6fbc (patch) | |
tree | b27597449f26049270b754ba8f5b010d47e0f998 /src/core/client_config/lb_policies | |
parent | 2d2711f9e6fd3ac783f400fcd4e72798636f194e (diff) |
Update connectivity state code to be completely synchronous
Diffstat (limited to 'src/core/client_config/lb_policies')
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 113 |
1 files changed, 82 insertions, 31 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 575ee02249..804dbdeadd 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -110,38 +110,54 @@ void pf_destroy(grpc_lb_policy *pol) { void pf_shutdown(grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + grpc_connectivity_state_flusher f; pending_pick *pp; gpr_mu_lock(&p->mu); del_interested_parties_locked(p); p->shutdown = 1; - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_workqueue_push(p->workqueue, pp->on_complete, 0); - gpr_free(pp); - } + pp = p->pending_picks; + p->pending_picks = NULL; grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); + grpc_connectivity_state_begin_flush(&p->state_tracker, &f); gpr_mu_unlock(&p->mu); + grpc_connectivity_state_end_flush(&f); + while (pp != NULL) { + pending_pick *next = pp->next; + *pp->target = NULL; + pp->on_complete->cb(pp->on_complete->cb_arg, 0); + gpr_free(pp); + pp = next; + } } -static void start_picking(pick_first_lb_policy *p) { +/* returns a closure to call, or NULL */ +static grpc_iomgr_closure *start_picking(pick_first_lb_policy *p) { p->started_picking = 1; p->checking_subchannel = 0; p->checking_connectivity = GRPC_CHANNEL_IDLE; GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity"); - grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], - &p->checking_connectivity, - &p->connectivity_changed); + if (grpc_subchannel_notify_on_state_change( + p->subchannels[p->checking_subchannel], &p->checking_connectivity, + &p->connectivity_changed) + .state_already_changed) { + return &p->connectivity_changed; + } else { + return NULL; + } } void pf_exit_idle(grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + grpc_iomgr_closure *call = NULL; gpr_mu_lock(&p->mu); if (!p->started_picking) { - start_picking(p); + call = start_picking(p); } gpr_mu_unlock(&p->mu); + if (call) { + call->cb(call->cb_arg, 1); + } } void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, @@ -155,8 +171,9 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, *target = p->selected; on_complete->cb(on_complete->cb_arg, 1); } else { + grpc_iomgr_closure *call = NULL; if (!p->started_picking) { - start_picking(p); + call = start_picking(p); } grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pollset); @@ -167,6 +184,9 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, pp->on_complete = on_complete; p->pending_picks = pp; gpr_mu_unlock(&p->mu); + if (call) { + call->cb(call->cb_arg, 1); + } } } @@ -174,6 +194,8 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { pick_first_lb_policy *p = arg; pending_pick *pp; int unref = 0; + grpc_iomgr_closure *cbs = NULL; + grpc_connectivity_state_flusher f; gpr_mu_lock(&p->mu); @@ -183,8 +205,12 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { - grpc_subchannel_notify_on_state_change( - p->selected, &p->checking_connectivity, &p->connectivity_changed); + if (grpc_subchannel_notify_on_state_change( + p->selected, &p->checking_connectivity, &p->connectivity_changed) + .state_already_changed) { + p->connectivity_changed.next = cbs; + cbs = &p->connectivity_changed; + } } else { unref = 1; } @@ -199,11 +225,17 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { p->pending_picks = pp->next; *pp->target = p->selected; grpc_subchannel_del_interested_party(p->selected, pp->pollset); - grpc_workqueue_push(p->workqueue, pp->on_complete, 1); + pp->on_complete->next = cbs; + cbs = pp->on_complete; gpr_free(pp); } - grpc_subchannel_notify_on_state_change( - p->selected, &p->checking_connectivity, &p->connectivity_changed); + if (grpc_subchannel_notify_on_state_change(p->selected, + &p->checking_connectivity, + &p->connectivity_changed) + .state_already_changed) { + p->connectivity_changed.next = cbs; + cbs = &p->connectivity_changed; + } break; case GRPC_CHANNEL_TRANSIENT_FAILURE: grpc_connectivity_state_set(&p->state_tracker, @@ -216,9 +248,13 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { p->subchannels[p->checking_subchannel]); add_interested_parties_locked(p); if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { - grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed); + if (grpc_subchannel_notify_on_state_change( + p->subchannels[p->checking_subchannel], + &p->checking_connectivity, &p->connectivity_changed) + .state_already_changed) { + p->connectivity_changed.next = cbs; + cbs = &p->connectivity_changed; + } } else { goto loop; } @@ -227,9 +263,13 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { case GRPC_CHANNEL_IDLE: grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, "connecting_changed"); - grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed); + if (grpc_subchannel_notify_on_state_change( + p->subchannels[p->checking_subchannel], + &p->checking_connectivity, &p->connectivity_changed) + .state_already_changed) { + p->connectivity_changed.next = cbs; + cbs = &p->connectivity_changed; + } break; case GRPC_CHANNEL_FATAL_FAILURE: del_interested_parties_locked(p); @@ -244,7 +284,8 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_workqueue_push(p->workqueue, pp->on_complete, 1); + pp->on_complete->next = cbs; + cbs = pp->on_complete; gpr_free(pp); } unref = 1; @@ -261,7 +302,15 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { } } + grpc_connectivity_state_begin_flush(&p->state_tracker, &f); gpr_mu_unlock(&p->mu); + grpc_connectivity_state_end_flush(&f); + + while (cbs != NULL) { + grpc_iomgr_closure *next = cbs->next; + cbs->cb(cbs->cb_arg, 1); + cbs = next; + } if (unref) { GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity"); @@ -299,14 +348,16 @@ static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) { return st; } -static void pf_notify_on_state_change(grpc_lb_policy *pol, - grpc_connectivity_state *current, - grpc_iomgr_closure *notify) { +static grpc_connectivity_state_notify_on_state_change_result +pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current, + grpc_iomgr_closure *notify) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + grpc_connectivity_state_notify_on_state_change_result r; gpr_mu_lock(&p->mu); - grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, - notify); + r = grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, + notify); gpr_mu_unlock(&p->mu); + return r; } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { @@ -332,8 +383,8 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, p->num_subchannels = args->num_subchannels; p->workqueue = args->workqueue; GRPC_WORKQUEUE_REF(p->workqueue, "pick_first"); - grpc_connectivity_state_init(&p->state_tracker, args->workqueue, - GRPC_CHANNEL_IDLE, "pick_first"); + grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, + "pick_first"); memcpy(p->subchannels, args->subchannels, sizeof(grpc_subchannel *) * args->num_subchannels); grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); |