aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/end2end
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-05-03 12:19:33 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-05-03 12:19:33 -0700
commite4d2748f2fec4b189cdb7d13e25df0be95888ba2 (patch)
tree9fd089e83860607785678ee5fd86b18013b6f95e /test/cpp/end2end
parent5f51757f399eba371e62f0f0fec99a6d59235df8 (diff)
Fix async_end2end_test flow control
Completion queues + flow control + single threading is hard. We need a read outstanding on a call to grant flow control tokens to the remote end. To do that we need to request a read *before* we wait for the write to be finished, otherwise, in the case of a large write we'll block waiting for flow control tokens. Built on #6402
Diffstat (limited to 'test/cpp/end2end')
-rw-r--r--test/cpp/end2end/async_end2end_test.cc131
1 files changed, 73 insertions, 58 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 4de181b901..0232a9fa31 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -281,10 +281,11 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -345,12 +346,9 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier(GetParam().disable_blocking)
- .Expect(3, true)
- .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
.Expect(4, true)
.Verify(cq_.get(), std::chrono::system_clock::time_point::max());
@@ -384,31 +382,35 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
.Verify(cq_.get());
cli_stream->Write(send_request, tag(3));
- Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(4));
- Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->Write(send_request, tag(5));
- Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(6));
- Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->WritesDone(tag(7));
- Verifier(GetParam().disable_blocking).Expect(7, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(8));
- Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(7, true)
+ .Expect(8, false)
+ .Verify(cq_.get());
send_response.set_message(recv_request.message());
srv_stream.Finish(send_response, Status::OK, tag(9));
- Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
-
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(9, true)
+ .Expect(10, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -442,24 +444,27 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(3));
- Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(4));
- Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Write(send_response, tag(5));
- Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(6));
- Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Finish(Status::OK, tag(7));
- Verifier(GetParam().disable_blocking).Expect(7, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(8));
- Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(7, true)
+ .Expect(8, false)
+ .Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(9));
Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
@@ -493,31 +498,35 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
.Verify(cq_.get());
cli_stream->Write(send_request, tag(3));
- Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(4));
- Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(5));
- Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(6));
- Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
cli_stream->WritesDone(tag(7));
- Verifier(GetParam().disable_blocking).Expect(7, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(8));
- Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(7, true)
+ .Expect(8, false)
+ .Verify(cq_.get());
srv_stream.Finish(Status::OK, tag(9));
- Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
-
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(9, true)
+ .Expect(10, true)
+ .Verify(cq_.get());
EXPECT_TRUE(recv_status.ok());
}
@@ -562,11 +571,11 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
-
- Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -612,10 +621,11 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(5));
- Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
-
response_reader->Finish(&recv_response, &recv_status, tag(6));
- Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -652,11 +662,13 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
response_writer.Finish(send_response, Status::OK, tag(4));
+ response_reader->Finish(&recv_response, &recv_status, tag(5));
- Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(4, true)
+ .Expect(5, true)
+ .Verify(cq_.get());
- response_reader->Finish(&recv_response, &recv_status, tag(5));
- Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@@ -730,11 +742,13 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
response_writer.Finish(send_response, Status::OK, tag(5));
+ response_reader->Finish(&recv_response, &recv_status, tag(6));
- Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
- response_reader->Finish(&recv_response, &recv_status, tag(6));
- Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@@ -807,12 +821,13 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
- Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
- EXPECT_FALSE(srv_ctx.IsCancelled());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Expect(5, true)
+ .Verify(cq_.get());
+ EXPECT_FALSE(srv_ctx.IsCancelled());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());