aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-22 08:28:22 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-22 08:28:22 -0700
commitf9dd54c2ebd1eeba572d3264a41c352884608e8e (patch)
treedff2c0c892af24d282fe10528126bc9d2f8dd482
parent786522166ed35f93d64c316c0e4911a74d782be0 (diff)
parent5e8abedd6110d58067980a2b089aab48311317a0 (diff)
Merge pull request #3378 from yang-g/connection_failure_detection
remove connectivity watcher from interested party early
-rw-r--r--src/core/surface/channel_connectivity.c16
-rw-r--r--test/cpp/end2end/end2end_test.cc18
2 files changed, 30 insertions, 4 deletions
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index 88a7c16598..5c55ad3655 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -67,6 +67,7 @@ typedef struct {
gpr_mu mu;
callback_phase phase;
int success;
+ int removed;
grpc_iomgr_closure on_complete;
grpc_alarm alarm;
grpc_connectivity_state state;
@@ -77,10 +78,6 @@ typedef struct {
} state_watcher;
static void delete_state_watcher(state_watcher *w) {
- grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
- grpc_channel_get_channel_stack(w->channel));
- grpc_client_channel_del_interested_party(client_channel_elem,
- grpc_cq_pollset(w->cq));
GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity");
gpr_mu_destroy(&w->mu);
gpr_free(w);
@@ -112,7 +109,17 @@ static void finished_completion(void *pw, grpc_cq_completion *ignored) {
static void partly_done(state_watcher *w, int due_to_completion) {
int delete = 0;
+ grpc_channel_element *client_channel_elem = NULL;
+ gpr_mu_lock(&w->mu);
+ if (w->removed == 0) {
+ w->removed = 1;
+ client_channel_elem = grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(w->channel));
+ grpc_client_channel_del_interested_party(client_channel_elem,
+ grpc_cq_pollset(w->cq));
+ }
+ gpr_mu_unlock(&w->mu);
if (due_to_completion) {
gpr_mu_lock(&w->mu);
w->success = 1;
@@ -163,6 +170,7 @@ void grpc_channel_watch_connectivity_state(
w->phase = WAITING;
w->state = last_observed_state;
w->success = 0;
+ w->removed = 0;
w->cq = cq;
w->tag = tag;
w->channel = channel;
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index bfe799bd15..b3cfcd51a7 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -1149,6 +1149,24 @@ TEST_F(End2endTest, ChannelState) {
EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel_->GetState(false));
}
+// Takes 10s.
+TEST_F(End2endTest, ChannelStateTimeout) {
+ int port = grpc_pick_unused_port_or_die();
+ std::ostringstream server_address;
+ server_address << "127.0.0.1:" << port;
+ // Channel to non-existing server
+ auto channel = CreateChannel(server_address.str(), InsecureCredentials());
+ // Start IDLE
+ EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(true));
+
+ auto state = GRPC_CHANNEL_IDLE;
+ for (int i = 0; i < 10; i++) {
+ channel->WaitForStateChange(state, std::chrono::system_clock::now() +
+ std::chrono::seconds(1));
+ state = channel->GetState(false);
+ }
+}
+
// Talking to a non-existing service.
TEST_F(End2endTest, NonExistingService) {
ResetChannel();