aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/end2end/async_end2end_test.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/end2end/async_end2end_test.cc')
-rw-r--r--test/cpp/end2end/async_end2end_test.cc134
1 files changed, 80 insertions, 54 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 47555959f6..41090d161a 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -260,11 +260,31 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
server_address_ << "localhost:" << port_;
// Setup server
+ BuildAndStartServer();
+
+ gpr_tls_set(&g_is_async_end2end_test, 1);
+ }
+
+ void TearDown() override {
+ gpr_tls_set(&g_is_async_end2end_test, 0);
+ server_->Shutdown();
+ void* ignored_tag;
+ bool ignored_ok;
+ cq_->Shutdown();
+ while (cq_->Next(&ignored_tag, &ignored_ok))
+ ;
+ stub_.reset();
+ poll_overrider_.reset();
+ grpc_recycle_unused_port(port_);
+ }
+
+ void BuildAndStartServer() {
ServerBuilder builder;
auto server_creds = GetCredentialsProvider()->GetServerCredentials(
GetParam().credentials_type);
builder.AddListeningPort(server_address_.str(), server_creds);
- builder.RegisterService(&service_);
+ service_.reset(new grpc::testing::EchoTestService::AsyncService());
+ builder.RegisterService(service_.get());
if (GetParam().health_check_service) {
builder.RegisterService(&health_check_);
}
@@ -276,20 +296,6 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
new ServerBuilderSyncPluginDisabler());
builder.SetOption(move(sync_plugin_disabler));
server_ = builder.BuildAndStart();
-
- gpr_tls_set(&g_is_async_end2end_test, 1);
- }
-
- void TearDown() override {
- gpr_tls_set(&g_is_async_end2end_test, 0);
- server_->Shutdown();
- void* ignored_tag;
- bool ignored_ok;
- cq_->Shutdown();
- while (cq_->Next(&ignored_tag, &ignored_ok))
- ;
- poll_overrider_.reset();
- grpc_recycle_unused_port(port_);
}
void ResetStub() {
@@ -319,8 +325,8 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
+ cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
@@ -341,7 +347,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
std::unique_ptr<ServerCompletionQueue> cq_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<Server> server_;
- grpc::testing::EchoTestService::AsyncService service_;
+ std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;
HealthCheck health_check_;
std::ostringstream server_address_;
int port_;
@@ -359,6 +365,26 @@ TEST_P(AsyncEnd2endTest, SequentialRpcs) {
SendRpc(10);
}
+TEST_P(AsyncEnd2endTest, ReconnectChannel) {
+ if (GetParam().inproc) {
+ return;
+ }
+ ResetStub();
+ SendRpc(1);
+ server_->Shutdown();
+ void* ignored_tag;
+ bool ignored_ok;
+ cq_->Shutdown();
+ while (cq_->Next(&ignored_tag, &ignored_ok))
+ ;
+ BuildAndStartServer();
+ // It needs more than kConnectivityCheckIntervalMsec time to reconnect the
+ // channel.
+ gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(1600, GPR_TIMESPAN)));
+ SendRpc(1);
+}
+
// We do not need to protect notify because the use is synchronized.
void ServerWait(Server* server, int* notify) {
server->Wait();
@@ -409,8 +435,8 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking)
.Expect(2, true)
@@ -446,8 +472,8 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
- service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
Verifier(GetParam().disable_blocking)
.Expect(2, true)
@@ -508,8 +534,8 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
- service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
cli_stream->Write(send_request, tag(3));
@@ -581,8 +607,8 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
- service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
- cq_.get(), cq_.get(), tag(2));
+ service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking)
.Expect(1, true)
@@ -637,8 +663,8 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
- service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
- cq_.get(), cq_.get(), tag(2));
+ service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking)
.Expect(1, true)
@@ -689,8 +715,8 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
- service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
- cq_.get(), cq_.get(), tag(2));
+ service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking)
.Expect(1, true)
@@ -743,8 +769,8 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
- service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
Verifier(GetParam().disable_blocking)
.Expect(1, true)
@@ -803,8 +829,8 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
- service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
@@ -871,8 +897,8 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
- service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
@@ -948,8 +974,8 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
@@ -993,8 +1019,8 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
@@ -1043,8 +1069,8 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3));
@@ -1106,8 +1132,8 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
@@ -1170,8 +1196,8 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
srv_ctx.AsyncNotifyWhenDone(tag(5));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
@@ -1205,8 +1231,8 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
srv_ctx.AsyncNotifyWhenDone(tag(5));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
@@ -1297,8 +1323,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// On the server, request to be notified of 'RequestStream' calls
// and receive the 'RequestStream' call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
- service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
// Client sends 3 messages (tags 3, 4 and 5)
@@ -1428,8 +1454,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// On the server, request to be notified of 'ResponseStream' calls and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
- service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
- cq_.get(), cq_.get(), tag(2));
+ service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
@@ -1564,8 +1590,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// On the server, request to be notified of the 'BidiStream' call and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
- service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
// Client sends the first and the only message