aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar David G. Quintas <dgq@google.com>2017-07-07 14:30:30 -0700
committerGravatar GitHub <noreply@github.com>2017-07-07 14:30:30 -0700
commit05f6d7e5ae12a0eacc0dbdedcd964b2c379a8390 (patch)
tree044abc00bdc76870f8db0b30f957f66493c63a95 /src/core
parent4e014a241011356c8c03c45c86b88bf5f012d8d1 (diff)
parent4b2def361aa48b5b9bb7a8bf85a8cb638c70f4e7 (diff)
Merge pull request #11696 from dgquintas/rr_concurrent_updates_fix
Fix RR concurrent updates
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c54
1 files changed, 38 insertions, 16 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index 8e9d6b0f47..56d340b8c2 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -195,11 +195,28 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx,
rr_subchannel_list *subchannel_list,
const char *reason) {
+ if (subchannel_list->shutting_down) {
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG, "Subchannel list %p already shutting down",
+ (void *)subchannel_list);
+ }
+ return;
+ };
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG, "Shutting down subchannel_list %p",
+ (void *)subchannel_list);
+ }
GPR_ASSERT(!subchannel_list->shutting_down);
subchannel_list->shutting_down = true;
for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
subchannel_data *sd = &subchannel_list->subchannels[i];
if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe.
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG,
+ "Unsubscribing from subchannel %p as part of shutting down "
+ "subchannel_list %p",
+ (void *)sd->subchannel, (void *)subchannel_list);
+ }
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL,
NULL,
&sd->connectivity_changed_closure);
@@ -228,13 +245,14 @@ static size_t get_next_ready_subchannel_index_locked(
const size_t index = (i + p->last_ready_subchannel_index + 1) %
p->subchannel_list->num_subchannels;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG,
- "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: "
- "state=%d",
- (void *)p,
- (void *)p->subchannel_list->subchannels[index].subchannel,
- (void *)p->subchannel_list, (unsigned long)index,
- p->subchannel_list->subchannels[index].curr_connectivity_state);
+ gpr_log(
+ GPR_DEBUG,
+ "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: "
+ "state=%s",
+ (void *)p, (void *)p->subchannel_list->subchannels[index].subchannel,
+ (void *)p->subchannel_list, (unsigned long)index,
+ grpc_connectivity_state_name(
+ p->subchannel_list->subchannels[index].curr_connectivity_state));
}
if (p->subchannel_list->subchannels[index].curr_connectivity_state ==
GRPC_CHANNEL_READY) {
@@ -511,16 +529,27 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
subchannel_data *sd = arg;
round_robin_lb_policy *p = sd->subchannel_list->policy;
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(
+ GPR_DEBUG,
+ "[RR %p] connectivity changed for subchannel %p, subchannel_list %p: "
+ "prev_state=%s new_state=%s p->shutdown=%d "
+ "sd->subchannel_list->shutting_down=%d error=%s",
+ (void *)p, (void *)sd->subchannel, (void *)sd->subchannel_list,
+ grpc_connectivity_state_name(sd->prev_connectivity_state),
+ grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe),
+ p->shutdown, sd->subchannel_list->shutting_down,
+ grpc_error_string(error));
+ }
// If the policy is shutting down, unref and return.
if (p->shutdown) {
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown");
return;
}
- if (sd->subchannel_list->shutting_down) {
+ if (sd->subchannel_list->shutting_down && error == GRPC_ERROR_CANCELLED) {
// the subchannel list associated with sd has been discarded. This callback
// corresponds to the unsubscription.
- GPR_ASSERT(error == GRPC_ERROR_CANCELLED);
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown");
return;
@@ -536,13 +565,6 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG,
- "[RR %p] connectivity changed for subchannel %p: "
- "prev_state=%d new_state=%d",
- (void *)p, (void *)sd->subchannel, sd->prev_connectivity_state,
- sd->curr_connectivity_state);
- }
// Update state counters and determine new overall state.
update_state_counters_locked(sd);
sd->prev_connectivity_state = sd->curr_connectivity_state;