aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--doc/environment_variables.md8
-rw-r--r--grpc.def1
-rw-r--r--include/grpc/grpc.h3
-rw-r--r--src/core/ext/filters/client_channel/channel_connectivity.c6
-rw-r--r--src/core/lib/iomgr/iomgr.c8
-rw-r--r--src/core/lib/support/string.c13
-rw-r--r--src/core/lib/support/string.h3
-rw-r--r--src/cpp/client/channel_cc.cc202
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h3
-rw-r--r--test/core/support/string_test.c16
-rw-r--r--test/cpp/end2end/async_end2end_test.cc134
-rw-r--r--test/cpp/end2end/end2end_test.cc26
13 files changed, 346 insertions, 79 deletions
diff --git a/doc/environment_variables.md b/doc/environment_variables.md
index a2cde92736..0123f3f25d 100644
--- a/doc/environment_variables.md
+++ b/doc/environment_variables.md
@@ -113,3 +113,11 @@ some configuration as environment variables that can be set.
- native (default)- a DNS resolver based around getaddrinfo(), creates a new thread to
perform name resolution
- ares - a DNS resolver based around the c-ares library
+
+* GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER
+ The channel connectivity watcher uses one extra thread to check the channel
+ state every 500 ms on the client side. It can help reconnect disconnected
+ client channels (mostly due to idleness), so that the next RPC on this channel
+ won't fail. Set to 1 to turn off this watcher and save a thread. Please note
+ this is a temporary work-around, it will be removed in the future once we have
+ support for automatically reestablishing failed connections.
diff --git a/grpc.def b/grpc.def
index ec266fce95..37cc7e20f0 100644
--- a/grpc.def
+++ b/grpc.def
@@ -70,6 +70,7 @@ EXPORTS
grpc_channel_check_connectivity_state
grpc_channel_num_external_connectivity_watchers
grpc_channel_watch_connectivity_state
+ grpc_channel_support_connectivity_watcher
grpc_channel_create_call
grpc_channel_ping
grpc_channel_register_call
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 943d6e4891..b562167b5f 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -178,6 +178,9 @@ GRPCAPI void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
gpr_timespec deadline, grpc_completion_queue *cq, void *tag);
+/** Check whether a grpc channel supports connectivity watcher */
+GRPCAPI int grpc_channel_support_connectivity_watcher(grpc_channel *channel);
+
/** Create a call given a grpc_channel, in order to call 'method'. All
completions are sent to 'completion_queue'. 'method' and 'host' need only
live through the invocation of this function.
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c
index b83c95275f..0a9e90d12e 100644
--- a/src/core/ext/filters/client_channel/channel_connectivity.c
+++ b/src/core/ext/filters/client_channel/channel_connectivity.c
@@ -191,6 +191,12 @@ static void watcher_timer_init(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(wa);
}
+int grpc_channel_support_connectivity_watcher(grpc_channel *channel) {
+ grpc_channel_element *client_channel_elem =
+ grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
+ return client_channel_elem->filter != &grpc_client_channel_filter ? 0 : 1;
+}
+
void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
gpr_timespec deadline, grpc_completion_queue *cq, void *tag) {
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index 3d19953eeb..1feea6d628 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -164,13 +164,7 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) {
bool grpc_iomgr_abort_on_leaks(void) {
char *env = gpr_getenv("GRPC_ABORT_ON_LEAKS");
- if (env == NULL) return false;
- static const char *truthy[] = {"yes", "Yes", "YES", "true",
- "True", "TRUE", "1"};
- bool should_we = false;
- for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
- if (0 == strcmp(env, truthy[i])) should_we = true;
- }
+ bool should_we = gpr_is_true(env);
gpr_free(env);
return should_we;
}
diff --git a/src/core/lib/support/string.c b/src/core/lib/support/string.c
index b65009754a..ec93303024 100644
--- a/src/core/lib/support/string.c
+++ b/src/core/lib/support/string.c
@@ -298,3 +298,16 @@ void *gpr_memrchr(const void *s, int c, size_t n) {
}
return NULL;
}
+
+bool gpr_is_true(const char *s) {
+ if (s == NULL) {
+ return false;
+ }
+ static const char *truthy[] = {"yes", "true", "1"};
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
+ if (0 == gpr_stricmp(s, truthy[i])) {
+ return true;
+ }
+ }
+ return false;
+}
diff --git a/src/core/lib/support/string.h b/src/core/lib/support/string.h
index e11df8439d..5a56fa3a0a 100644
--- a/src/core/lib/support/string.h
+++ b/src/core/lib/support/string.h
@@ -19,6 +19,7 @@
#ifndef GRPC_CORE_LIB_SUPPORT_STRING_H
#define GRPC_CORE_LIB_SUPPORT_STRING_H
+#include <stdbool.h>
#include <stddef.h>
#include <grpc/support/port_platform.h>
@@ -106,6 +107,8 @@ int gpr_stricmp(const char *a, const char *b);
void *gpr_memrchr(const void *s, int c, size_t n);
+/** Return true if lower(s) equals "true", "yes" or "1", otherwise false. */
+bool gpr_is_true(const char *s);
#ifdef __cplusplus
}
#endif
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index f2d9bb07c9..19a25c838f 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -18,7 +18,10 @@
#include <grpc++/channel.h>
+#include <chrono>
+#include <condition_variable>
#include <memory>
+#include <mutex>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
@@ -35,17 +38,197 @@
#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/support/env.h"
+#include "src/core/lib/support/string.h"
namespace grpc {
+namespace {
+int kConnectivityCheckIntervalMsec = 500;
+void WatchStateChange(void* arg);
+
+class TagSaver final : public CompletionQueueTag {
+ public:
+ explicit TagSaver(void* tag) : tag_(tag) {}
+ ~TagSaver() override {}
+ bool FinalizeResult(void** tag, bool* status) override {
+ *tag = tag_;
+ delete this;
+ return true;
+ }
+
+ private:
+ void* tag_;
+};
+
+// Constantly watches channel connectivity status to reconnect a transiently
+// disconnected channel. This is a temporary work-around before we have retry
+// support.
+class ChannelConnectivityWatcher : private GrpcLibraryCodegen {
+ public:
+ static void StartWatching(grpc_channel* channel) {
+ if (!IsDisabled()) {
+ std::unique_lock<std::mutex> lock(g_watcher_mu_);
+ if (g_watcher_ == nullptr) {
+ g_watcher_ = new ChannelConnectivityWatcher();
+ }
+ g_watcher_->StartWatchingLocked(channel);
+ }
+ }
+
+ static void StopWatching() {
+ if (!IsDisabled()) {
+ std::unique_lock<std::mutex> lock(g_watcher_mu_);
+ if (g_watcher_->StopWatchingLocked()) {
+ delete g_watcher_;
+ g_watcher_ = nullptr;
+ }
+ }
+ }
+
+ private:
+ ChannelConnectivityWatcher() : channel_count_(0), shutdown_(false) {
+ gpr_ref_init(&ref_, 0);
+ gpr_thd_options options = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&options);
+ gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
+ }
+
+ static bool IsDisabled() {
+ char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER");
+ bool disabled = gpr_is_true(env);
+ gpr_free(env);
+ return disabled;
+ }
+
+ void WatchStateChangeImpl() {
+ bool ok = false;
+ void* tag = NULL;
+ CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT;
+ while (true) {
+ {
+ std::unique_lock<std::mutex> lock(shutdown_mu_);
+ if (shutdown_) {
+ // Drain cq_ if the watcher is shutting down
+ status = cq_.AsyncNext(&tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME));
+ } else {
+ status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
+ // Make sure we've seen 2 TIMEOUTs before going to sleep
+ if (status == CompletionQueue::TIMEOUT) {
+ status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
+ if (status == CompletionQueue::TIMEOUT) {
+ shutdown_cv_.wait_for(lock, std::chrono::milliseconds(
+ kConnectivityCheckIntervalMsec));
+ continue;
+ }
+ }
+ }
+ }
+ ChannelState* channel_state = static_cast<ChannelState*>(tag);
+ channel_state->state =
+ grpc_channel_check_connectivity_state(channel_state->channel, false);
+ if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) {
+ void* shutdown_tag = NULL;
+ channel_state->shutdown_cq.Next(&shutdown_tag, &ok);
+ delete channel_state;
+ if (gpr_unref(&ref_)) {
+ break;
+ }
+ } else {
+ TagSaver* tag_saver = new TagSaver(channel_state);
+ grpc_channel_watch_connectivity_state(
+ channel_state->channel, channel_state->state,
+ gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver);
+ }
+ }
+ }
+
+ void StartWatchingLocked(grpc_channel* channel) {
+ if (thd_id_ != 0) {
+ gpr_ref(&ref_);
+ ++channel_count_;
+ ChannelState* channel_state = new ChannelState(channel);
+ // The first grpc_channel_watch_connectivity_state() is not used to
+ // monitor the channel state change, but to hold a reference of the
+ // c channel. So that WatchStateChangeImpl() can observe state ==
+ // GRPC_CHANNEL_SHUTDOWN before the channel gets destroyed.
+ grpc_channel_watch_connectivity_state(
+ channel_state->channel, channel_state->state,
+ gpr_inf_future(GPR_CLOCK_REALTIME), channel_state->shutdown_cq.cq(),
+ new TagSaver(nullptr));
+ grpc_channel_watch_connectivity_state(
+ channel_state->channel, channel_state->state,
+ gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(),
+ new TagSaver(channel_state));
+ }
+ }
+
+ bool StopWatchingLocked() {
+ if (--channel_count_ == 0) {
+ {
+ std::unique_lock<std::mutex> lock(shutdown_mu_);
+ shutdown_ = true;
+ shutdown_cv_.notify_one();
+ }
+ gpr_thd_join(thd_id_);
+ return true;
+ }
+ return false;
+ }
+
+ friend void WatchStateChange(void* arg);
+ struct ChannelState {
+ explicit ChannelState(grpc_channel* channel)
+ : channel(channel), state(GRPC_CHANNEL_IDLE){};
+ grpc_channel* channel;
+ grpc_connectivity_state state;
+ CompletionQueue shutdown_cq;
+ };
+ gpr_thd_id thd_id_;
+ CompletionQueue cq_;
+ gpr_refcount ref_;
+ int channel_count_;
+
+ std::mutex shutdown_mu_;
+ std::condition_variable shutdown_cv_; // protected by shutdown_mu_
+ bool shutdown_; // protected by shutdown_mu_
+
+ static std::mutex g_watcher_mu_;
+ static ChannelConnectivityWatcher* g_watcher_; // protected by g_watcher_mu_
+};
+
+std::mutex ChannelConnectivityWatcher::g_watcher_mu_;
+ChannelConnectivityWatcher* ChannelConnectivityWatcher::g_watcher_ = nullptr;
+
+void WatchStateChange(void* arg) {
+ ChannelConnectivityWatcher* watcher =
+ static_cast<ChannelConnectivityWatcher*>(arg);
+ watcher->WatchStateChangeImpl();
+}
+} // namespace
+
static internal::GrpcLibraryInitializer g_gli_initializer;
Channel::Channel(const grpc::string& host, grpc_channel* channel)
: host_(host), c_channel_(channel) {
g_gli_initializer.summon();
+ if (grpc_channel_support_connectivity_watcher(channel)) {
+ ChannelConnectivityWatcher::StartWatching(channel);
+ }
}
-Channel::~Channel() { grpc_channel_destroy(c_channel_); }
+Channel::~Channel() {
+ const bool stop_watching =
+ grpc_channel_support_connectivity_watcher(c_channel_);
+ grpc_channel_destroy(c_channel_);
+ if (stop_watching) {
+ ChannelConnectivityWatcher::StopWatching();
+ }
+}
namespace {
@@ -130,23 +313,6 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) {
return grpc_channel_check_connectivity_state(c_channel_, try_to_connect);
}
-namespace {
-class TagSaver final : public CompletionQueueTag {
- public:
- explicit TagSaver(void* tag) : tag_(tag) {}
- ~TagSaver() override {}
- bool FinalizeResult(void** tag, bool* status) override {
- *tag = tag_;
- delete this;
- return true;
- }
-
- private:
- void* tag_;
-};
-
-} // namespace
-
void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline,
CompletionQueue* cq, void* tag) {
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index 9671d794c5..0402ce34fb 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -93,6 +93,7 @@ grpc_alarm_destroy_type grpc_alarm_destroy_import;
grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import;
grpc_channel_num_external_connectivity_watchers_type grpc_channel_num_external_connectivity_watchers_import;
grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import;
+grpc_channel_support_connectivity_watcher_type grpc_channel_support_connectivity_watcher_import;
grpc_channel_create_call_type grpc_channel_create_call_import;
grpc_channel_ping_type grpc_channel_ping_import;
grpc_channel_register_call_type grpc_channel_register_call_import;
@@ -399,6 +400,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_channel_check_connectivity_state_import = (grpc_channel_check_connectivity_state_type) GetProcAddress(library, "grpc_channel_check_connectivity_state");
grpc_channel_num_external_connectivity_watchers_import = (grpc_channel_num_external_connectivity_watchers_type) GetProcAddress(library, "grpc_channel_num_external_connectivity_watchers");
grpc_channel_watch_connectivity_state_import = (grpc_channel_watch_connectivity_state_type) GetProcAddress(library, "grpc_channel_watch_connectivity_state");
+ grpc_channel_support_connectivity_watcher_import = (grpc_channel_support_connectivity_watcher_type) GetProcAddress(library, "grpc_channel_support_connectivity_watcher");
grpc_channel_create_call_import = (grpc_channel_create_call_type) GetProcAddress(library, "grpc_channel_create_call");
grpc_channel_ping_import = (grpc_channel_ping_type) GetProcAddress(library, "grpc_channel_ping");
grpc_channel_register_call_import = (grpc_channel_register_call_type) GetProcAddress(library, "grpc_channel_register_call");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index b64199be8e..e3704e592b 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -260,6 +260,9 @@ extern grpc_channel_num_external_connectivity_watchers_type grpc_channel_num_ext
typedef void(*grpc_channel_watch_connectivity_state_type)(grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag);
extern grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import;
#define grpc_channel_watch_connectivity_state grpc_channel_watch_connectivity_state_import
+typedef int(*grpc_channel_support_connectivity_watcher_type)(grpc_channel *channel);
+extern grpc_channel_support_connectivity_watcher_type grpc_channel_support_connectivity_watcher_import;
+#define grpc_channel_support_connectivity_watcher grpc_channel_support_connectivity_watcher_import
typedef grpc_call *(*grpc_channel_create_call_type)(grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_completion_queue *completion_queue, grpc_slice method, const grpc_slice *host, gpr_timespec deadline, void *reserved);
extern grpc_channel_create_call_type grpc_channel_create_call_import;
#define grpc_channel_create_call grpc_channel_create_call_import
diff --git a/test/core/support/string_test.c b/test/core/support/string_test.c
index a3c33c3fa4..bee2139477 100644
--- a/test/core/support/string_test.c
+++ b/test/core/support/string_test.c
@@ -279,6 +279,21 @@ static void test_memrchr(void) {
GPR_ASSERT(0 == strcmp((const char *)gpr_memrchr("hello", 'l', 5), "lo"));
}
+static void test_is_true(void) {
+ LOG_TEST_NAME("test_is_true");
+
+ GPR_ASSERT(true == gpr_is_true("True"));
+ GPR_ASSERT(true == gpr_is_true("true"));
+ GPR_ASSERT(true == gpr_is_true("TRUE"));
+ GPR_ASSERT(true == gpr_is_true("Yes"));
+ GPR_ASSERT(true == gpr_is_true("yes"));
+ GPR_ASSERT(true == gpr_is_true("YES"));
+ GPR_ASSERT(true == gpr_is_true("1"));
+ GPR_ASSERT(false == gpr_is_true(NULL));
+ GPR_ASSERT(false == gpr_is_true(""));
+ GPR_ASSERT(false == gpr_is_true("0"));
+}
+
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
test_strdup();
@@ -292,5 +307,6 @@ int main(int argc, char **argv) {
test_leftpad();
test_stricmp();
test_memrchr();
+ test_is_true();
return 0;
}
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 7cb7b262de..e841a702d4 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -260,11 +260,31 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
server_address_ << "localhost:" << port_;
// Setup server
+ BuildAndStartServer();
+
+ gpr_tls_set(&g_is_async_end2end_test, 1);
+ }
+
+ void TearDown() override {
+ server_->Shutdown();
+ void* ignored_tag;
+ bool ignored_ok;
+ cq_->Shutdown();
+ while (cq_->Next(&ignored_tag, &ignored_ok))
+ ;
+ stub_.reset();
+ poll_overrider_.reset();
+ gpr_tls_set(&g_is_async_end2end_test, 0);
+ grpc_recycle_unused_port(port_);
+ }
+
+ void BuildAndStartServer() {
ServerBuilder builder;
auto server_creds = GetCredentialsProvider()->GetServerCredentials(
GetParam().credentials_type);
builder.AddListeningPort(server_address_.str(), server_creds);
- builder.RegisterService(&service_);
+ service_.reset(new grpc::testing::EchoTestService::AsyncService());
+ builder.RegisterService(service_.get());
if (GetParam().health_check_service) {
builder.RegisterService(&health_check_);
}
@@ -276,20 +296,6 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
new ServerBuilderSyncPluginDisabler());
builder.SetOption(move(sync_plugin_disabler));
server_ = builder.BuildAndStart();
-
- gpr_tls_set(&g_is_async_end2end_test, 1);
- }
-
- void TearDown() override {
- server_->Shutdown();
- void* ignored_tag;
- bool ignored_ok;
- cq_->Shutdown();
- while (cq_->Next(&ignored_tag, &ignored_ok))
- ;
- poll_overrider_.reset();
- gpr_tls_set(&g_is_async_end2end_test, 0);
- grpc_recycle_unused_port(port_);
}
void ResetStub() {
@@ -319,8 +325,8 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
+ cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
@@ -341,7 +347,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
std::unique_ptr<ServerCompletionQueue> cq_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<Server> server_;
- grpc::testing::EchoTestService::AsyncService service_;
+ std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;
HealthCheck health_check_;
std::ostringstream server_address_;
int port_;
@@ -359,6 +365,26 @@ TEST_P(AsyncEnd2endTest, SequentialRpcs) {
SendRpc(10);
}
+TEST_P(AsyncEnd2endTest, ReconnectChannel) {
+ if (GetParam().inproc) {
+ return;
+ }
+ ResetStub();
+ SendRpc(1);
+ server_->Shutdown();
+ void* ignored_tag;
+ bool ignored_ok;
+ cq_->Shutdown();
+ while (cq_->Next(&ignored_tag, &ignored_ok))
+ ;
+ BuildAndStartServer();
+ // It needs more than kConnectivityCheckIntervalMsec time to reconnect the
+ // channel.
+ gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(1600, GPR_TIMESPAN)));
+ SendRpc(1);
+}
+
// We do not need to protect notify because the use is synchronized.
void ServerWait(Server* server, int* notify) {
server->Wait();
@@ -407,8 +433,8 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking)
.Expect(2, true)
@@ -444,8 +470,8 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
- service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
Verifier(GetParam().disable_blocking)
.Expect(2, true)
@@ -506,8 +532,8 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
- service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
cli_stream->Write(send_request, tag(3));
@@ -579,8 +605,8 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
- service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
- cq_.get(), cq_.get(), tag(2));
+ service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking)
.Expect(1, true)
@@ -635,8 +661,8 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
- service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
- cq_.get(), cq_.get(), tag(2));
+ service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking)
.Expect(1, true)
@@ -687,8 +713,8 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
- service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
- cq_.get(), cq_.get(), tag(2));
+ service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking)
.Expect(1, true)
@@ -741,8 +767,8 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
- service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
Verifier(GetParam().disable_blocking)
.Expect(1, true)
@@ -801,8 +827,8 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
- service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
@@ -869,8 +895,8 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
- service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
@@ -946,8 +972,8 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
@@ -991,8 +1017,8 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
@@ -1041,8 +1067,8 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3));
@@ -1104,8 +1130,8 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
@@ -1168,8 +1194,8 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
srv_ctx.AsyncNotifyWhenDone(tag(5));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
@@ -1203,8 +1229,8 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
srv_ctx.AsyncNotifyWhenDone(tag(5));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
@@ -1295,8 +1321,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// On the server, request to be notified of 'RequestStream' calls
// and receive the 'RequestStream' call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
- service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
// Client sends 3 messages (tags 3, 4 and 5)
@@ -1426,8 +1452,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// On the server, request to be notified of 'ResponseStream' calls and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
- service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
- cq_.get(), cq_.get(), tag(2));
+ service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
@@ -1562,8 +1588,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// On the server, request to be notified of the 'BidiStream' call and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
- service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
// Client sends the first and the only message
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 8bada48a2b..1f4861a7e6 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -238,6 +238,18 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
int port = grpc_pick_unused_port_or_die();
server_address_ << "127.0.0.1:" << port;
// Setup server
+ BuildAndStartServer(processor);
+ }
+
+ void RestartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
+ if (is_server_started_) {
+ server_->Shutdown();
+ BuildAndStartServer(processor);
+ }
+ }
+
+ void BuildAndStartServer(
+ const std::shared_ptr<AuthMetadataProcessor>& processor) {
ServerBuilder builder;
ConfigureServerBuilder(&builder);
auto server_creds = GetCredentialsProvider()->GetServerCredentials(
@@ -685,6 +697,20 @@ TEST_P(End2endTest, MultipleRpcs) {
}
}
+TEST_P(End2endTest, ReconnectChannel) {
+ if (GetParam().inproc) {
+ return;
+ }
+ ResetStub();
+ SendRpc(stub_.get(), 1, false);
+ RestartServer(std::shared_ptr<AuthMetadataProcessor>());
+ // It needs more than kConnectivityCheckIntervalMsec time to reconnect the
+ // channel.
+ gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(1600, GPR_TIMESPAN)));
+ SendRpc(stub_.get(), 1, false);
+}
+
TEST_P(End2endTest, RequestStreamOneRequest) {
ResetStub();
EchoRequest request;