aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config/lb_policies/pick_first.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/client_config/lb_policies/pick_first.c')
-rw-r--r--src/core/client_config/lb_policies/pick_first.c192
1 files changed, 128 insertions, 64 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 73da624aff..5ae2e0ea52 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -62,6 +62,8 @@ typedef struct {
grpc_subchannel *selected;
/** have we started picking? */
int started_picking;
+ /** are we shut down? */
+ int shutdown;
/** which subchannel are we watching? */
size_t checking_subchannel;
/** what is the connectivity of that channel? */
@@ -73,12 +75,30 @@ typedef struct {
grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
+static void del_interested_parties_locked(pick_first_lb_policy *p) {
+ pending_pick *pp;
+ for (pp = p->pending_picks; pp; pp = pp->next) {
+ grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel],
+ pp->pollset);
+ }
+}
+
+static void add_interested_parties_locked(pick_first_lb_policy *p) {
+ pending_pick *pp;
+ for (pp = p->pending_picks; pp; pp = pp->next) {
+ grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
+ pp->pollset);
+ }
+}
+
void pf_destroy(grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
size_t i;
+ del_interested_parties_locked(p);
for (i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first");
}
+ grpc_connectivity_state_destroy(&p->state_tracker);
gpr_free(p->subchannels);
gpr_mu_destroy(&p->mu);
gpr_free(p);
@@ -88,12 +108,35 @@ void pf_shutdown(grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
+ del_interested_parties_locked(p);
+ p->shutdown = 1;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
grpc_iomgr_add_delayed_callback(pp->on_complete, 0);
gpr_free(pp);
}
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE,
+ "shutdown");
+ gpr_mu_unlock(&p->mu);
+}
+
+static void start_picking(pick_first_lb_policy *p) {
+ p->started_picking = 1;
+ p->checking_subchannel = 0;
+ p->checking_connectivity = GRPC_CHANNEL_IDLE;
+ GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
+ grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel],
+ &p->checking_connectivity,
+ &p->connectivity_changed);
+}
+
+void pf_exit_idle(grpc_lb_policy *pol) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ gpr_mu_lock(&p->mu);
+ if (!p->started_picking) {
+ start_picking(p);
+ }
gpr_mu_unlock(&p->mu);
}
@@ -109,13 +152,7 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
on_complete->cb(on_complete->cb_arg, 1);
} else {
if (!p->started_picking) {
- p->started_picking = 1;
- p->checking_subchannel = 0;
- p->checking_connectivity = GRPC_CHANNEL_IDLE;
- GRPC_LB_POLICY_REF(pol, "pick_first_connectivity");
- grpc_subchannel_notify_on_state_change(
- p->subchannels[p->checking_subchannel], &p->checking_connectivity,
- &p->connectivity_changed);
+ start_picking(p);
}
grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
pollset);
@@ -129,77 +166,97 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
}
}
-static void del_interested_parties_locked(pick_first_lb_policy *p) {
- pending_pick *pp;
- for (pp = p->pending_picks; pp; pp = pp->next) {
- grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel],
- pp->pollset);
- }
-}
-
-static void add_interested_parties_locked(pick_first_lb_policy *p) {
- pending_pick *pp;
- for (pp = p->pending_picks; pp; pp = pp->next) {
- grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
- pp->pollset);
- }
-}
-
static void pf_connectivity_changed(void *arg, int iomgr_success) {
pick_first_lb_policy *p = arg;
pending_pick *pp;
int unref = 0;
gpr_mu_lock(&p->mu);
-loop:
- switch (p->checking_connectivity) {
- case GRPC_CHANNEL_READY:
- p->selected = p->subchannels[p->checking_subchannel];
- while ((pp = p->pending_picks)) {
- p->pending_picks = pp->next;
- *pp->target = p->selected;
- grpc_subchannel_del_interested_party(p->selected, pp->pollset);
- grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
- gpr_free(pp);
- }
- unref = 1;
- break;
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- del_interested_parties_locked(p);
- p->checking_subchannel =
- (p->checking_subchannel + 1) % p->num_subchannels;
- p->checking_connectivity = grpc_subchannel_check_connectivity(
- p->subchannels[p->checking_subchannel]);
- add_interested_parties_locked(p);
- goto loop;
- case GRPC_CHANNEL_CONNECTING:
- case GRPC_CHANNEL_IDLE:
+
+ if (p->shutdown) {
+ unref = 1;
+ } else if (p->selected != NULL) {
+ grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
+ "selected_changed");
+ if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
grpc_subchannel_notify_on_state_change(
- p->subchannels[p->checking_subchannel], &p->checking_connectivity,
- &p->connectivity_changed);
- break;
- case GRPC_CHANNEL_FATAL_FAILURE:
- del_interested_parties_locked(p);
- GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
- p->subchannels[p->num_subchannels - 1]);
- p->num_subchannels--;
- GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first");
- if (p->num_subchannels == 0) {
+ p->selected, &p->checking_connectivity, &p->connectivity_changed);
+ } else {
+ unref = 1;
+ }
+ } else {
+ loop:
+ switch (p->checking_connectivity) {
+ case GRPC_CHANNEL_READY:
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
+ "connecting_ready");
+ p->selected = p->subchannels[p->checking_subchannel];
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
- *pp->target = NULL;
+ *pp->target = p->selected;
+ grpc_subchannel_del_interested_party(p->selected, pp->pollset);
grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
gpr_free(pp);
}
- unref = 1;
- } else {
- p->checking_subchannel %= p->num_subchannels;
+ grpc_subchannel_notify_on_state_change(
+ p->selected, &p->checking_connectivity, &p->connectivity_changed);
+ break;
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ grpc_connectivity_state_set(&p->state_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ "connecting_transient_failure");
+ del_interested_parties_locked(p);
+ p->checking_subchannel =
+ (p->checking_subchannel + 1) % p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
add_interested_parties_locked(p);
- goto loop;
- }
+ if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ grpc_subchannel_notify_on_state_change(
+ p->subchannels[p->checking_subchannel], &p->checking_connectivity,
+ &p->connectivity_changed);
+ } else {
+ goto loop;
+ }
+ break;
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_IDLE:
+ grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
+ "connecting_changed");
+ grpc_subchannel_notify_on_state_change(
+ p->subchannels[p->checking_subchannel], &p->checking_connectivity,
+ &p->connectivity_changed);
+ break;
+ case GRPC_CHANNEL_FATAL_FAILURE:
+ del_interested_parties_locked(p);
+ GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
+ p->subchannels[p->num_subchannels - 1]);
+ p->num_subchannels--;
+ GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first");
+ if (p->num_subchannels == 0) {
+ grpc_connectivity_state_set(&p->state_tracker,
+ GRPC_CHANNEL_FATAL_FAILURE,
+ "no_more_channels");
+ while ((pp = p->pending_picks)) {
+ p->pending_picks = pp->next;
+ *pp->target = NULL;
+ grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
+ gpr_free(pp);
+ }
+ unref = 1;
+ } else {
+ grpc_connectivity_state_set(&p->state_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ "subchannel_failed");
+ p->checking_subchannel %= p->num_subchannels;
+ p->checking_connectivity = grpc_subchannel_check_connectivity(
+ p->subchannels[p->checking_subchannel]);
+ add_interested_parties_locked(p);
+ goto loop;
+ }
+ }
}
+
gpr_mu_unlock(&p->mu);
if (unref) {
@@ -249,8 +306,13 @@ static void pf_notify_on_state_change(grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
- pf_destroy, pf_shutdown, pf_pick,
- pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
+ pf_destroy,
+ pf_shutdown,
+ pf_pick,
+ pf_exit_idle,
+ pf_broadcast,
+ pf_check_connectivity,
+ pf_notify_on_state_change};
grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
size_t num_subchannels) {
@@ -260,6 +322,8 @@ grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels);
p->num_subchannels = num_subchannels;
+ grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
+ "pick_first");
memcpy(p->subchannels, subchannels,
sizeof(grpc_subchannel *) * num_subchannels);
grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);