diff options
author | David G. Quintas <dgq@google.com> | 2017-07-07 14:30:30 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-07-07 14:30:30 -0700 |
commit | 05f6d7e5ae12a0eacc0dbdedcd964b2c379a8390 (patch) | |
tree | 044abc00bdc76870f8db0b30f957f66493c63a95 /src/core | |
parent | 4e014a241011356c8c03c45c86b88bf5f012d8d1 (diff) | |
parent | 4b2def361aa48b5b9bb7a8bf85a8cb638c70f4e7 (diff) |
Merge pull request #11696 from dgquintas/rr_concurrent_updates_fix
Fix RR concurrent updates
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c | 54 |
1 files changed, 38 insertions, 16 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index 8e9d6b0f47..56d340b8c2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -195,11 +195,28 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx, static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx, rr_subchannel_list *subchannel_list, const char *reason) { + if (subchannel_list->shutting_down) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "Subchannel list %p already shutting down", + (void *)subchannel_list); + } + return; + }; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "Shutting down subchannel_list %p", + (void *)subchannel_list); + } GPR_ASSERT(!subchannel_list->shutting_down); subchannel_list->shutting_down = true; for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { subchannel_data *sd = &subchannel_list->subchannels[i]; if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe. + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, + "Unsubscribing from subchannel %p as part of shutting down " + "subchannel_list %p", + (void *)sd->subchannel, (void *)subchannel_list); + } grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, &sd->connectivity_changed_closure); @@ -228,13 +245,14 @@ static size_t get_next_ready_subchannel_index_locked( const size_t index = (i + p->last_ready_subchannel_index + 1) % p->subchannel_list->num_subchannels; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: " - "state=%d", - (void *)p, - (void *)p->subchannel_list->subchannels[index].subchannel, - (void *)p->subchannel_list, (unsigned long)index, - p->subchannel_list->subchannels[index].curr_connectivity_state); + gpr_log( + GPR_DEBUG, + "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: " + "state=%s", + (void *)p, (void *)p->subchannel_list->subchannels[index].subchannel, + (void *)p->subchannel_list, (unsigned long)index, + grpc_connectivity_state_name( + p->subchannel_list->subchannels[index].curr_connectivity_state)); } if (p->subchannel_list->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) { @@ -511,16 +529,27 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { subchannel_data *sd = arg; round_robin_lb_policy *p = sd->subchannel_list->policy; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log( + GPR_DEBUG, + "[RR %p] connectivity changed for subchannel %p, subchannel_list %p: " + "prev_state=%s new_state=%s p->shutdown=%d " + "sd->subchannel_list->shutting_down=%d error=%s", + (void *)p, (void *)sd->subchannel, (void *)sd->subchannel_list, + grpc_connectivity_state_name(sd->prev_connectivity_state), + grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), + p->shutdown, sd->subchannel_list->shutting_down, + grpc_error_string(error)); + } // If the policy is shutting down, unref and return. if (p->shutdown) { rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown"); return; } - if (sd->subchannel_list->shutting_down) { + if (sd->subchannel_list->shutting_down && error == GRPC_ERROR_CANCELLED) { // the subchannel list associated with sd has been discarded. This callback // corresponds to the unsubscription. - GPR_ASSERT(error == GRPC_ERROR_CANCELLED); rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown"); return; @@ -536,13 +565,6 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[RR %p] connectivity changed for subchannel %p: " - "prev_state=%d new_state=%d", - (void *)p, (void *)sd->subchannel, sd->prev_connectivity_state, - sd->curr_connectivity_state); - } // Update state counters and determine new overall state. update_state_counters_locked(sd); sd->prev_connectivity_state = sd->curr_connectivity_state; |