aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/end2end/async_end2end_test.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/end2end/async_end2end_test.cc')
-rw-r--r--test/cpp/end2end/async_end2end_test.cc319
1 files changed, 319 insertions, 0 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 32e8a41795..0b5215ef8e 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -484,6 +484,81 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
EXPECT_TRUE(recv_status.ok());
}
+// Two pings and a final pong.
+TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ cli_ctx.set_initial_metadata_corked(true);
+ // tag:1 never comes up since no op is performed
+ std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
+ stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
+
+ service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
+
+ cli_stream->Write(send_request, tag(3));
+
+ // 65536(64KB) is the default flow control window size. Should change this
+ // number when default flow control window size changes. For the write of
+ // send_request larger than the flow control window size, tag:3 will not come
+ // up until server read is initiated. For write of send_request smaller than
+ // the flow control window size, the request can take the free ride with
+ // initial metadata due to coalescing, thus write tag:3 will come up here.
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Expect(3, true)
+ .Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+ }
+
+ srv_stream.Read(&recv_request, tag(4));
+
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ }
+
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ cli_stream->WriteLast(send_request, WriteOptions(), tag(5));
+ srv_stream.Read(&recv_request, tag(6));
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ srv_stream.Read(&recv_request, tag(7));
+ Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.Finish(send_response, Status::OK, tag(8));
+ cli_stream->Finish(&recv_status, tag(9));
+ Verifier(GetParam().disable_blocking)
+ .Expect(8, true)
+ .Expect(9, true)
+ .Verify(cq_.get());
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.ok());
+}
+
// One ping, two pongs.
TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
ResetStub();
@@ -540,6 +615,112 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
EXPECT_TRUE(recv_status.ok());
}
+// One ping, two pongs. Using WriteAndFinish API
+TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
+ stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
+
+ service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
+
+ Verifier(GetParam().disable_blocking)
+ .Expect(1, true)
+ .Expect(2, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.Write(send_response, tag(3));
+ cli_stream->Read(&recv_response, tag(4));
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(5));
+ cli_stream->Read(&recv_response, tag(6));
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ cli_stream->Read(&recv_response, tag(7));
+ Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get());
+
+ cli_stream->Finish(&recv_status, tag(8));
+ Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get());
+
+ EXPECT_TRUE(recv_status.ok());
+}
+
+// One ping, two pongs. Using WriteLast API
+TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
+ stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
+
+ service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
+
+ Verifier(GetParam().disable_blocking)
+ .Expect(1, true)
+ .Expect(2, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.Write(send_response, tag(3));
+ cli_stream->Read(&recv_response, tag(4));
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ srv_stream.WriteLast(send_response, WriteOptions(), tag(5));
+ cli_stream->Read(&recv_response, tag(6));
+ srv_stream.Finish(Status::OK, tag(7));
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Expect(7, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ cli_stream->Read(&recv_response, tag(8));
+ Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get());
+
+ cli_stream->Finish(&recv_status, tag(9));
+ Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
+
+ EXPECT_TRUE(recv_status.ok());
+}
+
// One ping, one pong.
TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
ResetStub();
@@ -599,6 +780,144 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
EXPECT_TRUE(recv_status.ok());
}
+// One ping, one pong. Using server:WriteAndFinish api
+TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ cli_ctx.set_initial_metadata_corked(true);
+ std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
+ cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
+
+ service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
+
+ cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
+
+ // 65536(64KB) is the default flow control window size. Should change this
+ // number when default flow control window size changes. For the write of
+ // send_request larger than the flow control window size, tag:3 will not come
+ // up until server read is initiated. For write of send_request smaller than
+ // the flow control window size, the request can take the free ride with
+ // initial metadata due to coalescing, thus write tag:3 will come up here.
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Expect(3, true)
+ .Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+ }
+
+ srv_stream.Read(&recv_request, tag(4));
+
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ }
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ srv_stream.Read(&recv_request, tag(5));
+ Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6));
+ cli_stream->Read(&recv_response, tag(7));
+ Verifier(GetParam().disable_blocking)
+ .Expect(6, true)
+ .Expect(7, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ cli_stream->Finish(&recv_status, tag(8));
+ Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get());
+
+ EXPECT_TRUE(recv_status.ok());
+}
+
+// One ping, one pong. Using server:WriteLast api
+TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ cli_ctx.set_initial_metadata_corked(true);
+ std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
+ cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
+
+ service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
+
+ cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
+
+ // 65536(64KB) is the default flow control window size. Should change this
+ // number when default flow control window size changes. For the write of
+ // send_request larger than the flow control window size, tag:3 will not come
+ // up until server read is initiated. For write of send_request smaller than
+ // the flow control window size, the request can take the free ride with
+ // initial metadata due to coalescing, thus write tag:3 will come up here.
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Expect(3, true)
+ .Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+ }
+
+ srv_stream.Read(&recv_request, tag(4));
+
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ }
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ srv_stream.Read(&recv_request, tag(5));
+ Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.WriteLast(send_response, WriteOptions(), tag(6));
+ srv_stream.Finish(Status::OK, tag(7));
+ cli_stream->Read(&recv_response, tag(8));
+ Verifier(GetParam().disable_blocking)
+ .Expect(6, true)
+ .Expect(7, true)
+ .Expect(8, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ cli_stream->Finish(&recv_status, tag(9));
+ Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
+
+ EXPECT_TRUE(recv_status.ok());
+}
+
// Metadata tests
TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
ResetStub();