diff options
Diffstat (limited to 'test/cpp/end2end/async_end2end_test.cc')
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 319 |
1 files changed, 319 insertions, 0 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 32e8a41795..0b5215ef8e 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -484,6 +484,81 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { EXPECT_TRUE(recv_status.ok()); } +// Two pings and a final pong. +TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + cli_ctx.set_initial_metadata_corked(true); + // tag:1 never comes up since no op is performed + std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream( + stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); + + service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); + + cli_stream->Write(send_request, tag(3)); + + // 65536(64KB) is the default flow control window size. Should change this + // number when default flow control window size changes. For the write of + // send_request larger than the flow control window size, tag:3 will not come + // up until server read is initiated. For write of send_request smaller than + // the flow control window size, the request can take the free ride with + // initial metadata due to coalescing, thus write tag:3 will come up here. + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Expect(3, true) + .Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); + } + + srv_stream.Read(&recv_request, tag(4)); + + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + } + + EXPECT_EQ(send_request.message(), recv_request.message()); + + cli_stream->WriteLast(send_request, WriteOptions(), tag(5)); + srv_stream.Read(&recv_request, tag(6)); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); + EXPECT_EQ(send_request.message(), recv_request.message()); + + srv_stream.Read(&recv_request, tag(7)); + Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get()); + + send_response.set_message(recv_request.message()); + srv_stream.Finish(send_response, Status::OK, tag(8)); + cli_stream->Finish(&recv_status, tag(9)); + Verifier(GetParam().disable_blocking) + .Expect(8, true) + .Expect(9, true) + .Verify(cq_.get()); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); +} + // One ping, two pongs. TEST_P(AsyncEnd2endTest, SimpleServerStreaming) { ResetStub(); @@ -540,6 +615,112 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) { EXPECT_TRUE(recv_status.ok()); } +// One ping, two pongs. Using WriteAndFinish API +TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( + stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); + + service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + cq_.get(), cq_.get(), tag(2)); + + Verifier(GetParam().disable_blocking) + .Expect(1, true) + .Expect(2, true) + .Verify(cq_.get()); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + srv_stream.Write(send_response, tag(3)); + cli_stream->Read(&recv_response, tag(4)); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(5)); + cli_stream->Read(&recv_response, tag(6)); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->Read(&recv_response, tag(7)); + Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get()); + + cli_stream->Finish(&recv_status, tag(8)); + Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get()); + + EXPECT_TRUE(recv_status.ok()); +} + +// One ping, two pongs. Using WriteLast API +TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( + stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); + + service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + cq_.get(), cq_.get(), tag(2)); + + Verifier(GetParam().disable_blocking) + .Expect(1, true) + .Expect(2, true) + .Verify(cq_.get()); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + srv_stream.Write(send_response, tag(3)); + cli_stream->Read(&recv_response, tag(4)); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + srv_stream.WriteLast(send_response, WriteOptions(), tag(5)); + cli_stream->Read(&recv_response, tag(6)); + srv_stream.Finish(Status::OK, tag(7)); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Expect(7, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->Read(&recv_response, tag(8)); + Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get()); + + cli_stream->Finish(&recv_status, tag(9)); + Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); + + EXPECT_TRUE(recv_status.ok()); +} + // One ping, one pong. TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) { ResetStub(); @@ -599,6 +780,144 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) { EXPECT_TRUE(recv_status.ok()); } +// One ping, one pong. Using server:WriteAndFinish api +TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + cli_ctx.set_initial_metadata_corked(true); + std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> + cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); + + service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); + + cli_stream->WriteLast(send_request, WriteOptions(), tag(3)); + + // 65536(64KB) is the default flow control window size. Should change this + // number when default flow control window size changes. For the write of + // send_request larger than the flow control window size, tag:3 will not come + // up until server read is initiated. For write of send_request smaller than + // the flow control window size, the request can take the free ride with + // initial metadata due to coalescing, thus write tag:3 will come up here. + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Expect(3, true) + .Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); + } + + srv_stream.Read(&recv_request, tag(4)); + + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + } + EXPECT_EQ(send_request.message(), recv_request.message()); + + srv_stream.Read(&recv_request, tag(5)); + Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get()); + + send_response.set_message(recv_request.message()); + srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6)); + cli_stream->Read(&recv_response, tag(7)); + Verifier(GetParam().disable_blocking) + .Expect(6, true) + .Expect(7, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->Finish(&recv_status, tag(8)); + Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get()); + + EXPECT_TRUE(recv_status.ok()); +} + +// One ping, one pong. Using server:WriteLast api +TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + cli_ctx.set_initial_metadata_corked(true); + std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> + cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); + + service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); + + cli_stream->WriteLast(send_request, WriteOptions(), tag(3)); + + // 65536(64KB) is the default flow control window size. Should change this + // number when default flow control window size changes. For the write of + // send_request larger than the flow control window size, tag:3 will not come + // up until server read is initiated. For write of send_request smaller than + // the flow control window size, the request can take the free ride with + // initial metadata due to coalescing, thus write tag:3 will come up here. + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Expect(3, true) + .Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); + } + + srv_stream.Read(&recv_request, tag(4)); + + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + } + EXPECT_EQ(send_request.message(), recv_request.message()); + + srv_stream.Read(&recv_request, tag(5)); + Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get()); + + send_response.set_message(recv_request.message()); + srv_stream.WriteLast(send_response, WriteOptions(), tag(6)); + srv_stream.Finish(Status::OK, tag(7)); + cli_stream->Read(&recv_response, tag(8)); + Verifier(GetParam().disable_blocking) + .Expect(6, true) + .Expect(7, true) + .Expect(8, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->Finish(&recv_status, tag(9)); + Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); + + EXPECT_TRUE(recv_status.ok()); +} + // Metadata tests TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { ResetStub(); |