aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c21
-rw-r--r--test/cpp/end2end/client_lb_end2end_test.cc37
2 files changed, 53 insertions, 5 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index a7f7e9542c..a63bdd933d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -811,19 +811,30 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
sc_args.args = new_args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
+ grpc_channel_args_destroy(exec_ctx, new_args);
+ grpc_error *error;
+ // Get the connectivity state of the subchannel. Already existing ones may
+ // be in a state other than INIT.
+ const grpc_connectivity_state subchannel_connectivity_state =
+ grpc_subchannel_check_connectivity(subchannel, &error);
+ if (error != GRPC_ERROR_NONE) {
+ // The subchannel is in error (e.g. shutting down). Ignore it.
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "new_sc_connectivity_error");
+ GRPC_ERROR_UNREF(error);
+ continue;
+ }
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
char *address_uri =
grpc_sockaddr_to_uri(&addresses->addresses[i].address);
gpr_log(
GPR_DEBUG,
"[RR %p] index %lu: Created subchannel %p for address uri %s into "
- "subchannel_list %p",
+ "subchannel_list %p. Connectivity state %s",
(void *)p, (unsigned long)subchannel_index, (void *)subchannel,
- address_uri, (void *)subchannel_list);
+ address_uri, (void *)subchannel_list,
+ grpc_connectivity_state_name(subchannel_connectivity_state));
gpr_free(address_uri);
}
- grpc_channel_args_destroy(exec_ctx, new_args);
-
subchannel_data *sd = &subchannel_list->subchannels[subchannel_index++];
sd->subchannel_list = subchannel_list;
sd->subchannel = subchannel;
@@ -835,7 +846,7 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
* won't be referring to this value again and it'll be overwritten after
* the first call to rr_connectivity_changed_locked */
sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
- sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
+ sd->curr_connectivity_state = subchannel_connectivity_state;
sd->user_data_vtable = addresses->user_data_vtable;
if (sd->user_data_vtable != NULL) {
sd->user_data =
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index e1160ecdc6..cb4f992bfa 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -450,6 +450,43 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
}
+TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
+ // Start servers and send one RPC per server.
+ const int kNumServers = 3;
+ StartServers(kNumServers);
+ ResetStub("round_robin");
+ std::vector<int> ports;
+
+ // Start with a single server.
+ ports.emplace_back(servers_[0]->port_);
+ SetNextResolution(ports);
+ WaitForServer(0);
+ // Send RPCs. They should all go servers_[0]
+ for (size_t i = 0; i < 10; ++i) SendRpc();
+ EXPECT_EQ(10, servers_[0]->service_.request_count());
+ EXPECT_EQ(0, servers_[1]->service_.request_count());
+ EXPECT_EQ(0, servers_[2]->service_.request_count());
+ servers_[0]->service_.ResetCounters();
+
+ // All servers, but one is shutdown.
+ servers_[1]->Shutdown(false);
+ ports.clear();
+ ports.emplace_back(servers_[0]->port_);
+ ports.emplace_back(servers_[1]->port_);
+ ports.emplace_back(servers_[2]->port_);
+ SetNextResolution(ports);
+ WaitForServer(0);
+ WaitForServer(2);
+
+ // Send three RPCs, one per server.
+ for (size_t i = 0; i < kNumServers; ++i) SendRpc();
+ // The server in shutdown shouldn't receive any.
+ EXPECT_EQ(0, servers_[1]->service_.request_count());
+
+ // Check LB policy name for the channel.
+ EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
+}
+
TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;