aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config/lb_policies/pick_first.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/client_config/lb_policies/pick_first.c')
-rw-r--r--src/core/client_config/lb_policies/pick_first.c113
1 files changed, 82 insertions, 31 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 575ee02249..804dbdeadd 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -110,38 +110,54 @@ void pf_destroy(grpc_lb_policy *pol) {
void pf_shutdown(grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ grpc_connectivity_state_flusher f;
pending_pick *pp;
gpr_mu_lock(&p->mu);
del_interested_parties_locked(p);
p->shutdown = 1;
- while ((pp = p->pending_picks)) {
- p->pending_picks = pp->next;
- *pp->target = NULL;
- grpc_workqueue_push(p->workqueue, pp->on_complete, 0);
- gpr_free(pp);
- }
+ pp = p->pending_picks;
+ p->pending_picks = NULL;
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE,
"shutdown");
+ grpc_connectivity_state_begin_flush(&p->state_tracker, &f);
gpr_mu_unlock(&p->mu);
+ grpc_connectivity_state_end_flush(&f);
+ while (pp != NULL) {
+ pending_pick *next = pp->next;
+ *pp->target = NULL;
+ pp->on_complete->cb(pp->on_complete->cb_arg, 0);
+ gpr_free(pp);
+ pp = next;
+ }
}
-static void start_picking(pick_first_lb_policy *p) {
+/* returns a closure to call, or NULL */
+static grpc_iomgr_closure *start_picking(pick_first_lb_policy *p) {
p->started_picking = 1;
p->checking_subchannel = 0;
p->checking_connectivity = GRPC_CHANNEL_IDLE;
GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
- grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel],
- &p->checking_connectivity,
- &p->connectivity_changed);
+ if (grpc_subchannel_notify_on_state_change(
+ p->subchannels[p->checking_subchannel], &p->checking_connectivity,
+ &p->connectivity_changed)
+ .state_already_changed) {
+ return &p->connectivity_changed;
+ } else {
+ return NULL;
+ }
}
void pf_exit_idle(grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ grpc_iomgr_closure *call = NULL;
gpr_mu_lock(&p->mu);
if (!p->started_picking) {
- start_picking(p);
+ call = start_picking(p);
}
gpr_mu_unlock(&p->mu);
+ if (call) {
+ call->cb(call->cb_arg, 1);
+ }
}
void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
@@ -155,8 +171,9 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
*target = p->selected;
on_complete->cb(on_complete->cb_arg, 1);
} else {
+ grpc_iomgr_closure *call = NULL;
if (!p->started_picking) {
- start_picking(p);
+ call = start_picking(p);
}
grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
pollset);
@@ -167,6 +184,9 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
+ if (call) {
+ call->cb(call->cb_arg, 1);
+ }
}
}
@@ -174,6 +194,8 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
pick_first_lb_policy *p = arg;
pending_pick *pp;
int unref = 0;
+ grpc_iomgr_closure *cbs = NULL;
+ grpc_connectivity_state_flusher f;
gpr_mu_lock(&p->mu);
@@ -183,8 +205,12 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
"selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
- grpc_subchannel_notify_on_state_change(
- p->selected, &p->checking_connectivity, &p->connectivity_changed);
+ if (grpc_subchannel_notify_on_state_change(
+ p->selected, &p->checking_connectivity, &p->connectivity_changed)
+ .state_already_changed) {
+ p->connectivity_changed.next = cbs;
+ cbs = &p->connectivity_changed;
+ }
} else {
unref = 1;
}
@@ -199,11 +225,17 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
p->pending_picks = pp->next;
*pp->target = p->selected;
grpc_subchannel_del_interested_party(p->selected, pp->pollset);
- grpc_workqueue_push(p->workqueue, pp->on_complete, 1);
+ pp->on_complete->next = cbs;
+ cbs = pp->on_complete;
gpr_free(pp);
}
- grpc_subchannel_notify_on_state_change(
- p->selected, &p->checking_connectivity, &p->connectivity_changed);
+ if (grpc_subchannel_notify_on_state_change(p->selected,
+ &p->checking_connectivity,
+ &p->connectivity_changed)
+ .state_already_changed) {
+ p->connectivity_changed.next = cbs;
+ cbs = &p->connectivity_changed;
+ }
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
grpc_connectivity_state_set(&p->state_tracker,
@@ -216,9 +248,13 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
p->subchannels[p->checking_subchannel]);
add_interested_parties_locked(p);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- grpc_subchannel_notify_on_state_change(
- p->subchannels[p->checking_subchannel], &p->checking_connectivity,
- &p->connectivity_changed);
+ if (grpc_subchannel_notify_on_state_change(
+ p->subchannels[p->checking_subchannel],
+ &p->checking_connectivity, &p->connectivity_changed)
+ .state_already_changed) {
+ p->connectivity_changed.next = cbs;
+ cbs = &p->connectivity_changed;
+ }
} else {
goto loop;
}
@@ -227,9 +263,13 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
case GRPC_CHANNEL_IDLE:
grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
"connecting_changed");
- grpc_subchannel_notify_on_state_change(
- p->subchannels[p->checking_subchannel], &p->checking_connectivity,
- &p->connectivity_changed);
+ if (grpc_subchannel_notify_on_state_change(
+ p->subchannels[p->checking_subchannel],
+ &p->checking_connectivity, &p->connectivity_changed)
+ .state_already_changed) {
+ p->connectivity_changed.next = cbs;
+ cbs = &p->connectivity_changed;
+ }
break;
case GRPC_CHANNEL_FATAL_FAILURE:
del_interested_parties_locked(p);
@@ -244,7 +284,8 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_workqueue_push(p->workqueue, pp->on_complete, 1);
+ pp->on_complete->next = cbs;
+ cbs = pp->on_complete;
gpr_free(pp);
}
unref = 1;
@@ -261,7 +302,15 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
}
}
+ grpc_connectivity_state_begin_flush(&p->state_tracker, &f);
gpr_mu_unlock(&p->mu);
+ grpc_connectivity_state_end_flush(&f);
+
+ while (cbs != NULL) {
+ grpc_iomgr_closure *next = cbs->next;
+ cbs->cb(cbs->cb_arg, 1);
+ cbs = next;
+ }
if (unref) {
GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity");
@@ -299,14 +348,16 @@ static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
return st;
}
-static void pf_notify_on_state_change(grpc_lb_policy *pol,
- grpc_connectivity_state *current,
- grpc_iomgr_closure *notify) {
+static grpc_connectivity_state_notify_on_state_change_result
+pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current,
+ grpc_iomgr_closure *notify) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ grpc_connectivity_state_notify_on_state_change_result r;
gpr_mu_lock(&p->mu);
- grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
- notify);
+ r = grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
+ notify);
gpr_mu_unlock(&p->mu);
+ return r;
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
@@ -332,8 +383,8 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
p->num_subchannels = args->num_subchannels;
p->workqueue = args->workqueue;
GRPC_WORKQUEUE_REF(p->workqueue, "pick_first");
- grpc_connectivity_state_init(&p->state_tracker, args->workqueue,
- GRPC_CHANNEL_IDLE, "pick_first");
+ grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
+ "pick_first");
memcpy(p->subchannels, args->subchannels,
sizeof(grpc_subchannel *) * args->num_subchannels);
grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);