aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2017-09-22 23:34:43 -0700
committerGravatar Vijay Pai <vpai@google.com>2017-10-11 21:59:33 -0700
commit4f0cd0e82c425533bef9f7ab8119cc542bcc9a41 (patch)
tree5d689d34615f8f7ecf2a585305261f8cd8081ef1 /test/cpp
parent45ca857ad007fcdc42a9f01c830de0c3bb33ffd8 (diff)
Add flow control to inproc transport so send needs a matching recv; fix
some tests that assumed some sends could always go out
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/end2end/async_end2end_test.cc182
1 files changed, 121 insertions, 61 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index a14b4d5295..2a33e8ae11 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -1304,7 +1304,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
ServerTryCancelRequestPhase server_try_cancel) {
ResetStub();
- EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
@@ -1315,31 +1314,24 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
// Initiate the 'RequestStream' call on client
+ CompletionQueue cli_cq;
+
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
- stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
- Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
+ stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq, tag(1)));
// On the server, request to be notified of 'RequestStream' calls
// and receive the 'RequestStream' call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
+ std::thread t1([this, &cli_cq] {
+ Verifier(GetParam().disable_blocking).Expect(1, true).Verify(&cli_cq);
+ });
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
-
- // Client sends 3 messages (tags 3, 4 and 5)
- for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
- send_request.set_message("Ping " + grpc::to_string(tag_idx));
- cli_stream->Write(send_request, tag(tag_idx));
- Verifier(GetParam().disable_blocking)
- .Expect(tag_idx, true)
- .Verify(cq_.get());
- }
- cli_stream->WritesDone(tag(6));
- Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
+ t1.join();
bool expected_server_cq_result = true;
- bool ignore_cq_result = false;
- bool want_done_tag = false;
+ bool expected_client_cq_result = true;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
@@ -1347,10 +1339,36 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
EXPECT_TRUE(srv_ctx.IsCancelled());
// Since cancellation is done before server reads any results, we know
- // for sure that all cq results will return false from this point forward
+ // for sure that all server cq results will return false from this
+ // point forward
expected_server_cq_result = false;
+ expected_client_cq_result = false;
}
+ bool ignore_client_cq_result =
+ (server_try_cancel == CANCEL_DURING_PROCESSING) ||
+ (server_try_cancel == CANCEL_BEFORE_PROCESSING);
+
+ std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
+ &ignore_client_cq_result, this] {
+ EchoRequest send_request;
+ // Client sends 3 messages (tags 3, 4 and 5)
+ for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
+ send_request.set_message("Ping " + grpc::to_string(tag_idx));
+ cli_stream->Write(send_request, tag(tag_idx));
+ Verifier(GetParam().disable_blocking)
+ .Expect(tag_idx, expected_client_cq_result)
+ .Verify(&cli_cq, ignore_client_cq_result);
+ }
+ cli_stream->WritesDone(tag(6));
+ // Ignore ok on WritesDone since cancel can affect it
+ Verifier(GetParam().disable_blocking)
+ .Expect(6, expected_client_cq_result)
+ .Verify(&cli_cq, ignore_client_cq_result);
+ });
+
+ bool ignore_cq_result = false;
+ bool want_done_tag = false;
std::thread* server_try_cancel_thd = nullptr;
auto verif = Verifier(GetParam().disable_blocking);
@@ -1387,6 +1405,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
}
}
+ cli_thread.join();
+
if (server_try_cancel_thd != nullptr) {
server_try_cancel_thd->join();
delete server_try_cancel_thd;
@@ -1415,9 +1435,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Client will see the cancellation
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(10, true).Verify(&cli_cq);
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
+
+ cli_cq.Shutdown();
+ void* dummy_tag;
+ bool dummy_ok;
+ while (cli_cq.Next(&dummy_tag, &dummy_ok)) {
+ }
}
// Helper for testing server-streaming RPCs which are cancelled on the server.
@@ -1439,7 +1465,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
- EchoResponse recv_response;
Status recv_status;
ClientContext cli_ctx;
ServerContext srv_ctx;
@@ -1447,20 +1472,29 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
send_request.set_message("Ping");
// Initiate the 'ResponseStream' call on the client
+ CompletionQueue cli_cq;
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
- stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
- Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
+ stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq, tag(1)));
// On the server, request to be notified of 'ResponseStream' calls and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
+
+ std::thread t1([this, &cli_cq] {
+ Verifier(GetParam().disable_blocking).Expect(1, true).Verify(&cli_cq);
+ });
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+ t1.join();
+
EXPECT_EQ(send_request.message(), recv_request.message());
bool expected_cq_result = true;
bool ignore_cq_result = false;
bool want_done_tag = false;
+ bool expected_client_cq_result = true;
+ bool ignore_client_cq_result =
+ (server_try_cancel != CANCEL_BEFORE_PROCESSING);
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
@@ -1470,8 +1504,21 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// We know for sure that all cq results will be false from this point
// since the server cancelled the RPC
expected_cq_result = false;
+ expected_client_cq_result = false;
}
+ std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
+ &ignore_client_cq_result, this] {
+ // Client attempts to read the three messages from the server
+ for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
+ EchoResponse recv_response;
+ cli_stream->Read(&recv_response, tag(tag_idx));
+ Verifier(GetParam().disable_blocking)
+ .Expect(tag_idx, expected_client_cq_result)
+ .Verify(&cli_cq, ignore_client_cq_result);
+ }
+ });
+
std::thread* server_try_cancel_thd = nullptr;
auto verif = Verifier(GetParam().disable_blocking);
@@ -1519,10 +1566,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
srv_ctx.TryCancel();
want_done_tag = true;
verif.Expect(11, true);
-
- // Client reads may fail bacause it is notified that the stream is
- // cancelled.
- ignore_cq_result = true;
}
if (want_done_tag) {
@@ -1531,13 +1574,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
want_done_tag = false;
}
- // Client attemts to read the three messages from the server
- for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
- cli_stream->Read(&recv_response, tag(tag_idx));
- Verifier(GetParam().disable_blocking)
- .Expect(tag_idx, expected_cq_result)
- .Verify(cq_.get(), ignore_cq_result);
- }
+ cli_thread.join();
// The RPC has been cancelled at this point for sure (i.e irrespective of
// the value of `server_try_cancel` is). So, from this point forward, we
@@ -1549,9 +1586,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Client will see the cancellation
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(10, true).Verify(&cli_cq);
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
+
+ cli_cq.Shutdown();
+ void* dummy_tag;
+ bool dummy_ok;
+ while (cli_cq.Next(&dummy_tag, &dummy_ok)) {
+ }
}
// Helper for testing bidirectinal-streaming RPCs which are cancelled on the
@@ -1584,38 +1627,52 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Initiate the call from the client side
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
- Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of the 'BidiStream' call and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
- Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(1, true)
+ .Expect(2, true)
+ .Verify(cq_.get());
+
+ auto verif = Verifier(GetParam().disable_blocking);
// Client sends the first and the only message
send_request.set_message("Ping");
cli_stream->Write(send_request, tag(3));
- Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
+ verif.Expect(3, true);
bool expected_cq_result = true;
bool ignore_cq_result = false;
bool want_done_tag = false;
+ int got_tag, got_tag2;
+ bool tag_3_done = false;
+
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
- Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
- EXPECT_TRUE(srv_ctx.IsCancelled());
-
- // We know for sure that all cq results will be false from this point
- // since the server cancelled the RPC
+ verif.Expect(11, true);
+ // We know for sure that all server cq results will be false from
+ // this point since the server cancelled the RPC. However, we can't
+ // say for sure about the client
expected_cq_result = false;
+ ignore_cq_result = true;
+
+ do {
+ got_tag = verif.Next(cq_.get(), ignore_cq_result);
+ GPR_ASSERT(((got_tag == 3) && !tag_3_done) || (got_tag == 11));
+ if (got_tag == 3) {
+ tag_3_done = true;
+ }
+ } while (got_tag != 11);
+ EXPECT_TRUE(srv_ctx.IsCancelled());
}
std::thread* server_try_cancel_thd = nullptr;
- auto verif = Verifier(GetParam().disable_blocking);
-
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
new std::thread(&ServerContext::TryCancel, &srv_ctx);
@@ -1630,39 +1687,42 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
verif.Expect(11, true);
}
- int got_tag;
srv_stream.Read(&recv_request, tag(4));
verif.Expect(4, expected_cq_result);
- got_tag = verif.Next(cq_.get(), ignore_cq_result);
- GPR_ASSERT((got_tag == 4) || (got_tag == 11 && want_done_tag));
- if (got_tag == 11) {
+ got_tag = tag_3_done ? 3 : verif.Next(cq_.get(), ignore_cq_result);
+ got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
+ GPR_ASSERT((got_tag == 3) || (got_tag == 4) ||
+ (got_tag == 11 && want_done_tag));
+ GPR_ASSERT((got_tag2 == 3) || (got_tag2 == 4) ||
+ (got_tag2 == 11 && want_done_tag));
+ // If we get 3 and 4, we don't need to wait for 11, but if
+ // we get 11, we should also clear 3 and 4
+ if (got_tag + got_tag2 != 7) {
EXPECT_TRUE(srv_ctx.IsCancelled());
want_done_tag = false;
- // Now get the other entry that we were waiting on
- EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 4);
+ got_tag = verif.Next(cq_.get(), ignore_cq_result);
+ GPR_ASSERT((got_tag == 3) || (got_tag == 4));
}
send_response.set_message("Pong");
srv_stream.Write(send_response, tag(5));
verif.Expect(5, expected_cq_result);
- got_tag = verif.Next(cq_.get(), ignore_cq_result);
- GPR_ASSERT((got_tag == 5) || (got_tag == 11 && want_done_tag));
- if (got_tag == 11) {
- EXPECT_TRUE(srv_ctx.IsCancelled());
- want_done_tag = false;
- // Now get the other entry that we were waiting on
- EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 5);
- }
cli_stream->Read(&recv_response, tag(6));
verif.Expect(6, expected_cq_result);
got_tag = verif.Next(cq_.get(), ignore_cq_result);
- GPR_ASSERT((got_tag == 6) || (got_tag == 11 && want_done_tag));
- if (got_tag == 11) {
+ got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
+ GPR_ASSERT((got_tag == 5) || (got_tag == 6) ||
+ (got_tag == 11 && want_done_tag));
+ GPR_ASSERT((got_tag2 == 5) || (got_tag2 == 6) ||
+ (got_tag2 == 11 && want_done_tag));
+ // If we get 5 and 6, we don't need to wait for 11, but if
+ // we get 11, we should also clear 5 and 6
+ if (got_tag + got_tag2 != 11) {
EXPECT_TRUE(srv_ctx.IsCancelled());
want_done_tag = false;
- // Now get the other entry that we were waiting on
- EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 6);
+ got_tag = verif.Next(cq_.get(), ignore_cq_result);
+ GPR_ASSERT((got_tag == 5) || (got_tag == 6));
}
// This is expected to succeed in all cases