diff options
author | Craig Tiller <ctiller@google.com> | 2016-05-03 12:19:33 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-05-03 12:19:33 -0700 |
commit | e4d2748f2fec4b189cdb7d13e25df0be95888ba2 (patch) | |
tree | 9fd089e83860607785678ee5fd86b18013b6f95e /test/cpp/end2end | |
parent | 5f51757f399eba371e62f0f0fec99a6d59235df8 (diff) |
Fix async_end2end_test flow control
Completion queues + flow control + single threading is hard.
We need a read outstanding on a call to grant flow control tokens to the
remote end.
To do that we need to request a read *before* we wait for the write to
be finished, otherwise, in the case of a large write we'll block waiting
for flow control tokens.
Built on #6402
Diffstat (limited to 'test/cpp/end2end')
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 131 |
1 files changed, 73 insertions, 58 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 4de181b901..0232a9fa31 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -281,10 +281,11 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -345,12 +346,9 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - Verifier(GetParam().disable_blocking) - .Expect(3, true) - .Verify(cq_.get(), std::chrono::system_clock::time_point::max()); - response_reader->Finish(&recv_response, &recv_status, tag(4)); Verifier(GetParam().disable_blocking) + .Expect(3, true) .Expect(4, true) .Verify(cq_.get(), std::chrono::system_clock::time_point::max()); @@ -384,31 +382,35 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { .Verify(cq_.get()); cli_stream->Write(send_request, tag(3)); - Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(4)); - Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); cli_stream->Write(send_request, tag(5)); - Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(6)); - Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); cli_stream->WritesDone(tag(7)); - Verifier(GetParam().disable_blocking).Expect(7, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(8)); - Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(7, true) + .Expect(8, false) + .Verify(cq_.get()); send_response.set_message(recv_request.message()); srv_stream.Finish(send_response, Status::OK, tag(9)); - Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); - cli_stream->Finish(&recv_status, tag(10)); - Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(9, true) + .Expect(10, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -442,24 +444,27 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) { send_response.set_message(recv_request.message()); srv_stream.Write(send_response, tag(3)); - Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); - cli_stream->Read(&recv_response, tag(4)); - Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); srv_stream.Write(send_response, tag(5)); - Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get()); - cli_stream->Read(&recv_response, tag(6)); - Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); srv_stream.Finish(Status::OK, tag(7)); - Verifier(GetParam().disable_blocking).Expect(7, true).Verify(cq_.get()); - cli_stream->Read(&recv_response, tag(8)); - Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(7, true) + .Expect(8, false) + .Verify(cq_.get()); cli_stream->Finish(&recv_status, tag(9)); Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); @@ -493,31 +498,35 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) { .Verify(cq_.get()); cli_stream->Write(send_request, tag(3)); - Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(4)); - Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); srv_stream.Write(send_response, tag(5)); - Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get()); - cli_stream->Read(&recv_response, tag(6)); - Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); cli_stream->WritesDone(tag(7)); - Verifier(GetParam().disable_blocking).Expect(7, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(8)); - Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(7, true) + .Expect(8, false) + .Verify(cq_.get()); srv_stream.Finish(Status::OK, tag(9)); - Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); - cli_stream->Finish(&recv_status, tag(10)); - Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(9, true) + .Expect(10, true) + .Verify(cq_.get()); EXPECT_TRUE(recv_status.ok()); } @@ -562,11 +571,11 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - - Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -612,10 +621,11 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(5)); - Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(6)); - Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -652,11 +662,13 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) { srv_ctx.AddTrailingMetadata(meta1.first, meta1.second); srv_ctx.AddTrailingMetadata(meta2.first, meta2.second); response_writer.Finish(send_response, Status::OK, tag(4)); + response_reader->Finish(&recv_response, &recv_status, tag(5)); - Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(4, true) + .Expect(5, true) + .Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(5)); - Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); @@ -730,11 +742,13 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) { srv_ctx.AddTrailingMetadata(meta5.first, meta5.second); srv_ctx.AddTrailingMetadata(meta6.first, meta6.second); response_writer.Finish(send_response, Status::OK, tag(5)); + response_reader->Finish(&recv_response, &recv_status, tag(6)); - Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(6)); - Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); @@ -807,12 +821,13 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); - Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get()); - EXPECT_FALSE(srv_ctx.IsCancelled()); - response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Expect(5, true) + .Verify(cq_.get()); + EXPECT_FALSE(srv_ctx.IsCancelled()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); |