From e4d2748f2fec4b189cdb7d13e25df0be95888ba2 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 3 May 2016 12:19:33 -0700 Subject: Fix async_end2end_test flow control Completion queues + flow control + single threading is hard. We need a read outstanding on a call to grant flow control tokens to the remote end. To do that we need to request a read *before* we wait for the write to be finished, otherwise, in the case of a large write we'll block waiting for flow control tokens. Built on #6402 --- test/cpp/end2end/async_end2end_test.cc | 131 ++++++++++++++++++--------------- 1 file changed, 73 insertions(+), 58 deletions(-) (limited to 'test/cpp/end2end') diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 4de181b901..0232a9fa31 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -281,10 +281,11 @@ class AsyncEnd2endTest : public ::testing::TestWithParam { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -345,12 +346,9 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - Verifier(GetParam().disable_blocking) - .Expect(3, true) - .Verify(cq_.get(), std::chrono::system_clock::time_point::max()); - response_reader->Finish(&recv_response, &recv_status, tag(4)); Verifier(GetParam().disable_blocking) + .Expect(3, true) .Expect(4, true) .Verify(cq_.get(), std::chrono::system_clock::time_point::max()); @@ -384,31 +382,35 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { .Verify(cq_.get()); cli_stream->Write(send_request, tag(3)); - Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(4)); - Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); cli_stream->Write(send_request, tag(5)); - Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(6)); - Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); cli_stream->WritesDone(tag(7)); - Verifier(GetParam().disable_blocking).Expect(7, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(8)); - Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(7, true) + .Expect(8, false) + .Verify(cq_.get()); send_response.set_message(recv_request.message()); srv_stream.Finish(send_response, Status::OK, tag(9)); - Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); - cli_stream->Finish(&recv_status, tag(10)); - Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(9, true) + .Expect(10, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -442,24 +444,27 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) { send_response.set_message(recv_request.message()); srv_stream.Write(send_response, tag(3)); - Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); - cli_stream->Read(&recv_response, tag(4)); - Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); srv_stream.Write(send_response, tag(5)); - Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get()); - cli_stream->Read(&recv_response, tag(6)); - Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); srv_stream.Finish(Status::OK, tag(7)); - Verifier(GetParam().disable_blocking).Expect(7, true).Verify(cq_.get()); - cli_stream->Read(&recv_response, tag(8)); - Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(7, true) + .Expect(8, false) + .Verify(cq_.get()); cli_stream->Finish(&recv_status, tag(9)); Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); @@ -493,31 +498,35 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) { .Verify(cq_.get()); cli_stream->Write(send_request, tag(3)); - Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(4)); - Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); srv_stream.Write(send_response, tag(5)); - Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get()); - cli_stream->Read(&recv_response, tag(6)); - Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); cli_stream->WritesDone(tag(7)); - Verifier(GetParam().disable_blocking).Expect(7, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(8)); - Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(7, true) + .Expect(8, false) + .Verify(cq_.get()); srv_stream.Finish(Status::OK, tag(9)); - Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); - cli_stream->Finish(&recv_status, tag(10)); - Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(9, true) + .Expect(10, true) + .Verify(cq_.get()); EXPECT_TRUE(recv_status.ok()); } @@ -562,11 +571,11 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - - Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -612,10 +621,11 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(5)); - Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(6)); - Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -652,11 +662,13 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) { srv_ctx.AddTrailingMetadata(meta1.first, meta1.second); srv_ctx.AddTrailingMetadata(meta2.first, meta2.second); response_writer.Finish(send_response, Status::OK, tag(4)); + response_reader->Finish(&recv_response, &recv_status, tag(5)); - Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(4, true) + .Expect(5, true) + .Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(5)); - Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); @@ -730,11 +742,13 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) { srv_ctx.AddTrailingMetadata(meta5.first, meta5.second); srv_ctx.AddTrailingMetadata(meta6.first, meta6.second); response_writer.Finish(send_response, Status::OK, tag(5)); + response_reader->Finish(&recv_response, &recv_status, tag(6)); - Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(6)); - Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); @@ -807,12 +821,13 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); - Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get()); - EXPECT_FALSE(srv_ctx.IsCancelled()); - response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Expect(5, true) + .Verify(cq_.get()); + EXPECT_FALSE(srv_ctx.IsCancelled()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); -- cgit v1.2.3