From 87c6504ca975abcee0f86fe670b843c8a08c50fb Mon Sep 17 00:00:00 2001 From: Juanli Shen Date: Thu, 15 Feb 2018 09:42:45 -0800 Subject: Revert "Revert "Revert "Revert "grpclb re-resolution"""" --- include/grpc/impl/codegen/grpc_types.h | 2 +- src/core/ext/filters/client_channel/lb_policy.cc | 7 +- src/core/ext/filters/client_channel/lb_policy.h | 4 - .../client_channel/lb_policy/grpclb/grpclb.cc | 133 ++++++++++--------- .../lb_policy/pick_first/pick_first.cc | 11 +- .../lb_policy/round_robin/round_robin.cc | 11 +- test/cpp/end2end/grpclb_end2end_test.cc | 143 +++++++++++++++++++-- 7 files changed, 208 insertions(+), 103 deletions(-) diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index a372dbba73..3e897b0b27 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -301,7 +301,7 @@ typedef struct { #define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_call_timeout_ms" /* Timeout in milliseconds to wait for the serverlist from the grpclb load balancer before using fallback backend addresses from the resolver. - If 0, fallback will never be used. */ + If 0, fallback will never be used. Default value is 10000. */ #define GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS "grpc.grpclb_fallback_timeout_ms" /** If non-zero, grpc server's cronet compression workaround will be enabled */ #define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \ diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index cc4fe7ec62..27fb2ad1f4 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -118,7 +118,8 @@ void grpc_lb_policy_update_locked(grpc_lb_policy* policy, void grpc_lb_policy_set_reresolve_closure_locked( grpc_lb_policy* policy, grpc_closure* request_reresolution) { - policy->vtable->set_reresolve_closure_locked(policy, request_reresolution); + GPR_ASSERT(policy->request_reresolution == nullptr); + policy->request_reresolution = request_reresolution; } void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy, @@ -133,8 +134,8 @@ void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy, grpc_lb_trace->name(), policy, grpc_error_string(error)); } } else { - if (grpc_lb_trace->enabled() && error == GRPC_ERROR_NONE) { - gpr_log(GPR_DEBUG, "%s %p: re-resolution already in progress.", + if (grpc_lb_trace->enabled()) { + gpr_log(GPR_DEBUG, "%s %p: no available re-resolution closure.", grpc_lb_trace->name(), policy); } } diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 30660cb83d..6edd314d5e 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -107,10 +107,6 @@ struct grpc_lb_policy_vtable { void (*update_locked)(grpc_lb_policy* policy, const grpc_lb_policy_args* args); - - /** \see grpc_lb_policy_set_reresolve_closure */ - void (*set_reresolve_closure_locked)(grpc_lb_policy* policy, - grpc_closure* request_reresolution); }; #ifndef NDEBUG diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 21c95460c8..da82b3f4da 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -249,7 +249,7 @@ typedef struct glb_lb_policy { /** the RR policy to use of the backend servers returned by the LB server */ grpc_lb_policy* rr_policy; - grpc_closure on_rr_connectivity_changed; + /** the connectivity state of the embedded RR policy */ grpc_connectivity_state rr_connectivity_state; bool started_picking; @@ -292,6 +292,12 @@ typedef struct glb_lb_policy { /** called upon changes to the LB channel's connectivity. */ grpc_closure lb_channel_on_connectivity_changed; + /** called upon changes to the RR's connectivity. */ + grpc_closure rr_on_connectivity_changed; + + /** called upon reresolution request from the RR policy. */ + grpc_closure rr_on_reresolution_requested; + /************************************************************/ /* client data associated with the LB server communication */ /************************************************************/ @@ -590,9 +596,8 @@ static grpc_lb_addresses* extract_backend_addresses_locked( return backend_addresses; } -static void update_lb_connectivity_status_locked( - glb_lb_policy* glb_policy, grpc_connectivity_state rr_state, - grpc_error* rr_state_error) { +static void update_lb_connectivity_status_locked(glb_lb_policy* glb_policy, + grpc_error* rr_state_error) { const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check(&glb_policy->state_tracker); /* The new connectivity status is a function of the previous one and the new @@ -624,7 +629,7 @@ static void update_lb_connectivity_status_locked( * * (*) This function mustn't be called during shutting down. */ GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN); - switch (rr_state) { + switch (glb_policy->rr_connectivity_state) { case GRPC_CHANNEL_TRANSIENT_FAILURE: case GRPC_CHANNEL_SHUTDOWN: GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE); @@ -638,11 +643,12 @@ static void update_lb_connectivity_status_locked( gpr_log( GPR_INFO, "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.", - glb_policy, grpc_connectivity_state_name(rr_state), + glb_policy, + grpc_connectivity_state_name(glb_policy->rr_connectivity_state), glb_policy->rr_policy); } - grpc_connectivity_state_set(&glb_policy->state_tracker, rr_state, - rr_state_error, + grpc_connectivity_state_set(&glb_policy->state_tracker, + glb_policy->rr_connectivity_state, rr_state_error, "update_lb_connectivity_status_locked"); } @@ -740,11 +746,36 @@ static void lb_policy_args_destroy(grpc_lb_policy_args* args) { gpr_free(args); } -static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error); +static void rr_on_reresolution_requested_locked(void* arg, grpc_error* error) { + glb_lb_policy* glb_policy = (glb_lb_policy*)arg; + if (glb_policy->shutting_down || error != GRPC_ERROR_NONE) { + GRPC_LB_POLICY_UNREF(&glb_policy->base, + "rr_on_reresolution_requested_locked"); + return; + } + if (grpc_lb_glb_trace.enabled()) { + gpr_log( + GPR_DEBUG, + "[grpclb %p] Re-resolution requested from the internal RR policy (%p).", + glb_policy, glb_policy->rr_policy); + } + // If we are talking to a balancer, we expect to get updated addresses form + // the balancer, so we can ignore the re-resolution request from the RR + // policy. Otherwise, handle the re-resolution request using glb's original + // re-resolution closure. + if (glb_policy->lb_calld == nullptr || + !glb_policy->lb_calld->seen_initial_response) { + grpc_lb_policy_try_reresolve(&glb_policy->base, &grpc_lb_glb_trace, + GRPC_ERROR_NONE); + } + // Give back the wrapper closure to the RR policy. + grpc_lb_policy_set_reresolve_closure_locked( + glb_policy->rr_policy, &glb_policy->rr_on_reresolution_requested); +} + static void create_rr_locked(glb_lb_policy* glb_policy, grpc_lb_policy_args* args) { GPR_ASSERT(glb_policy->rr_policy == nullptr); - grpc_lb_policy* new_rr_policy = grpc_lb_policy_create("round_robin", args); if (new_rr_policy == nullptr) { gpr_log(GPR_ERROR, @@ -757,29 +788,25 @@ static void create_rr_locked(glb_lb_policy* glb_policy, glb_policy->rr_policy); return; } + GRPC_LB_POLICY_REF(&glb_policy->base, "rr_on_reresolution_requested_locked"); grpc_lb_policy_set_reresolve_closure_locked( - new_rr_policy, glb_policy->base.request_reresolution); - glb_policy->base.request_reresolution = nullptr; + new_rr_policy, &glb_policy->rr_on_reresolution_requested); glb_policy->rr_policy = new_rr_policy; grpc_error* rr_state_error = nullptr; glb_policy->rr_connectivity_state = grpc_lb_policy_check_connectivity_locked( glb_policy->rr_policy, &rr_state_error); /* Connectivity state is a function of the RR policy updated/created */ - update_lb_connectivity_status_locked( - glb_policy, glb_policy->rr_connectivity_state, rr_state_error); + update_lb_connectivity_status_locked(glb_policy, rr_state_error); /* Add the gRPC LB's interested_parties pollset_set to that of the newly * created RR policy. This will make the RR policy progress upon activity on * gRPC LB, which in turn is tied to the application's call */ grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties, glb_policy->base.interested_parties); - GRPC_CLOSURE_INIT(&glb_policy->on_rr_connectivity_changed, - on_rr_connectivity_changed_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner)); /* Subscribe to changes to the connectivity of the new RR */ - GRPC_LB_POLICY_REF(&glb_policy->base, "glb_rr_connectivity_cb"); + GRPC_LB_POLICY_REF(&glb_policy->base, "rr_on_connectivity_changed_locked"); grpc_lb_policy_notify_on_state_change_locked( glb_policy->rr_policy, &glb_policy->rr_connectivity_state, - &glb_policy->on_rr_connectivity_changed); + &glb_policy->rr_on_connectivity_changed); grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy); // Send pending picks to RR policy. pending_pick* pp; @@ -827,28 +854,18 @@ static void rr_handover_locked(glb_lb_policy* glb_policy) { lb_policy_args_destroy(args); } -static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error) { - glb_lb_policy* glb_policy = static_cast(arg); +static void rr_on_connectivity_changed_locked(void* arg, grpc_error* error) { + glb_lb_policy* glb_policy = (glb_lb_policy*)arg; if (glb_policy->shutting_down) { - GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); - return; - } - if (glb_policy->rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - /* An RR policy that has transitioned into the SHUTDOWN connectivity state - * should not be considered for picks or updates: the SHUTDOWN state is a - * sink, policies can't transition back from it. .*/ - GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "rr_connectivity_shutdown"); - glb_policy->rr_policy = nullptr; - GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); + GRPC_LB_POLICY_UNREF(&glb_policy->base, + "rr_on_connectivity_changed_locked"); return; } - /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */ - update_lb_connectivity_status_locked( - glb_policy, glb_policy->rr_connectivity_state, GRPC_ERROR_REF(error)); - /* Resubscribe. Reuse the "glb_rr_connectivity_cb" ref. */ + update_lb_connectivity_status_locked(glb_policy, GRPC_ERROR_REF(error)); + // Resubscribe. Reuse the "rr_on_connectivity_changed_locked" ref. grpc_lb_policy_notify_on_state_change_locked( glb_policy->rr_policy, &glb_policy->rr_connectivity_state, - &glb_policy->on_rr_connectivity_changed); + &glb_policy->rr_on_connectivity_changed); } static void destroy_balancer_name(void* balancer_name) { @@ -971,8 +988,6 @@ static void glb_shutdown_locked(grpc_lb_policy* pol, if (glb_policy->rr_policy != nullptr) { grpc_lb_policy_shutdown_locked(glb_policy->rr_policy, nullptr); GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown"); - } else { - grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED); } // We destroy the LB channel here because // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy @@ -984,6 +999,7 @@ static void glb_shutdown_locked(grpc_lb_policy* pol, } grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "glb_shutdown"); + grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED); // Clear pending picks. pending_pick* pp = glb_policy->pending_picks; glb_policy->pending_picks = nullptr; @@ -1623,6 +1639,8 @@ static void lb_on_server_status_received_locked(void* arg, grpc_error* error) { lb_calld, lb_calld->lb_call, grpc_error_string(error)); gpr_free(status_details); } + grpc_lb_policy_try_reresolve(&glb_policy->base, &grpc_lb_glb_trace, + GRPC_ERROR_NONE); // If this lb_calld is still in use, this call ended because of a failure so // we want to retry connecting. Otherwise, we have deliberately ended this // call and no further action is required. @@ -1651,16 +1669,15 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) { glb_policy->fallback_timer_callback_pending = false; /* If we receive a serverlist after the timer fires but before this callback * actually runs, don't fall back. */ - if (glb_policy->serverlist == nullptr) { - if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) { - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, - "[grpclb %p] Falling back to use backends from resolver", - glb_policy); - } - GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr); - rr_handover_locked(glb_policy); + if (glb_policy->serverlist == nullptr && !glb_policy->shutting_down && + error == GRPC_ERROR_NONE) { + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, + "[grpclb %p] Falling back to use backends from resolver", + glb_policy); } + GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr); + rr_handover_locked(glb_policy); } GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer"); } @@ -1781,19 +1798,6 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg, } } -static void glb_set_reresolve_closure_locked( - grpc_lb_policy* policy, grpc_closure* request_reresolution) { - glb_lb_policy* glb_policy = reinterpret_cast(policy); - GPR_ASSERT(!glb_policy->shutting_down); - GPR_ASSERT(glb_policy->base.request_reresolution == nullptr); - if (glb_policy->rr_policy != nullptr) { - grpc_lb_policy_set_reresolve_closure_locked(glb_policy->rr_policy, - request_reresolution); - } else { - glb_policy->base.request_reresolution = request_reresolution; - } -} - /* Code wiring the policy with the rest of the core */ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_destroy, @@ -1805,8 +1809,7 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_exit_idle_locked, glb_check_connectivity_locked, glb_notify_on_state_change_locked, - glb_update_locked, - glb_set_reresolve_closure_locked}; + glb_update_locked}; static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory, grpc_lb_policy_args* args) { @@ -1887,6 +1890,12 @@ static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory, return nullptr; } grpc_subchannel_index_ref(); + GRPC_CLOSURE_INIT(&glb_policy->rr_on_connectivity_changed, + rr_on_connectivity_changed_locked, glb_policy, + grpc_combiner_scheduler(args->combiner)); + GRPC_CLOSURE_INIT(&glb_policy->rr_on_reresolution_requested, + rr_on_reresolution_requested_locked, glb_policy, + grpc_combiner_scheduler(args->combiner)); GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed, glb_lb_channel_on_connectivity_changed_cb, glb_policy, grpc_combiner_scheduler(args->combiner)); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 1485f7caf5..296bdcb247 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -520,14 +520,6 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { } } -static void pf_set_reresolve_closure_locked( - grpc_lb_policy* policy, grpc_closure* request_reresolution) { - pick_first_lb_policy* p = reinterpret_cast(policy); - GPR_ASSERT(!p->shutdown); - GPR_ASSERT(policy->request_reresolution == nullptr); - policy->request_reresolution = request_reresolution; -} - static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_destroy, pf_shutdown_locked, @@ -538,8 +530,7 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_exit_idle_locked, pf_check_connectivity_locked, pf_notify_on_state_change_locked, - pf_update_locked, - pf_set_reresolve_closure_locked}; + pf_update_locked}; static void pick_first_factory_ref(grpc_lb_policy_factory* factory) {} diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index cefd0d8d7d..b5b4c44ef1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -620,14 +620,6 @@ static void rr_update_locked(grpc_lb_policy* policy, } } -static void rr_set_reresolve_closure_locked( - grpc_lb_policy* policy, grpc_closure* request_reresolution) { - round_robin_lb_policy* p = reinterpret_cast(policy); - GPR_ASSERT(!p->shutdown); - GPR_ASSERT(policy->request_reresolution == nullptr); - policy->request_reresolution = request_reresolution; -} - static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { rr_destroy, rr_shutdown_locked, @@ -638,8 +630,7 @@ static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { rr_exit_idle_locked, rr_check_connectivity_locked, rr_notify_on_state_change_locked, - rr_update_locked, - rr_set_reresolve_closure_locked}; + rr_update_locked}; static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {} diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index da5d498963..cc65fe4fdb 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -192,6 +192,7 @@ class BalancerServiceImpl : public BalancerService { gpr_log(GPR_INFO, "LB[%p]: recv msg '%s'", this, request.DebugString().c_str()); + // TODO(juanlishen): Initial response should always be the first response. if (client_load_reporting_interval_seconds_ > 0) { LoadBalanceResponse initial_response; initial_response.mutable_initial_response() @@ -444,7 +445,7 @@ class GrpclbEnd2endTest : public ::testing::Test { void WaitForBackend(size_t backend_idx) { do { - CheckRpcSendOk(); + SendRpc(); } while (backends_[backend_idx]->request_count() == 0); ResetBackendCounters(); } @@ -663,9 +664,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); // and sent two responses. EXPECT_EQ(2U, balancer_servers_[0].service_->response_count()); - - // Check LB policy name for the channel. - EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } TEST_F(SingleBalancerTest, Fallback) { @@ -874,8 +872,6 @@ TEST_F(SingleBalancerTest, BackendsRestart) { // machinery to either update the LB responses "on the fly" or instruct // backends which ports to restart on. CheckRpcSendFailure(); - // Check LB policy name for the channel. - EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } class UpdatesTest : public GrpclbEnd2endTest { @@ -939,8 +935,6 @@ TEST_F(UpdatesTest, UpdateBalancers) { EXPECT_EQ(1U, balancer_servers_[1].service_->response_count()); EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); - // Check LB policy name for the channel. - EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } // Send an update with the same set of LBs as the one in SetUp() in order to @@ -1012,9 +1006,6 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { // doesn't assign the second backend. EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); balancers_[0]->NotifyDoneWithServerlists(); - - // Check LB policy name for the channel. - EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { @@ -1097,8 +1088,134 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { EXPECT_LE(balancer_servers_[1].service_->response_count(), 2U); EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); - // Check LB policy name for the channel. - EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); +} + +TEST_F(UpdatesTest, ReresolveDeadBalancer) { + std::vector addresses; + addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""}); + SetNextResolution(addresses); + addresses.clear(); + addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""}); + SetNextReresolutionResponse(addresses); + const std::vector first_backend{GetBackendPorts()[0]}; + const std::vector second_backend{GetBackendPorts()[1]}; + + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0); + ScheduleResponseForBalancer( + 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0); + + // Start servers and send 10 RPCs per server. + gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); + CheckRpcSendOk(10); + gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); + // All 10 requests should have gone to the first backend. + EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); + + // Kill backend 0. + gpr_log(GPR_INFO, "********** ABOUT TO KILL BACKEND 0 *************"); + if (backends_[0]->Shutdown()) backend_servers_[0].Shutdown(); + gpr_log(GPR_INFO, "********** KILLED BACKEND 0 *************"); + + CheckRpcSendFailure(); + + balancers_[1]->NotifyDoneWithServerlists(); + balancers_[2]->NotifyDoneWithServerlists(); + EXPECT_EQ(0U, balancer_servers_[1].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); + + // Kill balancer 0. + gpr_log(GPR_INFO, "********** ABOUT TO KILL BALANCER 0 *************"); + balancers_[0]->NotifyDoneWithServerlists(); + if (balancers_[0]->Shutdown()) balancer_servers_[0].Shutdown(); + gpr_log(GPR_INFO, "********** KILLED BALANCER 0 *************"); + + balancers_[0]->NotifyDoneWithServerlists(); + balancers_[1]->NotifyDoneWithServerlists(); + balancers_[2]->NotifyDoneWithServerlists(); + // Balancer 0 got a single request. + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + // Balancer 1 may have received a request if re-resolution is done quickly + // enough. + EXPECT_GE(balancer_servers_[1].service_->request_count(), 0U); + EXPECT_GE(balancer_servers_[1].service_->response_count(), 0U); + EXPECT_LE(balancer_servers_[1].service_->request_count(), 1U); + EXPECT_LE(balancer_servers_[1].service_->response_count(), 1U); + EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); + + // Wait until re-resolution has finished, as signaled by the second backend + // receiving a request. + WaitForBackend(1); + + // This is serviced by the new serverlist. + gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); + CheckRpcSendOk(10); + gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); + // All 10 requests should have gone to the second backend. + EXPECT_EQ(10U, backend_servers_[1].service_->request_count()); + + balancers_[0]->NotifyDoneWithServerlists(); + balancers_[1]->NotifyDoneWithServerlists(); + balancers_[2]->NotifyDoneWithServerlists(); + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + EXPECT_EQ(1U, balancer_servers_[1].service_->request_count()); + EXPECT_EQ(1U, balancer_servers_[1].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); +} + +TEST_F(UpdatesTest, ReresolveDeadBackend) { + ResetStub(500); + // The first resolution contains the addresses of a balancer that never + // responds, and a fallback backend. + std::vector addresses; + addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""}); + addresses.emplace_back(AddressData{backend_servers_[0].port_, false, ""}); + SetNextResolution(addresses); + // The re-resolution result will contain the addresses of the same balancer + // and a new fallback backend. + addresses.clear(); + addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""}); + addresses.emplace_back(AddressData{backend_servers_[1].port_, false, ""}); + SetNextReresolutionResponse(addresses); + + // Start servers and send 10 RPCs per server. + gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); + CheckRpcSendOk(10); + gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); + // All 10 requests should have gone to the fallback backend. + EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); + + // Kill backend 0. + gpr_log(GPR_INFO, "********** ABOUT TO KILL BACKEND 0 *************"); + if (backends_[0]->Shutdown()) backend_servers_[0].Shutdown(); + gpr_log(GPR_INFO, "********** KILLED BACKEND 0 *************"); + + // Wait until re-resolution has finished, as signaled by the second backend + // receiving a request. + WaitForBackend(1); + + gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); + CheckRpcSendOk(10); + gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); + // All 10 requests should have gone to the second backend. + EXPECT_EQ(10U, backend_servers_[1].service_->request_count()); + + balancers_[0]->NotifyDoneWithServerlists(); + balancers_[1]->NotifyDoneWithServerlists(); + balancers_[2]->NotifyDoneWithServerlists(); + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[0].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); } TEST_F(SingleBalancerTest, Drop) { -- cgit v1.2.3