From ecfe2d6ca605c71629ea2f01a9321e821de3a784 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 12 Jul 2017 18:10:56 -0700 Subject: RR: fix subchannel list handling --- .../lb_policy/round_robin/round_robin.c | 64 +++++++++++++++------- 1 file changed, 45 insertions(+), 19 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index fbef79ec31..dfd2b513b1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -170,9 +170,9 @@ static void rr_subchannel_list_ref(rr_subchannel_list *subchannel_list, gpr_ref_non_zero(&subchannel_list->refcount); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); - gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu", + gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu (%s)", (void *)subchannel_list->policy, (void *)subchannel_list, - (unsigned long)(count - 1), (unsigned long)count); + (unsigned long)(count - 1), (unsigned long)count, reason); } } @@ -182,9 +182,9 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx, const bool done = gpr_unref(&subchannel_list->refcount); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); - gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu", + gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu (%s)", (void *)subchannel_list->policy, (void *)subchannel_list, - (unsigned long)(count + 1), (unsigned long)count); + (unsigned long)(count + 1), (unsigned long)count, reason); } if (done) { rr_subchannel_list_destroy(exec_ctx, subchannel_list); @@ -196,16 +196,10 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx, static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx, rr_subchannel_list *subchannel_list, const char *reason) { - if (subchannel_list->shutting_down) { - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "Subchannel list %p already shutting down", - (void *)subchannel_list); - } - return; - }; + GPR_ASSERT(!subchannel_list->shutting_down); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "Shutting down subchannel_list %p", - (void *)subchannel_list); + gpr_log(GPR_DEBUG, "Shutting down subchannel_list %p (%s)", + (void *)subchannel_list, reason); } GPR_ASSERT(!subchannel_list->shutting_down); subchannel_list->shutting_down = true; @@ -317,9 +311,19 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown"); + const bool latest_is_current = + p->subchannel_list == p->latest_pending_subchannel_list; rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, "sl_shutdown_rr_shutdown"); + rr_subchannel_list_unref(exec_ctx, p->subchannel_list, + "sl_unref_rr_shutdown"); + if (!latest_is_current && p->latest_pending_subchannel_list != NULL && + !p->latest_pending_subchannel_list->shutting_down) { + rr_subchannel_list_shutdown(exec_ctx, p->latest_pending_subchannel_list, + "sl_shutdown_pending_rr_shutdown"); + } p->subchannel_list = NULL; + p->latest_pending_subchannel_list = NULL; } static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, @@ -375,8 +379,8 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx, p->started_picking = true; for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { subchannel_data *sd = &p->subchannel_list->subchannels[i]; - GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); - rr_subchannel_list_ref(sd->subchannel_list, "start_picking"); + GRPC_LB_POLICY_WEAK_REF(&p->base, "start_picking_locked"); + rr_subchannel_list_ref(sd->subchannel_list, "start_picking_locked"); grpc_subchannel_notify_on_state_change( exec_ctx, sd->subchannel, p->base.interested_parties, &sd->pending_connectivity_state_unsafe, @@ -559,7 +563,10 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, if (sd->subchannel_list != p->subchannel_list && sd->subchannel_list != p->latest_pending_subchannel_list) { // sd belongs to an outdated subchannel_list: get rid of it. - rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_oudated"); + rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_outdated"); + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, + "sl_outdated+unref"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_outdated"); return; } // Now that we're inside the combiner, copy the pending connectivity @@ -618,9 +625,13 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, if (p->subchannel_list != NULL) { // dispose of the current subchannel_list rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, - "sl_shutdown_rr_update_connectivity"); + "sl_phase_out_shutdown"); + rr_subchannel_list_unref(exec_ctx, p->subchannel_list, + "sl_phase_out_shutdown+unref"); } - p->subchannel_list = sd->subchannel_list; + rr_subchannel_list_ref(p->latest_pending_subchannel_list, + "sl_promotion"); + p->subchannel_list = p->latest_pending_subchannel_list; p->latest_pending_subchannel_list = NULL; } /* at this point we know there's at least one suitable subchannel. Go @@ -731,7 +742,9 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, "rr_update_empty"); if (p->subchannel_list != NULL) { rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, - "sl_shutdown_rr_update"); + "sl_shutdown_empty_update"); + rr_subchannel_list_unref(exec_ctx, p->subchannel_list, + "sl_shutdown_empty_update+unref"); p->subchannel_list = NULL; } return; @@ -743,6 +756,17 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, gpr_zalloc(sizeof(subchannel_data) * num_addrs); subchannel_list->num_subchannels = num_addrs; gpr_ref_init(&subchannel_list->refcount, 1); + if (p->latest_pending_subchannel_list != NULL && p->started_picking) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, + "Shutting down latest pending subchannel list %p, about to be " + "replaced by newer latest %p", + (void *)p->latest_pending_subchannel_list, + (void *)subchannel_list); + } + rr_subchannel_list_shutdown(exec_ctx, p->latest_pending_subchannel_list, + "sl_outdated_dont_smash"); + } p->latest_pending_subchannel_list = subchannel_list; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, "Created subchannel list %p for %lu subchannels", @@ -817,7 +841,9 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, "rr_update_before_started_picking"); } + rr_subchannel_list_ref(subchannel_list, "sl_initial_promotion"); p->subchannel_list = subchannel_list; + p->latest_pending_subchannel_list = NULL; } } -- cgit v1.2.3 From 47e12924c5804e389ac9b5abed9eb27f74bb622e Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 13 Jul 2017 13:25:17 -0700 Subject: PR comments --- .../lb_policy/round_robin/round_robin.c | 77 ++++++++++++++-------- 1 file changed, 48 insertions(+), 29 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index dfd2b513b1..3e34257ea7 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -191,6 +191,21 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx, } } +static rr_subchannel_list *rr_subchannel_list_create(round_robin_lb_policy *p, + size_t num_subchannels) { + rr_subchannel_list *subchannel_list = gpr_zalloc(sizeof(*subchannel_list)); + subchannel_list->policy = p; + subchannel_list->subchannels = + gpr_zalloc(sizeof(subchannel_data) * num_subchannels); + subchannel_list->num_subchannels = num_subchannels; + gpr_ref_init(&subchannel_list->refcount, 1); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "Created subchannel list %p for %lu subchannels", + (void *)subchannel_list, (unsigned long)num_subchannels); + } + return subchannel_list; +} + /** Mark \a subchannel_list as discarded. Unsubscribes all its subchannels. The * watcher's callback will ultimately unref \a subchannel_list. */ static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx, @@ -217,7 +232,8 @@ static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx, &sd->connectivity_changed_closure); } } - rr_subchannel_list_unref(exec_ctx, subchannel_list, reason); + // Corresponds to the creation ref. + rr_subchannel_list_unref(exec_ctx, subchannel_list, "creation"); } /** Returns the index into p->subchannel_list->subchannels of the next @@ -316,14 +332,16 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, "sl_shutdown_rr_shutdown"); rr_subchannel_list_unref(exec_ctx, p->subchannel_list, - "sl_unref_rr_shutdown"); + "sl_shutdown_current+make_pending"); if (!latest_is_current && p->latest_pending_subchannel_list != NULL && !p->latest_pending_subchannel_list->shutting_down) { rr_subchannel_list_shutdown(exec_ctx, p->latest_pending_subchannel_list, "sl_shutdown_pending_rr_shutdown"); + rr_subchannel_list_unref(exec_ctx, p->latest_pending_subchannel_list, + "sl_shutdown_pending+make_pending"); + p->latest_pending_subchannel_list = NULL; } p->subchannel_list = NULL; - p->latest_pending_subchannel_list = NULL; } static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, @@ -380,7 +398,7 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx, for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { subchannel_data *sd = &p->subchannel_list->subchannels[i]; GRPC_LB_POLICY_WEAK_REF(&p->base, "start_picking_locked"); - rr_subchannel_list_ref(sd->subchannel_list, "start_picking_locked"); + rr_subchannel_list_ref(sd->subchannel_list, "started_picking"); grpc_subchannel_notify_on_state_change( exec_ctx, sd->subchannel, p->base.interested_parties, &sd->pending_connectivity_state_unsafe, @@ -548,15 +566,18 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, } // If the policy is shutting down, unref and return. if (p->shutdown) { - rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown"); + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, + "pol_shutdown+started_picking"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown"); return; } if (sd->subchannel_list->shutting_down && error == GRPC_ERROR_CANCELLED) { // the subchannel list associated with sd has been discarded. This callback - // corresponds to the unsubscription. - rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown"); - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown"); + // corresponds to the unsubscription. The unrefs correspond to the picking + // ref (start_picking_locked or update_started_picking). + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, + "sl_shutdown+started_picking"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown+picking"); return; } // Dispose of outdated subchannel lists. @@ -565,7 +586,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, // sd belongs to an outdated subchannel_list: get rid of it. rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_outdated"); rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, - "sl_outdated+unref"); + "sl_outdated+started_picking"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_outdated"); return; } @@ -589,7 +610,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, sd->user_data = NULL; } if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - /* the policy is shutting down. Flush all the pending picks... */ + // the policy is shutting down. Flush all the pending picks... pending_pick *pp; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; @@ -598,8 +619,9 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(pp); } } - /* unref the "rr_connectivity" weak ref from start_picking */ - rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sd_shutdown"); + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, + "sd_shutdown+started_picking"); + // unref the "rr_connectivity_update" weak ref from start_picking. GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity_sd_shutdown"); } else { // sd not in SHUTDOWN @@ -627,10 +649,11 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, "sl_phase_out_shutdown"); rr_subchannel_list_unref(exec_ctx, p->subchannel_list, - "sl_phase_out_shutdown+unref"); + "sl_phase_out_shutdown+started_picking"); } - rr_subchannel_list_ref(p->latest_pending_subchannel_list, - "sl_promotion"); + // Promote pending list: No need to take a ref on + // p->latest_pending_subchannel_list: reusing its "make_pending" + // one. p->subchannel_list = p->latest_pending_subchannel_list; p->latest_pending_subchannel_list = NULL; } @@ -665,8 +688,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(pp); } } - /* renew notification: reuses the "rr_connectivity" weak ref on the policy - * as well as the sd->subchannel_list ref. */ + /* renew notification: reuses the "rr_connectivity_update" weak ref on the + * policy as well as the sd->subchannel_list ref. */ grpc_subchannel_notify_on_state_change( exec_ctx, sd->subchannel, p->base.interested_parties, &sd->pending_connectivity_state_unsafe, @@ -744,18 +767,13 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, "sl_shutdown_empty_update"); rr_subchannel_list_unref(exec_ctx, p->subchannel_list, - "sl_shutdown_empty_update+unref"); + "sl_shutdown_empty_update+make_pending"); p->subchannel_list = NULL; } return; } size_t subchannel_index = 0; - rr_subchannel_list *subchannel_list = gpr_zalloc(sizeof(*subchannel_list)); - subchannel_list->policy = p; - subchannel_list->subchannels = - gpr_zalloc(sizeof(subchannel_data) * num_addrs); - subchannel_list->num_subchannels = num_addrs; - gpr_ref_init(&subchannel_list->refcount, 1); + rr_subchannel_list *subchannel_list = rr_subchannel_list_create(p, num_addrs); if (p->latest_pending_subchannel_list != NULL && p->started_picking) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, @@ -766,12 +784,11 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } rr_subchannel_list_shutdown(exec_ctx, p->latest_pending_subchannel_list, "sl_outdated_dont_smash"); + rr_subchannel_list_unref(exec_ctx, p->latest_pending_subchannel_list, + "sl_outdated_dont_smash+make_pending"); } p->latest_pending_subchannel_list = subchannel_list; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "Created subchannel list %p for %lu subchannels", - (void *)subchannel_list, (unsigned long)num_addrs); - } + rr_subchannel_list_ref(p->latest_pending_subchannel_list, "make_pending"); grpc_subchannel_args sc_args; /* We need to remove the LB addresses in order to be able to compare the * subchannel keys of subchannels from a different batch of addresses. */ @@ -840,8 +857,10 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, if (p->subchannel_list != NULL) { rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, "rr_update_before_started_picking"); + rr_subchannel_list_unref(exec_ctx, subchannel_list, + "rr_update_before_started_picking+make_pending"); } - rr_subchannel_list_ref(subchannel_list, "sl_initial_promotion"); + // Recycles "make_pending" reference. p->subchannel_list = subchannel_list; p->latest_pending_subchannel_list = NULL; } -- cgit v1.2.3 From 507d1fd58ec0ad6808d98089ecf257815e601261 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 13 Jul 2017 15:31:51 -0700 Subject: Removed duplicated refs --- .../lb_policy/round_robin/round_robin.c | 146 ++++++++++----------- 1 file changed, 67 insertions(+), 79 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index 3e34257ea7..def8d5d68a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -141,6 +141,21 @@ struct rr_subchannel_list { bool shutting_down; }; +static rr_subchannel_list *rr_subchannel_list_create(round_robin_lb_policy *p, + size_t num_subchannels) { + rr_subchannel_list *subchannel_list = gpr_zalloc(sizeof(*subchannel_list)); + subchannel_list->policy = p; + subchannel_list->subchannels = + gpr_zalloc(sizeof(subchannel_data) * num_subchannels); + subchannel_list->num_subchannels = num_subchannels; + gpr_ref_init(&subchannel_list->refcount, 1); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, "[RR %p] Created subchannel list %p for %lu subchannels", + (void *)p, (void *)subchannel_list, (unsigned long)num_subchannels); + } + return subchannel_list; +} + static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx, rr_subchannel_list *subchannel_list) { GPR_ASSERT(subchannel_list->shutting_down); @@ -191,30 +206,15 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx, } } -static rr_subchannel_list *rr_subchannel_list_create(round_robin_lb_policy *p, - size_t num_subchannels) { - rr_subchannel_list *subchannel_list = gpr_zalloc(sizeof(*subchannel_list)); - subchannel_list->policy = p; - subchannel_list->subchannels = - gpr_zalloc(sizeof(subchannel_data) * num_subchannels); - subchannel_list->num_subchannels = num_subchannels; - gpr_ref_init(&subchannel_list->refcount, 1); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "Created subchannel list %p for %lu subchannels", - (void *)subchannel_list, (unsigned long)num_subchannels); - } - return subchannel_list; -} - /** Mark \a subchannel_list as discarded. Unsubscribes all its subchannels. The * watcher's callback will ultimately unref \a subchannel_list. */ -static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx, - rr_subchannel_list *subchannel_list, - const char *reason) { +static void rr_subchannel_list_shutdown_and_unref( + grpc_exec_ctx *exec_ctx, rr_subchannel_list *subchannel_list, + const char *reason) { GPR_ASSERT(!subchannel_list->shutting_down); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "Shutting down subchannel_list %p (%s)", - (void *)subchannel_list, reason); + gpr_log(GPR_DEBUG, "[RR %p] Shutting down subchannel_list %p (%s)", + (void *)subchannel_list->policy, (void *)subchannel_list, reason); } GPR_ASSERT(!subchannel_list->shutting_down); subchannel_list->shutting_down = true; @@ -222,18 +222,19 @@ static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx, subchannel_data *sd = &subchannel_list->subchannels[i]; if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe. if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "Unsubscribing from subchannel %p as part of shutting down " - "subchannel_list %p", - (void *)sd->subchannel, (void *)subchannel_list); + gpr_log( + GPR_DEBUG, + "[RR %p] Unsubscribing from subchannel %p as part of shutting down " + "subchannel_list %p", + (void *)subchannel_list->policy, (void *)sd->subchannel, + (void *)subchannel_list); } grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, &sd->connectivity_changed_closure); } } - // Corresponds to the creation ref. - rr_subchannel_list_unref(exec_ctx, subchannel_list, "creation"); + rr_subchannel_list_unref(exec_ctx, subchannel_list, reason); } /** Returns the index into p->subchannel_list->subchannels of the next @@ -303,7 +304,8 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p, static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol); + gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p", + (void *)pol, (void *)pol); } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); gpr_free(p); @@ -312,7 +314,8 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol); + gpr_log(GPR_DEBUG, "[RR %p] Shutting down Round Robin policy at %p", + (void *)pol, (void *)pol); } p->shutdown = true; pending_pick *pp; @@ -329,19 +332,16 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown"); const bool latest_is_current = p->subchannel_list == p->latest_pending_subchannel_list; - rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, - "sl_shutdown_rr_shutdown"); - rr_subchannel_list_unref(exec_ctx, p->subchannel_list, - "sl_shutdown_current+make_pending"); + rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "sl_shutdown_rr_shutdown"); + p->subchannel_list = NULL; if (!latest_is_current && p->latest_pending_subchannel_list != NULL && !p->latest_pending_subchannel_list->shutting_down) { - rr_subchannel_list_shutdown(exec_ctx, p->latest_pending_subchannel_list, - "sl_shutdown_pending_rr_shutdown"); - rr_subchannel_list_unref(exec_ctx, p->latest_pending_subchannel_list, - "sl_shutdown_pending+make_pending"); + rr_subchannel_list_shutdown_and_unref(exec_ctx, + p->latest_pending_subchannel_list, + "sl_shutdown_pending_rr_shutdown"); p->latest_pending_subchannel_list = NULL; } - p->subchannel_list = NULL; } static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, @@ -420,7 +420,7 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol); + gpr_log(GPR_INFO, "[RR %p] Trying to pick", (void *)pol); } if (p->subchannel_list != NULL) { const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); @@ -436,8 +436,8 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log( GPR_DEBUG, - "[RR %p] PICKED TARGET <-- SUBCHANNEL %p (CONNECTED %p) (SL %p, " - "INDEX %lu)", + "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " + "index %lu)", (void *)p, (void *)sd->subchannel, (void *)*target, (void *)sd->subchannel_list, (unsigned long)next_ready_index); } @@ -584,9 +584,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, if (sd->subchannel_list != p->subchannel_list && sd->subchannel_list != p->latest_pending_subchannel_list) { // sd belongs to an outdated subchannel_list: get rid of it. - rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_outdated"); - rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, - "sl_outdated+started_picking"); + rr_subchannel_list_shutdown_and_unref(exec_ctx, sd->subchannel_list, + "sl_outdated"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_outdated"); return; } @@ -646,14 +645,9 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, } if (p->subchannel_list != NULL) { // dispose of the current subchannel_list - rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, - "sl_phase_out_shutdown"); - rr_subchannel_list_unref(exec_ctx, p->subchannel_list, - "sl_phase_out_shutdown+started_picking"); + rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "sl_phase_out_shutdown"); } - // Promote pending list: No need to take a ref on - // p->latest_pending_subchannel_list: reusing its "make_pending" - // one. p->subchannel_list = p->latest_pending_subchannel_list; p->latest_pending_subchannel_list = NULL; } @@ -665,8 +659,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, subchannel_data *selected = &p->subchannel_list->subchannels[next_ready_index]; if (p->pending_picks != NULL) { - /* if the selected subchannel is going to be used for the pending - * picks, update the last picked pointer */ + // if the selected subchannel is going to be used for the pending + // picks, update the last picked pointer update_last_ready_subchannel_index_locked(p, next_ready_index); } pending_pick *pp; @@ -680,9 +674,10 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, } if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, - "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (INDEX %lu)", - (void *)selected->subchannel, - (unsigned long)next_ready_index); + "[RR %p] Fulfilling pending pick. Target <-- subchannel %p " + "(subchannel_list %p, index %lu)", + (void *)p, (void *)selected->subchannel, + (void *)p->subchannel_list, (unsigned long)next_ready_index); } GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); @@ -747,8 +742,7 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } else { // otherwise, keep using the current subchannel list (ignore this update). gpr_log(GPR_ERROR, - "No valid LB addresses channel arg for Round Robin %p update, " - "ignoring.", + "[RR %p] No valid LB addresses channel arg for update, ignoring.", (void *)p); } return; @@ -764,10 +758,8 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "rr_update_empty"); if (p->subchannel_list != NULL) { - rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, - "sl_shutdown_empty_update"); - rr_subchannel_list_unref(exec_ctx, p->subchannel_list, - "sl_shutdown_empty_update+make_pending"); + rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "sl_shutdown_empty_update"); p->subchannel_list = NULL; } return; @@ -777,18 +769,16 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, if (p->latest_pending_subchannel_list != NULL && p->started_picking) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, - "Shutting down latest pending subchannel list %p, about to be " + "[RR %p] Shutting down latest pending subchannel list %p, about " + "to be " "replaced by newer latest %p", - (void *)p->latest_pending_subchannel_list, + (void *)p, (void *)p->latest_pending_subchannel_list, (void *)subchannel_list); } - rr_subchannel_list_shutdown(exec_ctx, p->latest_pending_subchannel_list, - "sl_outdated_dont_smash"); - rr_subchannel_list_unref(exec_ctx, p->latest_pending_subchannel_list, - "sl_outdated_dont_smash+make_pending"); + rr_subchannel_list_shutdown_and_unref( + exec_ctx, p->latest_pending_subchannel_list, "sl_outdated_dont_smash"); } p->latest_pending_subchannel_list = subchannel_list; - rr_subchannel_list_ref(p->latest_pending_subchannel_list, "make_pending"); grpc_subchannel_args sc_args; /* We need to remove the LB addresses in order to be able to compare the * subchannel keys of subchannels from a different batch of addresses. */ @@ -812,11 +802,12 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { char *address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_DEBUG, - "index %lu: Created subchannel %p for address uri %s into " - "subchannel_list %p", - (unsigned long)subchannel_index, (void *)subchannel, address_uri, - (void *)subchannel_list); + gpr_log( + GPR_DEBUG, + "[RR %p] index %lu: Created subchannel %p for address uri %s into " + "subchannel_list %p", + (void *)p, (unsigned long)subchannel_index, (void *)subchannel, + address_uri, (void *)subchannel_list); gpr_free(address_uri); } grpc_channel_args_destroy(exec_ctx, new_args); @@ -855,12 +846,9 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, // The policy isn't picking yet. Save the update for later, disposing of // previous version if any. if (p->subchannel_list != NULL) { - rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, - "rr_update_before_started_picking"); - rr_subchannel_list_unref(exec_ctx, subchannel_list, - "rr_update_before_started_picking+make_pending"); + rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "rr_update_before_started_picking"); } - // Recycles "make_pending" reference. p->subchannel_list = subchannel_list; p->latest_pending_subchannel_list = NULL; } @@ -892,7 +880,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "Created Round Robin %p with %lu subchannels", (void *)p, + gpr_log(GPR_DEBUG, "[RR %p] Created with %lu subchannels", (void *)p, (unsigned long)p->subchannel_list->num_subchannels); } return &p->base; -- cgit v1.2.3 From 9a4ed686b1a0a0f3b655a34e61151b1dcb0db744 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 13 Jul 2017 16:12:30 -0700 Subject: Improved testing --- src/core/ext/filters/client_channel/subchannel_index.c | 7 +++++++ src/core/ext/filters/client_channel/subchannel_index.h | 12 ++++++++++++ test/cpp/end2end/client_lb_end2end_test.cc | 13 +++++++++---- 3 files changed, 28 insertions(+), 4 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/client_channel/subchannel_index.c b/src/core/ext/filters/client_channel/subchannel_index.c index e291ca9db9..a33ab950bf 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.c +++ b/src/core/ext/filters/client_channel/subchannel_index.c @@ -40,6 +40,8 @@ struct grpc_subchannel_key { GPR_TLS_DECL(subchannel_index_exec_ctx); +static bool g_force_creation = false; + static void enter_ctx(grpc_exec_ctx *exec_ctx) { GPR_ASSERT(gpr_tls_get(&subchannel_index_exec_ctx) == 0); gpr_tls_set(&subchannel_index_exec_ctx, (intptr_t)exec_ctx); @@ -84,6 +86,7 @@ static grpc_subchannel_key *subchannel_key_copy(grpc_subchannel_key *k) { int grpc_subchannel_key_compare(const grpc_subchannel_key *a, const grpc_subchannel_key *b) { + if (g_force_creation) return false; int c = GPR_ICMP(a->args.filter_count, b->args.filter_count); if (c != 0) return c; if (a->args.filter_count > 0) { @@ -250,3 +253,7 @@ void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx, leave_ctx(exec_ctx); } + +void grpc_subchannel_index_test_only_set_force_creation(bool force_creation) { + g_force_creation = force_creation; +} diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h index e303bfaa05..98d882a453 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.h +++ b/src/core/ext/filters/client_channel/subchannel_index.h @@ -59,4 +59,16 @@ void grpc_subchannel_index_init(void); /** Shutdown the subchannel index (global) */ void grpc_subchannel_index_shutdown(void); +/** \em TEST ONLY. + * If \a force_creation is true, all key comparisons will be false, resulting in + * new subchannels always being created. Otherwise, the keys will be compared as + * usual. + * + * This function is *not* threadsafe on purpose: it should *only* be used in + * test code. + * + * Tests using this function \em MUST run tests with and without \a + * force_creation set. */ +void grpc_subchannel_index_test_only_set_force_creation(bool force_creation); + #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H */ diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 6d3f5a9d46..e1160ecdc6 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -35,6 +35,7 @@ extern "C" { #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +#include "src/core/ext/filters/client_channel/subchannel_index.h" } #include "src/proto/grpc/testing/echo.grpc.pb.h" @@ -331,10 +332,14 @@ TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) { for (size_t i = 0; i < servers_.size(); ++i) { ports.emplace_back(servers_[i]->port_); } - for (size_t i = 0; i < 1000; ++i) { - std::random_shuffle(ports.begin(), ports.end()); - SetNextResolution(ports); - if (i % 10 == 0) SendRpc(); + for (const bool force_creation : {true, false}) { + grpc_subchannel_index_test_only_set_force_creation(force_creation); + gpr_log(GPR_INFO, "Force subchannel creation: %d", force_creation); + for (size_t i = 0; i < 1000; ++i) { + std::random_shuffle(ports.begin(), ports.end()); + SetNextResolution(ports); + if (i % 10 == 0) SendRpc(); + } } // Check LB policy name for the channel. EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName()); -- cgit v1.2.3