aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c64
1 files changed, 45 insertions, 19 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index fbef79ec31..dfd2b513b1 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -170,9 +170,9 @@ static void rr_subchannel_list_ref(rr_subchannel_list *subchannel_list,
gpr_ref_non_zero(&subchannel_list->refcount);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
- gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu",
+ gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu (%s)",
(void *)subchannel_list->policy, (void *)subchannel_list,
- (unsigned long)(count - 1), (unsigned long)count);
+ (unsigned long)(count - 1), (unsigned long)count, reason);
}
}
@@ -182,9 +182,9 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
const bool done = gpr_unref(&subchannel_list->refcount);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
- gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu",
+ gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu (%s)",
(void *)subchannel_list->policy, (void *)subchannel_list,
- (unsigned long)(count + 1), (unsigned long)count);
+ (unsigned long)(count + 1), (unsigned long)count, reason);
}
if (done) {
rr_subchannel_list_destroy(exec_ctx, subchannel_list);
@@ -196,16 +196,10 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx,
rr_subchannel_list *subchannel_list,
const char *reason) {
- if (subchannel_list->shutting_down) {
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "Subchannel list %p already shutting down",
- (void *)subchannel_list);
- }
- return;
- };
+ GPR_ASSERT(!subchannel_list->shutting_down);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "Shutting down subchannel_list %p",
- (void *)subchannel_list);
+ gpr_log(GPR_DEBUG, "Shutting down subchannel_list %p (%s)",
+ (void *)subchannel_list, reason);
}
GPR_ASSERT(!subchannel_list->shutting_down);
subchannel_list->shutting_down = true;
@@ -317,9 +311,19 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown");
+ const bool latest_is_current =
+ p->subchannel_list == p->latest_pending_subchannel_list;
rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
"sl_shutdown_rr_shutdown");
+ rr_subchannel_list_unref(exec_ctx, p->subchannel_list,
+ "sl_unref_rr_shutdown");
+ if (!latest_is_current && p->latest_pending_subchannel_list != NULL &&
+ !p->latest_pending_subchannel_list->shutting_down) {
+ rr_subchannel_list_shutdown(exec_ctx, p->latest_pending_subchannel_list,
+ "sl_shutdown_pending_rr_shutdown");
+ }
p->subchannel_list = NULL;
+ p->latest_pending_subchannel_list = NULL;
}
static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
@@ -375,8 +379,8 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx,
p->started_picking = true;
for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) {
subchannel_data *sd = &p->subchannel_list->subchannels[i];
- GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
- rr_subchannel_list_ref(sd->subchannel_list, "start_picking");
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "start_picking_locked");
+ rr_subchannel_list_ref(sd->subchannel_list, "start_picking_locked");
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->pending_connectivity_state_unsafe,
@@ -559,7 +563,10 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (sd->subchannel_list != p->subchannel_list &&
sd->subchannel_list != p->latest_pending_subchannel_list) {
// sd belongs to an outdated subchannel_list: get rid of it.
- rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_oudated");
+ rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_outdated");
+ rr_subchannel_list_unref(exec_ctx, sd->subchannel_list,
+ "sl_outdated+unref");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_outdated");
return;
}
// Now that we're inside the combiner, copy the pending connectivity
@@ -618,9 +625,13 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (p->subchannel_list != NULL) {
// dispose of the current subchannel_list
rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
- "sl_shutdown_rr_update_connectivity");
+ "sl_phase_out_shutdown");
+ rr_subchannel_list_unref(exec_ctx, p->subchannel_list,
+ "sl_phase_out_shutdown+unref");
}
- p->subchannel_list = sd->subchannel_list;
+ rr_subchannel_list_ref(p->latest_pending_subchannel_list,
+ "sl_promotion");
+ p->subchannel_list = p->latest_pending_subchannel_list;
p->latest_pending_subchannel_list = NULL;
}
/* at this point we know there's at least one suitable subchannel. Go
@@ -731,7 +742,9 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
"rr_update_empty");
if (p->subchannel_list != NULL) {
rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
- "sl_shutdown_rr_update");
+ "sl_shutdown_empty_update");
+ rr_subchannel_list_unref(exec_ctx, p->subchannel_list,
+ "sl_shutdown_empty_update+unref");
p->subchannel_list = NULL;
}
return;
@@ -743,6 +756,17 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
gpr_zalloc(sizeof(subchannel_data) * num_addrs);
subchannel_list->num_subchannels = num_addrs;
gpr_ref_init(&subchannel_list->refcount, 1);
+ if (p->latest_pending_subchannel_list != NULL && p->started_picking) {
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG,
+ "Shutting down latest pending subchannel list %p, about to be "
+ "replaced by newer latest %p",
+ (void *)p->latest_pending_subchannel_list,
+ (void *)subchannel_list);
+ }
+ rr_subchannel_list_shutdown(exec_ctx, p->latest_pending_subchannel_list,
+ "sl_outdated_dont_smash");
+ }
p->latest_pending_subchannel_list = subchannel_list;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Created subchannel list %p for %lu subchannels",
@@ -817,7 +841,9 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
"rr_update_before_started_picking");
}
+ rr_subchannel_list_ref(subchannel_list, "sl_initial_promotion");
p->subchannel_list = subchannel_list;
+ p->latest_pending_subchannel_list = NULL;
}
}