diff options
author | 2015-09-18 17:29:00 -0700 | |
---|---|---|
committer | 2015-09-18 17:29:00 -0700 | |
commit | d1bec03fa148344b8eac2b59517252d86e4ca858 (patch) | |
tree | f359e48f9151ab7ceff72cd624ad6c7a59e4d304 /src/core/client_config/lb_policies/pick_first.c | |
parent | 33825118df7157219cec15382beb006d3462ad96 (diff) |
Call list progress
Diffstat (limited to 'src/core/client_config/lb_policies/pick_first.c')
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 86 |
1 files changed, 39 insertions, 47 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 852eed310d..6dc52f43ce 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -52,8 +52,6 @@ typedef struct { /** all our subchannels */ grpc_subchannel **subchannels; size_t num_subchannels; - /** workqueue for async work */ - grpc_workqueue *workqueue; grpc_closure connectivity_changed; @@ -78,33 +76,34 @@ typedef struct { grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; -static void del_interested_parties_locked(pick_first_lb_policy *p) { +static void del_interested_parties_locked(pick_first_lb_policy *p, + grpc_call_list *call_list) { pending_pick *pp; for (pp = p->pending_picks; pp; pp = pp->next) { grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel], - pp->pollset); + pp->pollset, call_list); } } -static void add_interested_parties_locked(pick_first_lb_policy *p) { +static void add_interested_parties_locked(pick_first_lb_policy *p, + grpc_call_list *call_list) { pending_pick *pp; for (pp = p->pending_picks; pp; pp = pp->next) { grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], - pp->pollset); + pp->pollset, call_list); } } -void pf_destroy(grpc_lb_policy *pol) { +void pf_destroy(grpc_lb_policy *pol, grpc_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; size_t i; - del_interested_parties_locked(p); + GPR_ASSERT(p->shutdown); for (i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first"); + GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first", call_list); } grpc_connectivity_state_destroy(&p->state_tracker); gpr_free(p->subchannels); gpr_mu_destroy(&p->mu); - GRPC_WORKQUEUE_UNREF(p->workqueue, "pick_first"); gpr_free(p); } @@ -112,7 +111,7 @@ void pf_shutdown(grpc_lb_policy *pol, grpc_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); - del_interested_parties_locked(p); + del_interested_parties_locked(p, call_list); p->shutdown = 1; pp = p->pending_picks; p->pending_picks = NULL; @@ -156,13 +155,13 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, if (p->selected) { gpr_mu_unlock(&p->mu); *target = p->selected; - on_complete->cb(on_complete->cb_arg, 1); + grpc_call_list_add(call_list, on_complete, 1); } else { if (!p->started_picking) { start_picking(p, call_list); } grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], - pollset); + pollset, call_list); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -173,58 +172,58 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, } } -static void pf_connectivity_changed(void *arg, int iomgr_success) { +static void pf_connectivity_changed(void *arg, int iomgr_success, + grpc_call_list *call_list) { pick_first_lb_policy *p = arg; pending_pick *pp; - int unref = 0; - grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(&p->mu); if (p->shutdown) { - unref = 1; + GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity", call_list); } else if (p->selected != NULL) { grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, - "selected_changed", &call_list); + "selected_changed", call_list); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { grpc_subchannel_notify_on_state_change( p->selected, &p->checking_connectivity, &p->connectivity_changed, - &call_list); + call_list); } else { - unref = 1; + GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity", call_list); } } else { loop: switch (p->checking_connectivity) { case GRPC_CHANNEL_READY: grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, - "connecting_ready", &call_list); + "connecting_ready", call_list); p->selected = p->subchannels[p->checking_subchannel]; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; - grpc_subchannel_del_interested_party(p->selected, pp->pollset); - grpc_call_list_add(&call_list, pp->on_complete, 1); + grpc_subchannel_del_interested_party(p->selected, pp->pollset, + call_list); + grpc_call_list_add(call_list, pp->on_complete, 1); gpr_free(pp); } grpc_subchannel_notify_on_state_change( p->selected, &p->checking_connectivity, &p->connectivity_changed, - &call_list); + call_list); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - "connecting_transient_failure", &call_list); - del_interested_parties_locked(p); + "connecting_transient_failure", call_list); + del_interested_parties_locked(p, call_list); p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity( p->subchannels[p->checking_subchannel]); - add_interested_parties_locked(p); + add_interested_parties_locked(p, call_list); if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { grpc_subchannel_notify_on_state_change( p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed, &call_list); + &p->connectivity_changed, call_list); } else { goto loop; } @@ -232,48 +231,43 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, - "connecting_changed", &call_list); + "connecting_changed", call_list); grpc_subchannel_notify_on_state_change( p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed, &call_list); + &p->connectivity_changed, call_list); break; case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked(p); + del_interested_parties_locked(p, call_list); GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], p->subchannels[p->num_subchannels - 1]); p->num_subchannels--; - GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first"); + GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first", + call_list); if (p->num_subchannels == 0) { grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, - "no_more_channels", &call_list); + "no_more_channels", call_list); while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_call_list_add(&call_list, pp->on_complete, 1); + grpc_call_list_add(call_list, pp->on_complete, 1); gpr_free(pp); } - unref = 1; + GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity", call_list); } else { grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - "subchannel_failed", &call_list); + "subchannel_failed", call_list); p->checking_subchannel %= p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity( p->subchannels[p->checking_subchannel]); - add_interested_parties_locked(p); + add_interested_parties_locked(p, call_list); goto loop; } } } gpr_mu_unlock(&p->mu); - - grpc_call_list_run(call_list); - - if (unref) { - GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity"); - } } static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op, @@ -293,8 +287,8 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op, gpr_mu_unlock(&p->mu); for (i = 0; i < n; i++) { - grpc_subchannel_process_transport_op(subchannels[i], op); - GRPC_SUBCHANNEL_UNREF(subchannels[i], "pf_broadcast"); + grpc_subchannel_process_transport_op(subchannels[i], op, call_list); + GRPC_SUBCHANNEL_UNREF(subchannels[i], "pf_broadcast", call_list); } gpr_free(subchannels); } @@ -341,8 +335,6 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); p->num_subchannels = args->num_subchannels; - p->workqueue = args->workqueue; - GRPC_WORKQUEUE_REF(p->workqueue, "pick_first"); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "pick_first"); memcpy(p->subchannels, args->subchannels, |