diff options
author | Vijay Pai <vpai@google.com> | 2017-09-22 23:34:43 -0700 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2017-10-11 21:59:33 -0700 |
commit | 4f0cd0e82c425533bef9f7ab8119cc542bcc9a41 (patch) | |
tree | 5d689d34615f8f7ecf2a585305261f8cd8081ef1 /test/cpp | |
parent | 45ca857ad007fcdc42a9f01c830de0c3bb33ffd8 (diff) |
Add flow control to inproc transport so send needs a matching recv; fix
some tests that assumed some sends could always go out
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 182 |
1 files changed, 121 insertions, 61 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index a14b4d5295..2a33e8ae11 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -1304,7 +1304,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { ServerTryCancelRequestPhase server_try_cancel) { ResetStub(); - EchoRequest send_request; EchoRequest recv_request; EchoResponse send_response; EchoResponse recv_response; @@ -1315,31 +1314,24 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx); // Initiate the 'RequestStream' call on client + CompletionQueue cli_cq; + std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream( - stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); - Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get()); + stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq, tag(1))); // On the server, request to be notified of 'RequestStream' calls // and receive the 'RequestStream' call just made by the client srv_ctx.AsyncNotifyWhenDone(tag(11)); service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); + std::thread t1([this, &cli_cq] { + Verifier(GetParam().disable_blocking).Expect(1, true).Verify(&cli_cq); + }); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); - - // Client sends 3 messages (tags 3, 4 and 5) - for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { - send_request.set_message("Ping " + grpc::to_string(tag_idx)); - cli_stream->Write(send_request, tag(tag_idx)); - Verifier(GetParam().disable_blocking) - .Expect(tag_idx, true) - .Verify(cq_.get()); - } - cli_stream->WritesDone(tag(6)); - Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get()); + t1.join(); bool expected_server_cq_result = true; - bool ignore_cq_result = false; - bool want_done_tag = false; + bool expected_client_cq_result = true; if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { srv_ctx.TryCancel(); @@ -1347,10 +1339,36 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { EXPECT_TRUE(srv_ctx.IsCancelled()); // Since cancellation is done before server reads any results, we know - // for sure that all cq results will return false from this point forward + // for sure that all server cq results will return false from this + // point forward expected_server_cq_result = false; + expected_client_cq_result = false; } + bool ignore_client_cq_result = + (server_try_cancel == CANCEL_DURING_PROCESSING) || + (server_try_cancel == CANCEL_BEFORE_PROCESSING); + + std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result, + &ignore_client_cq_result, this] { + EchoRequest send_request; + // Client sends 3 messages (tags 3, 4 and 5) + for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { + send_request.set_message("Ping " + grpc::to_string(tag_idx)); + cli_stream->Write(send_request, tag(tag_idx)); + Verifier(GetParam().disable_blocking) + .Expect(tag_idx, expected_client_cq_result) + .Verify(&cli_cq, ignore_client_cq_result); + } + cli_stream->WritesDone(tag(6)); + // Ignore ok on WritesDone since cancel can affect it + Verifier(GetParam().disable_blocking) + .Expect(6, expected_client_cq_result) + .Verify(&cli_cq, ignore_client_cq_result); + }); + + bool ignore_cq_result = false; + bool want_done_tag = false; std::thread* server_try_cancel_thd = nullptr; auto verif = Verifier(GetParam().disable_blocking); @@ -1387,6 +1405,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } } + cli_thread.join(); + if (server_try_cancel_thd != nullptr) { server_try_cancel_thd->join(); delete server_try_cancel_thd; @@ -1415,9 +1435,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Client will see the cancellation cli_stream->Finish(&recv_status, tag(10)); - Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(10, true).Verify(&cli_cq); EXPECT_FALSE(recv_status.ok()); EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code()); + + cli_cq.Shutdown(); + void* dummy_tag; + bool dummy_ok; + while (cli_cq.Next(&dummy_tag, &dummy_ok)) { + } } // Helper for testing server-streaming RPCs which are cancelled on the server. @@ -1439,7 +1465,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { EchoRequest send_request; EchoRequest recv_request; EchoResponse send_response; - EchoResponse recv_response; Status recv_status; ClientContext cli_ctx; ServerContext srv_ctx; @@ -1447,20 +1472,29 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { send_request.set_message("Ping"); // Initiate the 'ResponseStream' call on the client + CompletionQueue cli_cq; std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( - stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); - Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get()); + stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq, tag(1))); // On the server, request to be notified of 'ResponseStream' calls and // receive the call just made by the client srv_ctx.AsyncNotifyWhenDone(tag(11)); service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq_.get(), cq_.get(), tag(2)); + + std::thread t1([this, &cli_cq] { + Verifier(GetParam().disable_blocking).Expect(1, true).Verify(&cli_cq); + }); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); + t1.join(); + EXPECT_EQ(send_request.message(), recv_request.message()); bool expected_cq_result = true; bool ignore_cq_result = false; bool want_done_tag = false; + bool expected_client_cq_result = true; + bool ignore_client_cq_result = + (server_try_cancel != CANCEL_BEFORE_PROCESSING); if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { srv_ctx.TryCancel(); @@ -1470,8 +1504,21 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // We know for sure that all cq results will be false from this point // since the server cancelled the RPC expected_cq_result = false; + expected_client_cq_result = false; } + std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result, + &ignore_client_cq_result, this] { + // Client attempts to read the three messages from the server + for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { + EchoResponse recv_response; + cli_stream->Read(&recv_response, tag(tag_idx)); + Verifier(GetParam().disable_blocking) + .Expect(tag_idx, expected_client_cq_result) + .Verify(&cli_cq, ignore_client_cq_result); + } + }); + std::thread* server_try_cancel_thd = nullptr; auto verif = Verifier(GetParam().disable_blocking); @@ -1519,10 +1566,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { srv_ctx.TryCancel(); want_done_tag = true; verif.Expect(11, true); - - // Client reads may fail bacause it is notified that the stream is - // cancelled. - ignore_cq_result = true; } if (want_done_tag) { @@ -1531,13 +1574,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { want_done_tag = false; } - // Client attemts to read the three messages from the server - for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { - cli_stream->Read(&recv_response, tag(tag_idx)); - Verifier(GetParam().disable_blocking) - .Expect(tag_idx, expected_cq_result) - .Verify(cq_.get(), ignore_cq_result); - } + cli_thread.join(); // The RPC has been cancelled at this point for sure (i.e irrespective of // the value of `server_try_cancel` is). So, from this point forward, we @@ -1549,9 +1586,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Client will see the cancellation cli_stream->Finish(&recv_status, tag(10)); - Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(10, true).Verify(&cli_cq); EXPECT_FALSE(recv_status.ok()); EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code()); + + cli_cq.Shutdown(); + void* dummy_tag; + bool dummy_ok; + while (cli_cq.Next(&dummy_tag, &dummy_ok)) { + } } // Helper for testing bidirectinal-streaming RPCs which are cancelled on the @@ -1584,38 +1627,52 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Initiate the call from the client side std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); - Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get()); // On the server, request to be notified of the 'BidiStream' call and // receive the call just made by the client srv_ctx.AsyncNotifyWhenDone(tag(11)); service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(1, true) + .Expect(2, true) + .Verify(cq_.get()); + + auto verif = Verifier(GetParam().disable_blocking); // Client sends the first and the only message send_request.set_message("Ping"); cli_stream->Write(send_request, tag(3)); - Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); + verif.Expect(3, true); bool expected_cq_result = true; bool ignore_cq_result = false; bool want_done_tag = false; + int got_tag, got_tag2; + bool tag_3_done = false; + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { srv_ctx.TryCancel(); - Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get()); - EXPECT_TRUE(srv_ctx.IsCancelled()); - - // We know for sure that all cq results will be false from this point - // since the server cancelled the RPC + verif.Expect(11, true); + // We know for sure that all server cq results will be false from + // this point since the server cancelled the RPC. However, we can't + // say for sure about the client expected_cq_result = false; + ignore_cq_result = true; + + do { + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT(((got_tag == 3) && !tag_3_done) || (got_tag == 11)); + if (got_tag == 3) { + tag_3_done = true; + } + } while (got_tag != 11); + EXPECT_TRUE(srv_ctx.IsCancelled()); } std::thread* server_try_cancel_thd = nullptr; - auto verif = Verifier(GetParam().disable_blocking); - if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = new std::thread(&ServerContext::TryCancel, &srv_ctx); @@ -1630,39 +1687,42 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { verif.Expect(11, true); } - int got_tag; srv_stream.Read(&recv_request, tag(4)); verif.Expect(4, expected_cq_result); - got_tag = verif.Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == 4) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { + got_tag = tag_3_done ? 3 : verif.Next(cq_.get(), ignore_cq_result); + got_tag2 = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 3) || (got_tag == 4) || + (got_tag == 11 && want_done_tag)); + GPR_ASSERT((got_tag2 == 3) || (got_tag2 == 4) || + (got_tag2 == 11 && want_done_tag)); + // If we get 3 and 4, we don't need to wait for 11, but if + // we get 11, we should also clear 3 and 4 + if (got_tag + got_tag2 != 7) { EXPECT_TRUE(srv_ctx.IsCancelled()); want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 4); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 3) || (got_tag == 4)); } send_response.set_message("Pong"); srv_stream.Write(send_response, tag(5)); verif.Expect(5, expected_cq_result); - got_tag = verif.Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == 5) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 5); - } cli_stream->Read(&recv_response, tag(6)); verif.Expect(6, expected_cq_result); got_tag = verif.Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == 6) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { + got_tag2 = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 5) || (got_tag == 6) || + (got_tag == 11 && want_done_tag)); + GPR_ASSERT((got_tag2 == 5) || (got_tag2 == 6) || + (got_tag2 == 11 && want_done_tag)); + // If we get 5 and 6, we don't need to wait for 11, but if + // we get 11, we should also clear 5 and 6 + if (got_tag + got_tag2 != 11) { EXPECT_TRUE(srv_ctx.IsCancelled()); want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 6); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 5) || (got_tag == 6)); } // This is expected to succeed in all cases |