diff options
author | 2015-09-18 07:20:29 -0700 | |
---|---|---|
committer | 2015-09-18 07:20:29 -0700 | |
commit | 000cd8f9f7346defc79fe6aa877af11b42ab5f1e (patch) | |
tree | 883d73a97471f63e616d02c1e17efc62b099c8ad /src/core/client_config/lb_policies/pick_first.c | |
parent | 38adec97e875c21cd9d6cc9d039664bdf4fdb889 (diff) |
Introduce call lists for moving work outside locks
Diffstat (limited to 'src/core/client_config/lb_policies/pick_first.c')
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 127 |
1 files changed, 44 insertions, 83 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 804dbdeadd..8fd8dd7b67 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -108,9 +108,8 @@ void pf_destroy(grpc_lb_policy *pol) { gpr_free(p); } -void pf_shutdown(grpc_lb_policy *pol) { +void pf_shutdown(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_connectivity_state_flusher f; pending_pick *pp; gpr_mu_lock(&p->mu); del_interested_parties_locked(p); @@ -118,51 +117,40 @@ void pf_shutdown(grpc_lb_policy *pol) { pp = p->pending_picks; p->pending_picks = NULL; grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, - "shutdown"); - grpc_connectivity_state_begin_flush(&p->state_tracker, &f); + "shutdown", call_list); gpr_mu_unlock(&p->mu); - grpc_connectivity_state_end_flush(&f); while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - pp->on_complete->cb(pp->on_complete->cb_arg, 0); + grpc_iomgr_call_list_add(call_list, pp->on_complete, 1); gpr_free(pp); pp = next; } } -/* returns a closure to call, or NULL */ -static grpc_iomgr_closure *start_picking(pick_first_lb_policy *p) { +static void start_picking(pick_first_lb_policy *p, + grpc_iomgr_call_list *call_list) { p->started_picking = 1; p->checking_subchannel = 0; p->checking_connectivity = GRPC_CHANNEL_IDLE; GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity"); - if (grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed) - .state_already_changed) { - return &p->connectivity_changed; - } else { - return NULL; - } + grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], + &p->checking_connectivity, + &p->connectivity_changed, call_list); } -void pf_exit_idle(grpc_lb_policy *pol) { +void pf_exit_idle(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_iomgr_closure *call = NULL; gpr_mu_lock(&p->mu); if (!p->started_picking) { - call = start_picking(p); + start_picking(p, call_list); } gpr_mu_unlock(&p->mu); - if (call) { - call->cb(call->cb_arg, 1); - } } void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, grpc_subchannel **target, - grpc_iomgr_closure *on_complete) { + grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -171,9 +159,8 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, *target = p->selected; on_complete->cb(on_complete->cb_arg, 1); } else { - grpc_iomgr_closure *call = NULL; if (!p->started_picking) { - call = start_picking(p); + start_picking(p, call_list); } grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pollset); @@ -184,9 +171,6 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, pp->on_complete = on_complete; p->pending_picks = pp; gpr_mu_unlock(&p->mu); - if (call) { - call->cb(call->cb_arg, 1); - } } } @@ -194,8 +178,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { pick_first_lb_policy *p = arg; pending_pick *pp; int unref = 0; - grpc_iomgr_closure *cbs = NULL; - grpc_connectivity_state_flusher f; + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; gpr_mu_lock(&p->mu); @@ -203,14 +186,11 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { unref = 1; } else if (p->selected != NULL) { grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, - "selected_changed"); + "selected_changed", &call_list); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { - if (grpc_subchannel_notify_on_state_change( - p->selected, &p->checking_connectivity, &p->connectivity_changed) - .state_already_changed) { - p->connectivity_changed.next = cbs; - cbs = &p->connectivity_changed; - } + grpc_subchannel_notify_on_state_change( + p->selected, &p->checking_connectivity, &p->connectivity_changed, + &call_list); } else { unref = 1; } @@ -219,28 +199,23 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { switch (p->checking_connectivity) { case GRPC_CHANNEL_READY: grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, - "connecting_ready"); + "connecting_ready", &call_list); p->selected = p->subchannels[p->checking_subchannel]; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; grpc_subchannel_del_interested_party(p->selected, pp->pollset); - pp->on_complete->next = cbs; - cbs = pp->on_complete; + grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1); gpr_free(pp); } - if (grpc_subchannel_notify_on_state_change(p->selected, - &p->checking_connectivity, - &p->connectivity_changed) - .state_already_changed) { - p->connectivity_changed.next = cbs; - cbs = &p->connectivity_changed; - } + grpc_subchannel_notify_on_state_change( + p->selected, &p->checking_connectivity, &p->connectivity_changed, + &call_list); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - "connecting_transient_failure"); + "connecting_transient_failure", &call_list); del_interested_parties_locked(p); p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels; @@ -248,13 +223,9 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { p->subchannels[p->checking_subchannel]); add_interested_parties_locked(p); if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { - if (grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], - &p->checking_connectivity, &p->connectivity_changed) - .state_already_changed) { - p->connectivity_changed.next = cbs; - cbs = &p->connectivity_changed; - } + grpc_subchannel_notify_on_state_change( + p->subchannels[p->checking_subchannel], &p->checking_connectivity, + &p->connectivity_changed, &call_list); } else { goto loop; } @@ -262,14 +233,10 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, - "connecting_changed"); - if (grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], - &p->checking_connectivity, &p->connectivity_changed) - .state_already_changed) { - p->connectivity_changed.next = cbs; - cbs = &p->connectivity_changed; - } + "connecting_changed", &call_list); + grpc_subchannel_notify_on_state_change( + p->subchannels[p->checking_subchannel], &p->checking_connectivity, + &p->connectivity_changed, &call_list); break; case GRPC_CHANNEL_FATAL_FAILURE: del_interested_parties_locked(p); @@ -280,19 +247,18 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { if (p->num_subchannels == 0) { grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, - "no_more_channels"); + "no_more_channels", &call_list); while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - pp->on_complete->next = cbs; - cbs = pp->on_complete; + grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1); gpr_free(pp); } unref = 1; } else { grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - "subchannel_failed"); + "subchannel_failed", &call_list); p->checking_subchannel %= p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity( p->subchannels[p->checking_subchannel]); @@ -302,22 +268,17 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { } } - grpc_connectivity_state_begin_flush(&p->state_tracker, &f); gpr_mu_unlock(&p->mu); - grpc_connectivity_state_end_flush(&f); - while (cbs != NULL) { - grpc_iomgr_closure *next = cbs->next; - cbs->cb(cbs->cb_arg, 1); - cbs = next; - } + grpc_iomgr_call_list_run(call_list); if (unref) { GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity"); } } -static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) { +static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op, + grpc_iomgr_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; size_t i; size_t n; @@ -339,7 +300,8 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) { gpr_free(subchannels); } -static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) { +static grpc_connectivity_state pf_check_connectivity( + grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; grpc_connectivity_state st; gpr_mu_lock(&p->mu); @@ -348,16 +310,15 @@ static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) { return st; } -static grpc_connectivity_state_notify_on_state_change_result -pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current, - grpc_iomgr_closure *notify) { +void pf_notify_on_state_change(grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_iomgr_closure *notify, + grpc_iomgr_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_connectivity_state_notify_on_state_change_result r; gpr_mu_lock(&p->mu); - r = grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, - notify); + grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, + notify, call_list); gpr_mu_unlock(&p->mu); - return r; } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { |