diff options
author | David Garcia Quintas <dgq@google.com> | 2018-02-07 17:07:16 -0800 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2018-02-07 17:07:16 -0800 |
commit | a68a11e4a0efee22f44dae5c4d3ff87715e37fc4 (patch) | |
tree | 2bf67cd9aa440fe5bda63c20b12489fb4c090466 | |
parent | 20331706e27abb2554e70df1ce9e58e049808f12 (diff) |
PR comments
3 files changed, 47 insertions, 18 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index ecee6150bc..ab6d3e6a03 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -329,8 +329,7 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd, * CHECK: sd->curr_connectivity_state == CONNECTING. * * 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is - * TRANSIENT_FAILURE (and - * requests re-resolution). + * TRANSIENT_FAILURE. * CHECK: subchannel_list->num_transient_failures == * subchannel_list->num_subchannels. */ @@ -397,9 +396,6 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; - // Update state counters and new overall state. - update_state_counters_locked(sd); - update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); // If the sd's new state is TRANSIENT_FAILURE, unref the *connected* // subchannel, if any. switch (sd->curr_connectivity_state) { @@ -482,6 +478,12 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE:; // fallthrough } + // Update state counters and new overall state. + update_state_counters_locked(sd); + // Only update connectivity based on the selected subchannel list. + if (sd->subchannel_list == p->subchannel_list) { + update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); + } // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(sd); } @@ -564,9 +566,20 @@ static void rr_update_locked(grpc_lb_policy* policy, // purposes if the subchannel is already in transient failure. Otherwise // we'd be immediately notified of the IDLE-TRANSIENT_FAILURE // discrepancy, attempt to re-resolve and end up here again. + // TODO(roth): As part of C++-ifying the subchannel_list API, design a + // better API for notifying the LB policy of subchannel states, which can + // be used both for the subchannel's initial state and for subsequent + // state changes. This will allow us to handle this more generally instead + // of special-casing TRANSIENT_FAILURE (e.g., we can also distribute any + // pending picks across all READY subchannels rather than sending them all + // to the first one). if (subchannel_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { subchannel_list->subchannels[i].pending_connectivity_state_unsafe = - subchannel_state; + subchannel_list->subchannels[i].curr_connectivity_state = + subchannel_list->subchannels[i].prev_connectivity_state = + subchannel_state; + --subchannel_list->num_idle; + ++subchannel_list->num_transient_failures; } } if (p->latest_pending_subchannel_list != nullptr) { diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 3377605263..91537f3afe 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -101,8 +101,6 @@ struct grpc_lb_subchannel_list { size_t num_ready; /** how many subchannels are in state TRANSIENT_FAILURE */ size_t num_transient_failures; - /** how many subchannels are in state SHUTDOWN */ - size_t num_shutdown; /** how many subchannels are in state IDLE */ size_t num_idle; diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 2bb3c8d14d..ee5adbc6fa 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -138,7 +138,7 @@ class ClientLbEnd2endTest : public ::testing::Test { } } - void SetNextResolution(const std::vector<int>& ports, bool notify = true) { + void SetNextResolution(const std::vector<int>& ports) { grpc_core::ExecCtx exec_ctx; grpc_lb_addresses* addresses = grpc_lb_addresses_create(ports.size(), nullptr); @@ -157,13 +157,33 @@ class ClientLbEnd2endTest : public ::testing::Test { grpc_lb_addresses_create_channel_arg(addresses); grpc_channel_args* fake_result = grpc_channel_args_copy_and_add(nullptr, &fake_addresses, 1); - if (notify) { - grpc_fake_resolver_response_generator_set_response(response_generator_, - fake_result); - } else { - grpc_fake_resolver_response_generator_set_response_upon_error( - response_generator_, fake_result); + grpc_fake_resolver_response_generator_set_response(response_generator_, + fake_result); + grpc_channel_args_destroy(fake_result); + grpc_lb_addresses_destroy(addresses); + } + + void SetNextResolutionUponError(const std::vector<int>& ports) { + grpc_core::ExecCtx exec_ctx; + grpc_lb_addresses* addresses = + grpc_lb_addresses_create(ports.size(), nullptr); + for (size_t i = 0; i < ports.size(); ++i) { + char* lb_uri_str; + gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", ports[i]); + grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true); + GPR_ASSERT(lb_uri != nullptr); + grpc_lb_addresses_set_address_from_uri(addresses, i, lb_uri, + false /* is balancer */, + "" /* balancer name */, nullptr); + grpc_uri_destroy(lb_uri); + gpr_free(lb_uri_str); } + const grpc_arg fake_addresses = + grpc_lb_addresses_create_channel_arg(addresses); + grpc_channel_args* fake_result = + grpc_channel_args_copy_and_add(nullptr, &fake_addresses, 1); + grpc_fake_resolver_response_generator_set_response_upon_error( + response_generator_, fake_result); grpc_channel_args_destroy(fake_result); grpc_lb_addresses_destroy(addresses); } @@ -578,9 +598,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { ports.emplace_back(servers_[0]->port_); ports.emplace_back(servers_[1]->port_); ports.emplace_back(servers_[2]->port_); - gpr_log(GPR_INFO, "ABOUT TO SEND ALLLLL"); SetNextResolution(ports); - gpr_log(GPR_INFO, "SENT ALLLLLLLLLLLLLLLLLL"); WaitForServer(stub, 0, DEBUG_LOCATION); WaitForServer(stub, 1, DEBUG_LOCATION); WaitForServer(stub, 2, DEBUG_LOCATION); @@ -708,7 +726,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) { StartServers(kNumServers, second_ports); // Don't notify of the update. Wait for the LB policy's re-resolution to // "pull" the new ports. - SetNextResolution(second_ports, false); + SetNextResolutionUponError(second_ports); gpr_log(GPR_INFO, "****** SERVERS RESTARTED *******"); gpr_log(GPR_INFO, "****** SENDING REQUEST TO SUCCEED *******"); // Client request should eventually (but still fairly soon) succeed. |