aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config/lb_policies/pick_first.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-11-25 15:22:26 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-11-25 15:22:26 -0800
commit86c99580a0891697f3c5227ae2fd2911734098fc (patch)
treebdb4a2bf6b12e45d504ac622968b979ab4f3dec4 /src/core/client_config/lb_policies/pick_first.c
parentee7531cd7c0ef681d104ab0ec83a8fb151ebb62b (diff)
Load balancing interest management fixes
Diffstat (limited to 'src/core/client_config/lb_policies/pick_first.c')
-rw-r--r--src/core/client_config/lb_policies/pick_first.c40
1 files changed, 9 insertions, 31 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 4ecfc11cdd..c0f1d3fd94 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -76,24 +76,6 @@ typedef struct {
grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
-static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx,
- pick_first_lb_policy *p) {
- pending_pick *pp;
- for (pp = p->pending_picks; pp; pp = pp->next) {
- grpc_subchannel_del_interested_party(
- exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
- }
-}
-
-static void add_interested_parties_locked(grpc_exec_ctx *exec_ctx,
- pick_first_lb_policy *p) {
- pending_pick *pp;
- for (pp = p->pending_picks; pp; pp = pp->next) {
- grpc_subchannel_add_interested_party(
- exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
- }
-}
-
void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
size_t i;
@@ -114,7 +96,6 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
- del_interested_parties_locked(exec_ctx, p);
p->shutdown = 1;
pp = p->pending_picks;
p->pending_picks = NULL;
@@ -124,6 +105,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
pp = next;
@@ -140,8 +122,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_subchannel_del_interested_party(
- exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
gpr_free(pp);
@@ -161,6 +142,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
+ &p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
}
@@ -187,8 +169,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_subchannel_add_interested_party(
- exec_ctx, p->subchannels[p->checking_subchannel], pollset);
+ grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollset = pollset;
@@ -275,8 +256,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = p->selected;
- grpc_subchannel_del_interested_party(exec_ctx, selected_subchannel,
- pp->pollset);
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
}
@@ -288,15 +268,14 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
"connecting_transient_failure");
- del_interested_parties_locked(exec_ctx, p);
p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
- add_interested_parties_locked(exec_ctx, p);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
+ &p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
} else {
goto loop;
@@ -309,13 +288,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"connecting_changed");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
+ &p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
break;
case GRPC_CHANNEL_FATAL_FAILURE:
- del_interested_parties_locked(exec_ctx, p);
- GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
- p->subchannels[p->num_subchannels - 1]);
p->num_subchannels--;
+ GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
+ p->subchannels[p->num_subchannels]);
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels],
"pick_first");
if (p->num_subchannels == 0) {
@@ -336,7 +315,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
p->checking_subchannel %= p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
- add_interested_parties_locked(exec_ctx, p);
goto loop;
}
}