From ecfe2d6ca605c71629ea2f01a9321e821de3a784 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 12 Jul 2017 18:10:56 -0700 Subject: RR: fix subchannel list handling --- .../lb_policy/round_robin/round_robin.c | 64 +++++++++++++++------- 1 file changed, 45 insertions(+), 19 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index fbef79ec31..dfd2b513b1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -170,9 +170,9 @@ static void rr_subchannel_list_ref(rr_subchannel_list *subchannel_list, gpr_ref_non_zero(&subchannel_list->refcount); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); - gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu", + gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu (%s)", (void *)subchannel_list->policy, (void *)subchannel_list, - (unsigned long)(count - 1), (unsigned long)count); + (unsigned long)(count - 1), (unsigned long)count, reason); } } @@ -182,9 +182,9 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx, const bool done = gpr_unref(&subchannel_list->refcount); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); - gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu", + gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu (%s)", (void *)subchannel_list->policy, (void *)subchannel_list, - (unsigned long)(count + 1), (unsigned long)count); + (unsigned long)(count + 1), (unsigned long)count, reason); } if (done) { rr_subchannel_list_destroy(exec_ctx, subchannel_list); @@ -196,16 +196,10 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx, static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx, rr_subchannel_list *subchannel_list, const char *reason) { - if (subchannel_list->shutting_down) { - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "Subchannel list %p already shutting down", - (void *)subchannel_list); - } - return; - }; + GPR_ASSERT(!subchannel_list->shutting_down); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "Shutting down subchannel_list %p", - (void *)subchannel_list); + gpr_log(GPR_DEBUG, "Shutting down subchannel_list %p (%s)", + (void *)subchannel_list, reason); } GPR_ASSERT(!subchannel_list->shutting_down); subchannel_list->shutting_down = true; @@ -317,9 +311,19 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown"); + const bool latest_is_current = + p->subchannel_list == p->latest_pending_subchannel_list; rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, "sl_shutdown_rr_shutdown"); + rr_subchannel_list_unref(exec_ctx, p->subchannel_list, + "sl_unref_rr_shutdown"); + if (!latest_is_current && p->latest_pending_subchannel_list != NULL && + !p->latest_pending_subchannel_list->shutting_down) { + rr_subchannel_list_shutdown(exec_ctx, p->latest_pending_subchannel_list, + "sl_shutdown_pending_rr_shutdown"); + } p->subchannel_list = NULL; + p->latest_pending_subchannel_list = NULL; } static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, @@ -375,8 +379,8 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx, p->started_picking = true; for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { subchannel_data *sd = &p->subchannel_list->subchannels[i]; - GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); - rr_subchannel_list_ref(sd->subchannel_list, "start_picking"); + GRPC_LB_POLICY_WEAK_REF(&p->base, "start_picking_locked"); + rr_subchannel_list_ref(sd->subchannel_list, "start_picking_locked"); grpc_subchannel_notify_on_state_change( exec_ctx, sd->subchannel, p->base.interested_parties, &sd->pending_connectivity_state_unsafe, @@ -559,7 +563,10 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, if (sd->subchannel_list != p->subchannel_list && sd->subchannel_list != p->latest_pending_subchannel_list) { // sd belongs to an outdated subchannel_list: get rid of it. - rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_oudated"); + rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_outdated"); + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, + "sl_outdated+unref"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_outdated"); return; } // Now that we're inside the combiner, copy the pending connectivity @@ -618,9 +625,13 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, if (p->subchannel_list != NULL) { // dispose of the current subchannel_list rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, - "sl_shutdown_rr_update_connectivity"); + "sl_phase_out_shutdown"); + rr_subchannel_list_unref(exec_ctx, p->subchannel_list, + "sl_phase_out_shutdown+unref"); } - p->subchannel_list = sd->subchannel_list; + rr_subchannel_list_ref(p->latest_pending_subchannel_list, + "sl_promotion"); + p->subchannel_list = p->latest_pending_subchannel_list; p->latest_pending_subchannel_list = NULL; } /* at this point we know there's at least one suitable subchannel. Go @@ -731,7 +742,9 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, "rr_update_empty"); if (p->subchannel_list != NULL) { rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, - "sl_shutdown_rr_update"); + "sl_shutdown_empty_update"); + rr_subchannel_list_unref(exec_ctx, p->subchannel_list, + "sl_shutdown_empty_update+unref"); p->subchannel_list = NULL; } return; @@ -743,6 +756,17 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, gpr_zalloc(sizeof(subchannel_data) * num_addrs); subchannel_list->num_subchannels = num_addrs; gpr_ref_init(&subchannel_list->refcount, 1); + if (p->latest_pending_subchannel_list != NULL && p->started_picking) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, + "Shutting down latest pending subchannel list %p, about to be " + "replaced by newer latest %p", + (void *)p->latest_pending_subchannel_list, + (void *)subchannel_list); + } + rr_subchannel_list_shutdown(exec_ctx, p->latest_pending_subchannel_list, + "sl_outdated_dont_smash"); + } p->latest_pending_subchannel_list = subchannel_list; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, "Created subchannel list %p for %lu subchannels", @@ -817,7 +841,9 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, "rr_update_before_started_picking"); } + rr_subchannel_list_ref(subchannel_list, "sl_initial_promotion"); p->subchannel_list = subchannel_list; + p->latest_pending_subchannel_list = NULL; } } -- cgit v1.2.3