aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2018-02-07 17:07:16 -0800
committerGravatar David Garcia Quintas <dgq@google.com>2018-02-07 17:07:16 -0800
commita68a11e4a0efee22f44dae5c4d3ff87715e37fc4 (patch)
tree2bf67cd9aa440fe5bda63c20b12489fb4c090466
parent20331706e27abb2554e70df1ce9e58e049808f12 (diff)
PR comments
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc25
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h2
-rw-r--r--test/cpp/end2end/client_lb_end2end_test.cc38
3 files changed, 47 insertions, 18 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index ecee6150bc..ab6d3e6a03 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -329,8 +329,7 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd,
* CHECK: sd->curr_connectivity_state == CONNECTING.
*
* 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
- * TRANSIENT_FAILURE (and
- * requests re-resolution).
+ * TRANSIENT_FAILURE.
* CHECK: subchannel_list->num_transient_failures ==
* subchannel_list->num_subchannels.
*/
@@ -397,9 +396,6 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
- // Update state counters and new overall state.
- update_state_counters_locked(sd);
- update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
// If the sd's new state is TRANSIENT_FAILURE, unref the *connected*
// subchannel, if any.
switch (sd->curr_connectivity_state) {
@@ -482,6 +478,12 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:; // fallthrough
}
+ // Update state counters and new overall state.
+ update_state_counters_locked(sd);
+ // Only update connectivity based on the selected subchannel list.
+ if (sd->subchannel_list == p->subchannel_list) {
+ update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
+ }
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
@@ -564,9 +566,20 @@ static void rr_update_locked(grpc_lb_policy* policy,
// purposes if the subchannel is already in transient failure. Otherwise
// we'd be immediately notified of the IDLE-TRANSIENT_FAILURE
// discrepancy, attempt to re-resolve and end up here again.
+ // TODO(roth): As part of C++-ifying the subchannel_list API, design a
+ // better API for notifying the LB policy of subchannel states, which can
+ // be used both for the subchannel's initial state and for subsequent
+ // state changes. This will allow us to handle this more generally instead
+ // of special-casing TRANSIENT_FAILURE (e.g., we can also distribute any
+ // pending picks across all READY subchannels rather than sending them all
+ // to the first one).
if (subchannel_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
subchannel_list->subchannels[i].pending_connectivity_state_unsafe =
- subchannel_state;
+ subchannel_list->subchannels[i].curr_connectivity_state =
+ subchannel_list->subchannels[i].prev_connectivity_state =
+ subchannel_state;
+ --subchannel_list->num_idle;
+ ++subchannel_list->num_transient_failures;
}
}
if (p->latest_pending_subchannel_list != nullptr) {
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 3377605263..91537f3afe 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -101,8 +101,6 @@ struct grpc_lb_subchannel_list {
size_t num_ready;
/** how many subchannels are in state TRANSIENT_FAILURE */
size_t num_transient_failures;
- /** how many subchannels are in state SHUTDOWN */
- size_t num_shutdown;
/** how many subchannels are in state IDLE */
size_t num_idle;
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index 2bb3c8d14d..ee5adbc6fa 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -138,7 +138,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
}
}
- void SetNextResolution(const std::vector<int>& ports, bool notify = true) {
+ void SetNextResolution(const std::vector<int>& ports) {
grpc_core::ExecCtx exec_ctx;
grpc_lb_addresses* addresses =
grpc_lb_addresses_create(ports.size(), nullptr);
@@ -157,13 +157,33 @@ class ClientLbEnd2endTest : public ::testing::Test {
grpc_lb_addresses_create_channel_arg(addresses);
grpc_channel_args* fake_result =
grpc_channel_args_copy_and_add(nullptr, &fake_addresses, 1);
- if (notify) {
- grpc_fake_resolver_response_generator_set_response(response_generator_,
- fake_result);
- } else {
- grpc_fake_resolver_response_generator_set_response_upon_error(
- response_generator_, fake_result);
+ grpc_fake_resolver_response_generator_set_response(response_generator_,
+ fake_result);
+ grpc_channel_args_destroy(fake_result);
+ grpc_lb_addresses_destroy(addresses);
+ }
+
+ void SetNextResolutionUponError(const std::vector<int>& ports) {
+ grpc_core::ExecCtx exec_ctx;
+ grpc_lb_addresses* addresses =
+ grpc_lb_addresses_create(ports.size(), nullptr);
+ for (size_t i = 0; i < ports.size(); ++i) {
+ char* lb_uri_str;
+ gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", ports[i]);
+ grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
+ GPR_ASSERT(lb_uri != nullptr);
+ grpc_lb_addresses_set_address_from_uri(addresses, i, lb_uri,
+ false /* is balancer */,
+ "" /* balancer name */, nullptr);
+ grpc_uri_destroy(lb_uri);
+ gpr_free(lb_uri_str);
}
+ const grpc_arg fake_addresses =
+ grpc_lb_addresses_create_channel_arg(addresses);
+ grpc_channel_args* fake_result =
+ grpc_channel_args_copy_and_add(nullptr, &fake_addresses, 1);
+ grpc_fake_resolver_response_generator_set_response_upon_error(
+ response_generator_, fake_result);
grpc_channel_args_destroy(fake_result);
grpc_lb_addresses_destroy(addresses);
}
@@ -578,9 +598,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
ports.emplace_back(servers_[0]->port_);
ports.emplace_back(servers_[1]->port_);
ports.emplace_back(servers_[2]->port_);
- gpr_log(GPR_INFO, "ABOUT TO SEND ALLLLL");
SetNextResolution(ports);
- gpr_log(GPR_INFO, "SENT ALLLLLLLLLLLLLLLLLL");
WaitForServer(stub, 0, DEBUG_LOCATION);
WaitForServer(stub, 1, DEBUG_LOCATION);
WaitForServer(stub, 2, DEBUG_LOCATION);
@@ -708,7 +726,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
StartServers(kNumServers, second_ports);
// Don't notify of the update. Wait for the LB policy's re-resolution to
// "pull" the new ports.
- SetNextResolution(second_ports, false);
+ SetNextResolutionUponError(second_ports);
gpr_log(GPR_INFO, "****** SERVERS RESTARTED *******");
gpr_log(GPR_INFO, "****** SENDING REQUEST TO SUCCEED *******");
// Client request should eventually (but still fairly soon) succeed.