From 86c99580a0891697f3c5227ae2fd2911734098fc Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 25 Nov 2015 15:22:26 -0800 Subject: Load balancing interest management fixes --- src/core/client_config/lb_policies/pick_first.c | 40 +++++-------------- src/core/client_config/lb_policies/round_robin.c | 49 +++++++----------------- src/core/client_config/lb_policy.c | 1 + src/core/client_config/lb_policy.h | 1 + src/core/client_config/subchannel.c | 38 +++++++++++------- src/core/client_config/subchannel.h | 10 +---- 6 files changed, 51 insertions(+), 88 deletions(-) (limited to 'src/core/client_config') diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 4ecfc11cdd..c0f1d3fd94 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -76,24 +76,6 @@ typedef struct { grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; -static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx, - pick_first_lb_policy *p) { - pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_del_interested_party( - exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); - } -} - -static void add_interested_parties_locked(grpc_exec_ctx *exec_ctx, - pick_first_lb_policy *p) { - pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_add_interested_party( - exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); - } -} - void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; size_t i; @@ -114,7 +96,6 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); - del_interested_parties_locked(exec_ctx, p); p->shutdown = 1; pp = p->pending_picks; p->pending_picks = NULL; @@ -124,6 +105,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); pp = next; @@ -140,8 +122,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_subchannel_del_interested_party( - exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); gpr_free(pp); @@ -161,6 +142,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], + &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } @@ -187,8 +169,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_subchannel_add_interested_party( - exec_ctx, p->subchannels[p->checking_subchannel], pollset); + grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -275,8 +256,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; - grpc_subchannel_del_interested_party(exec_ctx, selected_subchannel, - pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } @@ -288,15 +268,14 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connecting_transient_failure"); - del_interested_parties_locked(exec_ctx, p); p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity( p->subchannels[p->checking_subchannel]); - add_interested_parties_locked(exec_ctx, p); if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], + &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { goto loop; @@ -309,13 +288,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_changed"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], + &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked(exec_ctx, p); - GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], - p->subchannels[p->num_subchannels - 1]); p->num_subchannels--; + GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], + p->subchannels[p->num_subchannels]); GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels], "pick_first"); if (p->num_subchannels == 0) { @@ -336,7 +315,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, p->checking_subchannel %= p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity( p->subchannels[p->checking_subchannel]); - add_interested_parties_locked(exec_ctx, p); goto loop; } } diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index ca0d6abd07..10688b3fa5 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -200,23 +200,10 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, gpr_free(node); } -static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx, - round_robin_lb_policy *p, - const size_t subchannel_idx) { - pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_del_interested_party( - exec_ctx, p->subchannels[subchannel_idx], pp->pollset); - } -} - void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; size_t i; ready_list *elem; - for (i = 0; i < p->num_subchannels; i++) { - del_interested_parties_locked(exec_ctx, p, i); - } for (i = 0; i < p->num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "round_robin"); } @@ -243,15 +230,10 @@ void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { - size_t i; round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); - for (i = 0; i < p->num_subchannels; i++) { - del_interested_parties_locked(exec_ctx, p, i); - } - p->shutdown = 1; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; @@ -268,17 +250,13 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel **target) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; - size_t i; gpr_mu_lock(&p->mu); pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - for (i = 0; i < p->num_subchannels; i++) { - grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i], - pp->pollset); - } + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); gpr_free(pp); @@ -298,6 +276,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { for (i = 0; i < p->num_subchannels; i++) { p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE; grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[i], + &p->base.interested_parties, &p->subchannel_connectivity[i], &p->connectivity_changed_cbs[i]); GRPC_LB_POLICY_REF(&p->base, "round_robin_connectivity"); @@ -316,7 +295,6 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, grpc_connected_subchannel **target, grpc_closure *on_complete) { - size_t i; round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; ready_list *selected; @@ -336,10 +314,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - for (i = 0; i < p->num_subchannels; i++) { - grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i], - pollset); - } + grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -398,13 +373,15 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } - grpc_subchannel_del_interested_party(exec_ctx, selected->subchannel, - pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[this_idx], this_connectivity, + exec_ctx, + p->subchannels[this_idx], + &p->base.interested_parties, + this_connectivity, &p->connectivity_changed_cbs[this_idx]); break; case GRPC_CHANNEL_CONNECTING: @@ -412,14 +389,17 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, *this_connectivity, "connecting_changed"); grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[this_idx], this_connectivity, + exec_ctx, p->subchannels[this_idx], + &p->base.interested_parties, + this_connectivity, &p->connectivity_changed_cbs[this_idx]); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: - del_interested_parties_locked(exec_ctx, p, this_idx); /* renew state notification */ grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[this_idx], this_connectivity, + exec_ctx, p->subchannels[this_idx], + &p->base.interested_parties, + this_connectivity, &p->connectivity_changed_cbs[this_idx]); /* remove from ready list if still present */ @@ -433,7 +413,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_transient_failure"); break; case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked(exec_ctx, p, this_idx); if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { remove_disconnected_sc_locked( p, p->subchannel_index_to_readylist_node[this_idx]); diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 5605f788a5..8d9287c36a 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -37,6 +37,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable) { policy->vtable = vtable; gpr_ref_init(&policy->refs, 1); + grpc_pollset_set_init(&policy->interested_parties); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 2889b8e55d..985c96630f 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -48,6 +48,7 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; gpr_refcount refs; + grpc_pollset_set interested_parties; }; struct grpc_lb_policy_vtable { diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 16f9346a35..3872cacfa0 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -224,18 +224,6 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, } } -void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_pollset *pollset) { - grpc_pollset_set_add_pollset(exec_ctx, &c->pollset_set, pollset); -} - -void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_pollset *pollset) { - grpc_pollset_set_del_pollset(exec_ctx, &c->pollset_set, pollset); -} - static gpr_uint32 random_seed() { return (gpr_uint32)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC))); } @@ -298,14 +286,38 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { return state; } +typedef struct { + grpc_subchannel *subchannel; + grpc_pollset_set *pollset_set; + grpc_closure *notify; + grpc_closure closure; +} external_state_watcher; + +static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, int success) { + external_state_watcher *w = arg; + grpc_closure *follow_up = w->notify; + grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set); + GRPC_SUBCHANNEL_UNREF(exec_ctx, w->subchannel, "external_state_watcher"); + gpr_free(w); + follow_up->cb(exec_ctx, follow_up->cb_arg, success); +} + void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, + grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *notify) { int do_connect = 0; + external_state_watcher *w = gpr_malloc(sizeof(*w)); + w->subchannel = c; + w->pollset_set = interested_parties; + w->notify = notify; + grpc_closure_init(&w->closure, on_external_state_watcher_done, w); + grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties); + GRPC_SUBCHANNEL_REF(c, "external_state_watcher"); gpr_mu_lock(&c->mu); if (grpc_connectivity_state_notify_on_state_change( - exec_ctx, &c->state_tracker, state, notify)) { + exec_ctx, &c->state_tracker, state, &w->closure)) { do_connect = 1; c->connecting = 1; /* released by connection */ diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index b50d1e8ecc..20d74e9f10 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -109,6 +109,7 @@ grpc_connectivity_state grpc_subchannel_check_connectivity( Updates *state with the new state of the channel */ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel, + grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *notify); void grpc_connected_subchannel_notify_on_state_change( @@ -124,15 +125,6 @@ void grpc_connected_subchannel_state_change_unsubscribe( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, grpc_closure *subscribed_notify); -/** express interest in \a channel's activities through \a pollset. */ -void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel, - grpc_pollset *pollset); -/** stop following \a channel's activity through \a pollset. */ -void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel, - grpc_pollset *pollset); - /** retrieve the grpc_connected_subchannel - or NULL if called before the subchannel becomes connected */ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( -- cgit v1.2.3