aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-11-25 15:22:26 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-11-25 15:22:26 -0800
commit86c99580a0891697f3c5227ae2fd2911734098fc (patch)
treebdb4a2bf6b12e45d504ac622968b979ab4f3dec4 /src/core/client_config
parentee7531cd7c0ef681d104ab0ec83a8fb151ebb62b (diff)
Load balancing interest management fixes
Diffstat (limited to 'src/core/client_config')
-rw-r--r--src/core/client_config/lb_policies/pick_first.c40
-rw-r--r--src/core/client_config/lb_policies/round_robin.c49
-rw-r--r--src/core/client_config/lb_policy.c1
-rw-r--r--src/core/client_config/lb_policy.h1
-rw-r--r--src/core/client_config/subchannel.c38
-rw-r--r--src/core/client_config/subchannel.h10
6 files changed, 51 insertions, 88 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 4ecfc11cdd..c0f1d3fd94 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -76,24 +76,6 @@ typedef struct {
grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
-static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx,
- pick_first_lb_policy *p) {
- pending_pick *pp;
- for (pp = p->pending_picks; pp; pp = pp->next) {
- grpc_subchannel_del_interested_party(
- exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
- }
-}
-
-static void add_interested_parties_locked(grpc_exec_ctx *exec_ctx,
- pick_first_lb_policy *p) {
- pending_pick *pp;
- for (pp = p->pending_picks; pp; pp = pp->next) {
- grpc_subchannel_add_interested_party(
- exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
- }
-}
-
void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
size_t i;
@@ -114,7 +96,6 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
- del_interested_parties_locked(exec_ctx, p);
p->shutdown = 1;
pp = p->pending_picks;
p->pending_picks = NULL;
@@ -124,6 +105,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
pp = next;
@@ -140,8 +122,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_subchannel_del_interested_party(
- exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
gpr_free(pp);
@@ -161,6 +142,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
+ &p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
}
@@ -187,8 +169,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_subchannel_add_interested_party(
- exec_ctx, p->subchannels[p->checking_subchannel], pollset);
+ grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollset = pollset;
@@ -275,8 +256,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = p->selected;
- grpc_subchannel_del_interested_party(exec_ctx, selected_subchannel,
- pp->pollset);
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
}
@@ -288,15 +268,14 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
"connecting_transient_failure");
- del_interested_parties_locked(exec_ctx, p);
p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
- add_interested_parties_locked(exec_ctx, p);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
+ &p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
} else {
goto loop;
@@ -309,13 +288,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"connecting_changed");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
+ &p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
break;
case GRPC_CHANNEL_FATAL_FAILURE:
- del_interested_parties_locked(exec_ctx, p);
- GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
- p->subchannels[p->num_subchannels - 1]);
p->num_subchannels--;
+ GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
+ p->subchannels[p->num_subchannels]);
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels],
"pick_first");
if (p->num_subchannels == 0) {
@@ -336,7 +315,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
p->checking_subchannel %= p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
- add_interested_parties_locked(exec_ctx, p);
goto loop;
}
}
diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c
index ca0d6abd07..10688b3fa5 100644
--- a/src/core/client_config/lb_policies/round_robin.c
+++ b/src/core/client_config/lb_policies/round_robin.c
@@ -200,24 +200,11 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
gpr_free(node);
}
-static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx,
- round_robin_lb_policy *p,
- const size_t subchannel_idx) {
- pending_pick *pp;
- for (pp = p->pending_picks; pp; pp = pp->next) {
- grpc_subchannel_del_interested_party(
- exec_ctx, p->subchannels[subchannel_idx], pp->pollset);
- }
-}
-
void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
size_t i;
ready_list *elem;
for (i = 0; i < p->num_subchannels; i++) {
- del_interested_parties_locked(exec_ctx, p, i);
- }
- for (i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "round_robin");
}
gpr_free(p->connectivity_changed_cbs);
@@ -243,15 +230,10 @@ void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
- size_t i;
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
- for (i = 0; i < p->num_subchannels; i++) {
- del_interested_parties_locked(exec_ctx, p, i);
- }
-
p->shutdown = 1;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
@@ -268,17 +250,13 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
- size_t i;
gpr_mu_lock(&p->mu);
pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- for (i = 0; i < p->num_subchannels; i++) {
- grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i],
- pp->pollset);
- }
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
gpr_free(pp);
@@ -298,6 +276,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
for (i = 0; i < p->num_subchannels; i++) {
p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE;
grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[i],
+ &p->base.interested_parties,
&p->subchannel_connectivity[i],
&p->connectivity_changed_cbs[i]);
GRPC_LB_POLICY_REF(&p->base, "round_robin_connectivity");
@@ -316,7 +295,6 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
grpc_connected_subchannel **target, grpc_closure *on_complete) {
- size_t i;
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
ready_list *selected;
@@ -336,10 +314,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- for (i = 0; i < p->num_subchannels; i++) {
- grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i],
- pollset);
- }
+ grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollset = pollset;
@@ -398,13 +373,15 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
selected->subchannel, selected);
}
- grpc_subchannel_del_interested_party(exec_ctx, selected->subchannel,
- pp->pollset);
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
}
grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[this_idx], this_connectivity,
+ exec_ctx,
+ p->subchannels[this_idx],
+ &p->base.interested_parties,
+ this_connectivity,
&p->connectivity_changed_cbs[this_idx]);
break;
case GRPC_CHANNEL_CONNECTING:
@@ -412,14 +389,17 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
*this_connectivity, "connecting_changed");
grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[this_idx], this_connectivity,
+ exec_ctx, p->subchannels[this_idx],
+ &p->base.interested_parties,
+ this_connectivity,
&p->connectivity_changed_cbs[this_idx]);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
- del_interested_parties_locked(exec_ctx, p, this_idx);
/* renew state notification */
grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[this_idx], this_connectivity,
+ exec_ctx, p->subchannels[this_idx],
+ &p->base.interested_parties,
+ this_connectivity,
&p->connectivity_changed_cbs[this_idx]);
/* remove from ready list if still present */
@@ -433,7 +413,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"connecting_transient_failure");
break;
case GRPC_CHANNEL_FATAL_FAILURE:
- del_interested_parties_locked(exec_ctx, p, this_idx);
if (p->subchannel_index_to_readylist_node[this_idx] != NULL) {
remove_disconnected_sc_locked(
p, p->subchannel_index_to_readylist_node[this_idx]);
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index 5605f788a5..8d9287c36a 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -37,6 +37,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable) {
policy->vtable = vtable;
gpr_ref_init(&policy->refs, 1);
+ grpc_pollset_set_init(&policy->interested_parties);
}
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index 2889b8e55d..985c96630f 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -48,6 +48,7 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel,
struct grpc_lb_policy {
const grpc_lb_policy_vtable *vtable;
gpr_refcount refs;
+ grpc_pollset_set interested_parties;
};
struct grpc_lb_policy_vtable {
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 16f9346a35..3872cacfa0 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -224,18 +224,6 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
}
}
-void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c,
- grpc_pollset *pollset) {
- grpc_pollset_set_add_pollset(exec_ctx, &c->pollset_set, pollset);
-}
-
-void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c,
- grpc_pollset *pollset) {
- grpc_pollset_set_del_pollset(exec_ctx, &c->pollset_set, pollset);
-}
-
static gpr_uint32 random_seed() {
return (gpr_uint32)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC)));
}
@@ -298,14 +286,38 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
return state;
}
+typedef struct {
+ grpc_subchannel *subchannel;
+ grpc_pollset_set *pollset_set;
+ grpc_closure *notify;
+ grpc_closure closure;
+} external_state_watcher;
+
+static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+ external_state_watcher *w = arg;
+ grpc_closure *follow_up = w->notify;
+ grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set);
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, w->subchannel, "external_state_watcher");
+ gpr_free(w);
+ follow_up->cb(exec_ctx, follow_up->cb_arg, success);
+}
+
void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c,
+ grpc_pollset_set *interested_parties,
grpc_connectivity_state *state,
grpc_closure *notify) {
int do_connect = 0;
+ external_state_watcher *w = gpr_malloc(sizeof(*w));
+ w->subchannel = c;
+ w->pollset_set = interested_parties;
+ w->notify = notify;
+ grpc_closure_init(&w->closure, on_external_state_watcher_done, w);
+ grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties);
+ GRPC_SUBCHANNEL_REF(c, "external_state_watcher");
gpr_mu_lock(&c->mu);
if (grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &c->state_tracker, state, notify)) {
+ exec_ctx, &c->state_tracker, state, &w->closure)) {
do_connect = 1;
c->connecting = 1;
/* released by connection */
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index b50d1e8ecc..20d74e9f10 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -109,6 +109,7 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(
Updates *state with the new state of the channel */
void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel,
+ grpc_pollset_set *interested_parties,
grpc_connectivity_state *state,
grpc_closure *notify);
void grpc_connected_subchannel_notify_on_state_change(
@@ -124,15 +125,6 @@ void grpc_connected_subchannel_state_change_unsubscribe(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel,
grpc_closure *subscribed_notify);
-/** express interest in \a channel's activities through \a pollset. */
-void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *channel,
- grpc_pollset *pollset);
-/** stop following \a channel's activity through \a pollset. */
-void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *channel,
- grpc_pollset *pollset);
-
/** retrieve the grpc_connected_subchannel - or NULL if called before
the subchannel becomes connected */
grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(