aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar David G. Quintas <dgq@google.com>2016-11-15 15:16:47 -0800
committerGravatar GitHub <noreply@github.com>2016-11-15 15:16:47 -0800
commita1cf4dd6074f62ef5bc2e71d98c0f1f162dfedbb (patch)
treeb86882f8e0115f142aac119a238c3899edbbabff /src/core/ext
parent873d575c7d7b81ea9b149174fb81ba4f61ae2b98 (diff)
parentffd8e3d8e1397bcae315185818cd9d4784092682 (diff)
Merge pull request #8666 from dgquintas/rr_fixall
Fixed wrong connectivity status updates for RR
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c2
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c313
2 files changed, 197 insertions, 118 deletions
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index ac3c6a305a..39f3b3d60f 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -292,6 +292,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
} else {
loop:
switch (p->checking_connectivity) {
+ case GRPC_CHANNEL_INIT:
+ GPR_UNREACHABLE_CODE();
case GRPC_CHANNEL_READY:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 0fd3abe099..acb2f77e45 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -116,8 +116,13 @@ typedef struct {
grpc_closure connectivity_changed_closure;
/** this subchannels current position in subchannel->ready_list */
ready_list *ready_list_node;
- /** last observed connectivity */
- grpc_connectivity_state connectivity_state;
+ /** last observed connectivity. Not updated by
+ * \a grpc_subchannel_notify_on_state_change. Used to determine the previous
+ * state while processing the new state in \a rr_connectivity_changed */
+ grpc_connectivity_state prev_connectivity_state;
+ /** current connectivity state. Updated by \a
+ * grpc_subchannel_notify_on_state_change */
+ grpc_connectivity_state curr_connectivity_state;
/** the subchannel's target user data */
void *user_data;
/** vtable to operate over \a user_data */
@@ -127,6 +132,7 @@ typedef struct {
struct round_robin_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
+ gpr_mu mu;
/** total number of addresses received at creation time */
size_t num_addresses;
@@ -135,8 +141,11 @@ struct round_robin_lb_policy {
size_t num_subchannels;
subchannel_data **subchannels;
- /** mutex protecting remaining members */
- gpr_mu mu;
+ /** how many subchannels are in TRANSIENT_FAILURE */
+ size_t num_transient_failures;
+ /** how many subchannels are IDLE */
+ size_t num_idle;
+
/** have we started picking? */
int started_picking;
/** are we shutting down? */
@@ -258,6 +267,10 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
gpr_free(node);
}
+static bool is_ready_list_empty(round_robin_lb_policy *p) {
+ return p->ready_list.prev == NULL;
+}
+
static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
ready_list *elem;
@@ -268,7 +281,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
for (size_t i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i];
- GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin_destroy");
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy");
if (sd->user_data != NULL) {
GPR_ASSERT(sd->user_data_vtable != NULL);
sd->user_data_vtable->destroy(sd->user_data);
@@ -381,18 +394,18 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
size_t i;
p->started_picking = 1;
- if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, (void *)p,
- p->num_subchannels);
- }
-
for (i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i];
- sd->connectivity_state = GRPC_CHANNEL_IDLE;
+ /* use some sentinel value outside of the range of grpc_connectivity_state
+ * to signal an undefined previous state. We won't be referring to this
+ * value again and it'll be overwritten after the first call to
+ * rr_connectivity_changed */
+ sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
+ sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->connectivity_state, &sd->connectivity_changed_closure);
- GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity");
+ &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
}
}
@@ -422,7 +435,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
/* readily available, report right away */
*target = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected->subchannel),
- "picked");
+ "rr_picked");
if (user_data != NULL) {
*user_data = selected->user_data;
@@ -453,125 +466,184 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
}
+static void update_state_counters(subchannel_data *sd) {
+ round_robin_lb_policy *p = sd->policy;
+
+ /* update p->num_transient_failures (resp. p->num_idle): if the previous
+ * state was TRANSIENT_FAILURE (resp. IDLE), decrement
+ * p->num_transient_failures (resp. p->num_idle). */
+ if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GPR_ASSERT(p->num_transient_failures > 0);
+ --p->num_transient_failures;
+ } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
+ GPR_ASSERT(p->num_idle > 0);
+ --p->num_idle;
+ }
+}
+
+/* sd is the subchannel_data associted with the updated subchannel.
+ * shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE
+ * or SHUTDOWN */
+static grpc_connectivity_state update_lb_connectivity_status(
+ grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) {
+ /* In priority order. The first rule to match terminates the search (ie, if we
+ * are on rule n, all previous rules were unfulfilled).
+ *
+ * 1) RULE: ANY subchannel is READY => policy is READY.
+ * CHECK: At least one subchannel is ready iff p->ready_list is NOT empty.
+ *
+ * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING.
+ * CHECK: sd->curr_connectivity_state == CONNECTING.
+ *
+ * 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN.
+ * CHECK: p->num_subchannels = 0.
+ *
+ * 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
+ * TRANSIENT_FAILURE.
+ * CHECK: p->num_transient_failures == p->num_subchannels.
+ *
+ * 5) RULE: ALL subchannels are IDLE => policy is IDLE.
+ * CHECK: p->num_idle == p->num_subchannels.
+ */
+ round_robin_lb_policy *p = sd->policy;
+ if (!is_ready_list_empty(p)) { /* 1) READY */
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
+ GRPC_ERROR_NONE, "rr_ready");
+ return GRPC_CHANNEL_READY;
+ } else if (sd->curr_connectivity_state ==
+ GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
+ "rr_connecting");
+ return GRPC_CHANNEL_CONNECTING;
+ } else if (p->num_subchannels == 0) { /* 3) SHUTDOWN */
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
+ "rr_shutdown");
+ return GRPC_CHANNEL_SHUTDOWN;
+ } else if (p->num_transient_failures ==
+ p->num_subchannels) { /* 4) TRANSIENT_FAILURE */
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "rr_transient_failure");
+ return GRPC_CHANNEL_TRANSIENT_FAILURE;
+ } else if (p->num_idle == p->num_subchannels) { /* 5) IDLE */
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
+ GRPC_ERROR_NONE, "rr_idle");
+ return GRPC_CHANNEL_IDLE;
+ }
+ /* no change */
+ return sd->curr_connectivity_state;
+}
+
static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
subchannel_data *sd = arg;
round_robin_lb_policy *p = sd->policy;
pending_pick *pp;
- int unref = 0;
-
GRPC_ERROR_REF(error);
gpr_mu_lock(&p->mu);
if (p->shutdown) {
- unref = 1;
- } else {
- switch (sd->connectivity_state) {
- case GRPC_CHANNEL_READY:
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_READY, GRPC_ERROR_REF(error),
- "connecting_ready");
- /* add the newly connected subchannel to the list of connected ones.
- * Note that it goes to the "end of the line". */
- sd->ready_list_node = add_connected_sc_locked(p, sd);
- /* at this point we know there's at least one suitable subchannel. Go
- * ahead and pick one and notify the pending suitors in
- * p->pending_picks. This preemtively replicates rr_pick()'s actions. */
- ready_list *selected = peek_next_connected_locked(p);
- GPR_ASSERT(selected != NULL);
- if (p->pending_picks != NULL) {
- /* if the selected subchannel is going to be used for the pending
- * picks, update the last picked pointer */
- advance_last_picked_locked(p);
+ gpr_mu_unlock(&p->mu);
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
+ switch (sd->curr_connectivity_state) {
+ case GRPC_CHANNEL_INIT:
+ GPR_UNREACHABLE_CODE();
+ case GRPC_CHANNEL_READY:
+ /* add the newly connected subchannel to the list of connected ones.
+ * Note that it goes to the "end of the line". */
+ sd->ready_list_node = add_connected_sc_locked(p, sd);
+ /* at this point we know there's at least one suitable subchannel. Go
+ * ahead and pick one and notify the pending suitors in
+ * p->pending_picks. This preemtively replicates rr_pick()'s actions. */
+ ready_list *selected = peek_next_connected_locked(p);
+ GPR_ASSERT(selected != NULL);
+ if (p->pending_picks != NULL) {
+ /* if the selected subchannel is going to be used for the pending
+ * picks, update the last picked pointer */
+ advance_last_picked_locked(p);
+ }
+ while ((pp = p->pending_picks)) {
+ p->pending_picks = pp->next;
+ *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
+ grpc_subchannel_get_connected_subchannel(selected->subchannel),
+ "rr_picked");
+ if (pp->user_data != NULL) {
+ *pp->user_data = selected->user_data;
}
-
+ if (grpc_lb_round_robin_trace) {
+ gpr_log(GPR_DEBUG,
+ "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
+ (void *)selected->subchannel, (void *)selected);
+ }
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
+ gpr_free(pp);
+ }
+ update_lb_connectivity_status(exec_ctx, sd, error);
+ sd->prev_connectivity_state = sd->curr_connectivity_state;
+ /* renew notification: reuses the "rr_connectivity" weak ref */
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
+ break;
+ case GRPC_CHANNEL_IDLE:
+ ++p->num_idle;
+ /* fallthrough */
+ case GRPC_CHANNEL_CONNECTING:
+ update_state_counters(sd);
+ update_lb_connectivity_status(exec_ctx, sd, error);
+ sd->prev_connectivity_state = sd->curr_connectivity_state;
+ /* renew notification: reuses the "rr_connectivity" weak ref */
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
+ break;
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ ++p->num_transient_failures;
+ /* remove from ready list if still present */
+ if (sd->ready_list_node != NULL) {
+ remove_disconnected_sc_locked(p, sd->ready_list_node);
+ sd->ready_list_node = NULL;
+ }
+ update_lb_connectivity_status(exec_ctx, sd, error);
+ sd->prev_connectivity_state = sd->curr_connectivity_state;
+ /* renew notification: reuses the "rr_connectivity" weak ref */
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
+ break;
+ case GRPC_CHANNEL_SHUTDOWN:
+ update_state_counters(sd);
+ if (sd->ready_list_node != NULL) {
+ remove_disconnected_sc_locked(p, sd->ready_list_node);
+ sd->ready_list_node = NULL;
+ }
+ --p->num_subchannels;
+ GPR_SWAP(subchannel_data *, p->subchannels[sd->index],
+ p->subchannels[p->num_subchannels]);
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown");
+ p->subchannels[sd->index]->index = sd->index;
+ if (update_lb_connectivity_status(exec_ctx, sd, error) ==
+ GRPC_CHANNEL_SHUTDOWN) {
+ /* the policy is shutting down. Flush all the pending picks... */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
-
- *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(selected->subchannel),
- "picked");
- if (pp->user_data != NULL) {
- *pp->user_data = selected->user_data;
- }
- if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG,
- "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
- (void *)selected->subchannel, (void *)selected);
- }
+ *pp->target = NULL;
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
}
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->connectivity_state, &sd->connectivity_changed_closure);
- break;
- case GRPC_CHANNEL_CONNECTING:
- case GRPC_CHANNEL_IDLE:
- grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, sd->connectivity_state,
- GRPC_ERROR_REF(error), "connecting_changed");
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->connectivity_state, &sd->connectivity_changed_closure);
- break;
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- /* renew state notification */
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->connectivity_state, &sd->connectivity_changed_closure);
-
- /* remove from ready list if still present */
- if (sd->ready_list_node != NULL) {
- remove_disconnected_sc_locked(p, sd->ready_list_node);
- sd->ready_list_node = NULL;
- }
- grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "connecting_transient_failure");
- break;
- case GRPC_CHANNEL_SHUTDOWN:
- if (sd->ready_list_node != NULL) {
- remove_disconnected_sc_locked(p, sd->ready_list_node);
- sd->ready_list_node = NULL;
- }
-
- p->num_subchannels--;
- GPR_SWAP(subchannel_data *, p->subchannels[sd->index],
- p->subchannels[p->num_subchannels]);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin");
- p->subchannels[sd->index]->index = sd->index;
- gpr_free(sd);
-
- unref = 1;
- if (p->num_subchannels == 0) {
- grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
- GRPC_ERROR_CREATE_REFERENCING("Round Robin Channels Exhausted",
- &error, 1),
- "no_more_channels");
- while ((pp = p->pending_picks)) {
- p->pending_picks = pp->next;
- *pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE,
- NULL);
- gpr_free(pp);
- }
- } else {
- grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "subchannel_failed");
- }
- } /* switch */
- } /* !unref */
-
- gpr_mu_unlock(&p->mu);
-
- if (unref) {
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "round_robin_connectivity");
+ }
+ gpr_free(sd);
+ /* unref the "rr_connectivity" weak ref from start_picking */
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
+ break;
}
-
+ gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error);
}
@@ -607,9 +679,9 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
target = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected->subchannel),
- "picked");
+ "rr_picked");
grpc_connected_subchannel_ping(exec_ctx, target, closure);
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "picked");
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked");
} else {
gpr_mu_unlock(&p->mu);
grpc_exec_ctx_sched(exec_ctx, closure,
@@ -705,6 +777,11 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
+
+ if (grpc_lb_round_robin_trace) {
+ gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels",
+ (void *)p, (unsigned long)p->num_subchannels);
+ }
gpr_mu_init(&p->mu);
return &p->base;
}