diff options
Diffstat (limited to 'test/cpp/end2end/async_end2end_test.cc')
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 230 |
1 files changed, 118 insertions, 112 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 871f956491..117d8bb9fa 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -67,27 +67,45 @@ namespace { void* tag(int i) { return (void*)(gpr_intptr) i; } -void verify_ok(CompletionQueue* cq, int i, bool expect_ok) { - bool ok; - void* got_tag; - EXPECT_TRUE(cq->Next(&got_tag, &ok)); - EXPECT_EQ(expect_ok, ok); - EXPECT_EQ(tag(i), got_tag); -} - -void verify_timed_ok( - CompletionQueue* cq, int i, bool expect_ok, - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::time_point::max(), - CompletionQueue::NextStatus expected_outcome = CompletionQueue::GOT_EVENT) { - bool ok; - void* got_tag; - EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), expected_outcome); - if (expected_outcome == CompletionQueue::GOT_EVENT) { - EXPECT_EQ(expect_ok, ok); - EXPECT_EQ(tag(i), got_tag); +class Verifier { + public: + Verifier& Expect(int i, bool expect_ok) { + expectations_[tag(i)] = expect_ok; + return *this; } -} + void Verify(CompletionQueue *cq) { + GPR_ASSERT(!expectations_.empty()); + while (!expectations_.empty()) { + bool ok; + void* got_tag; + EXPECT_TRUE(cq->Next(&got_tag, &ok)); + auto it = expectations_.find(got_tag); + EXPECT_TRUE(it != expectations_.end()); + EXPECT_EQ(it->second, ok); + expectations_.erase(it); + } + } + void Verify(CompletionQueue *cq, std::chrono::system_clock::time_point deadline) { + if (expectations_.empty()) { + bool ok; + void *got_tag; + EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), CompletionQueue::TIMEOUT); + } else { + while (!expectations_.empty()) { + bool ok; + void *got_tag; + EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), CompletionQueue::GOT_EVENT); + auto it = expectations_.find(got_tag); + EXPECT_TRUE(it != expectations_.end()); + EXPECT_EQ(it->second, ok); + expectations_.erase(it); + } + } + } + + private: + std::map<void*, bool> expectations_; +}; class AsyncEnd2endTest : public ::testing::Test { protected: @@ -100,7 +118,7 @@ class AsyncEnd2endTest : public ::testing::Test { ServerBuilder builder; builder.AddListeningPort(server_address_.str(), grpc::InsecureServerCredentials()); builder.RegisterAsyncService(&service_); - srv_cq_ = builder.AddCompletionQueue(); + cq_ = builder.AddCompletionQueue(); server_ = builder.BuildAndStart(); } @@ -108,11 +126,8 @@ class AsyncEnd2endTest : public ::testing::Test { server_->Shutdown(); void* ignored_tag; bool ignored_ok; - cli_cq_.Shutdown(); - srv_cq_->Shutdown(); - while (cli_cq_.Next(&ignored_tag, &ignored_ok)) - ; - while (srv_cq_->Next(&ignored_tag, &ignored_ok)) + cq_->Shutdown(); + while (cq_->Next(&ignored_tag, &ignored_ok)) ; } @@ -122,11 +137,6 @@ class AsyncEnd2endTest : public ::testing::Test { stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel)); } - void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); } - void client_ok(int i) { verify_ok(&cli_cq_, i, true); } - void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); } - void client_fail(int i) { verify_ok(&cli_cq_, i, false); } - void SendRpc(int num_rpcs) { for (int i = 0; i < num_rpcs; i++) { EchoRequest send_request; @@ -141,28 +151,27 @@ class AsyncEnd2endTest : public ::testing::Test { send_request.set_message("Hello"); std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( - stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_)); + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, - srv_cq_.get(), srv_cq_.get(), tag(2)); + cq_.get(), cq_.get(), tag(2)); - server_ok(2); + Verifier().Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - server_ok(3); + Verifier().Expect(3, true).Verify(cq_.get()); response_reader->Finish(&recv_response, &recv_status, tag(4)); - client_ok(4); + Verifier().Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); } } - CompletionQueue cli_cq_; - std::unique_ptr<ServerCompletionQueue> srv_cq_; + std::unique_ptr<ServerCompletionQueue> cq_; std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_; std::unique_ptr<Server> server_; grpc::cpp::test::util::TestService::AsyncService service_; @@ -195,27 +204,27 @@ TEST_F(AsyncEnd2endTest, AsyncNextRpc) { send_request.set_message("Hello"); std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( - stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_)); + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); std::chrono::system_clock::time_point time_now( std::chrono::system_clock::now()); std::chrono::system_clock::time_point time_limit( std::chrono::system_clock::now() + std::chrono::seconds(10)); - verify_timed_ok(srv_cq_.get(), -1, true, time_now, CompletionQueue::TIMEOUT); - verify_timed_ok(&cli_cq_, -1, true, time_now, CompletionQueue::TIMEOUT); + Verifier().Verify(cq_.get(), time_now); + Verifier().Verify(cq_.get(), time_now); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), - srv_cq_.get(), tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); - verify_timed_ok(srv_cq_.get(), 2, true, time_limit); + Verifier().Expect(2, true).Verify(cq_.get(), time_limit); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - verify_timed_ok(srv_cq_.get(), 3, true); + Verifier().Expect(3, true).Verify(cq_.get(), std::chrono::system_clock::time_point::max()); response_reader->Finish(&recv_response, &recv_status, tag(4)); - verify_timed_ok(&cli_cq_, 4, true); + Verifier().Expect(4, true).Verify(cq_.get(), std::chrono::system_clock::time_point::max()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -236,40 +245,39 @@ TEST_F(AsyncEnd2endTest, SimpleClientStreaming) { send_request.set_message("Hello"); std::unique_ptr<ClientAsyncWriter<EchoRequest> > cli_stream( - stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq_, tag(1))); + stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); - service_.RequestRequestStream(&srv_ctx, &srv_stream, srv_cq_.get(), - srv_cq_.get(), tag(2)); + service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), + cq_.get(), tag(2)); - server_ok(2); - client_ok(1); + Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get()); cli_stream->Write(send_request, tag(3)); - client_ok(3); + Verifier().Expect(3, true).Verify(cq_.get()); srv_stream.Read(&recv_request, tag(4)); - server_ok(4); + Verifier().Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); cli_stream->Write(send_request, tag(5)); - client_ok(5); + Verifier().Expect(5, true).Verify(cq_.get()); srv_stream.Read(&recv_request, tag(6)); - server_ok(6); + Verifier().Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); cli_stream->WritesDone(tag(7)); - client_ok(7); + Verifier().Expect(7, true).Verify(cq_.get()); srv_stream.Read(&recv_request, tag(8)); - server_fail(8); + Verifier().Expect(8, false).Verify(cq_.get()); send_response.set_message(recv_request.message()); srv_stream.Finish(send_response, Status::OK, tag(9)); - server_ok(9); + Verifier().Expect(9, true).Verify(cq_.get()); cli_stream->Finish(&recv_status, tag(10)); - client_ok(10); + Verifier().Expect(10, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -290,38 +298,37 @@ TEST_F(AsyncEnd2endTest, SimpleServerStreaming) { send_request.set_message("Hello"); std::unique_ptr<ClientAsyncReader<EchoResponse> > cli_stream( - stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq_, tag(1))); + stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, - srv_cq_.get(), srv_cq_.get(), tag(2)); + cq_.get(), cq_.get(), tag(2)); - server_ok(2); - client_ok(1); + Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); srv_stream.Write(send_response, tag(3)); - server_ok(3); + Verifier().Expect(3, true).Verify(cq_.get()); cli_stream->Read(&recv_response, tag(4)); - client_ok(4); + Verifier().Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); srv_stream.Write(send_response, tag(5)); - server_ok(5); + Verifier().Expect(5, true).Verify(cq_.get()); cli_stream->Read(&recv_response, tag(6)); - client_ok(6); + Verifier().Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); srv_stream.Finish(Status::OK, tag(7)); - server_ok(7); + Verifier().Expect(7, true).Verify(cq_.get()); cli_stream->Read(&recv_response, tag(8)); - client_fail(8); + Verifier().Expect(8, false).Verify(cq_.get()); cli_stream->Finish(&recv_status, tag(9)); - client_ok(9); + Verifier().Expect(9, true).Verify(cq_.get()); EXPECT_TRUE(recv_status.ok()); } @@ -341,40 +348,39 @@ TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) { send_request.set_message("Hello"); std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse> > - cli_stream(stub_->AsyncBidiStream(&cli_ctx, &cli_cq_, tag(1))); + cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); - service_.RequestBidiStream(&srv_ctx, &srv_stream, srv_cq_.get(), - srv_cq_.get(), tag(2)); + service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), + cq_.get(), tag(2)); - server_ok(2); - client_ok(1); + Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get()); cli_stream->Write(send_request, tag(3)); - client_ok(3); + Verifier().Expect(3, true).Verify(cq_.get()); srv_stream.Read(&recv_request, tag(4)); - server_ok(4); + Verifier().Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); srv_stream.Write(send_response, tag(5)); - server_ok(5); + Verifier().Expect(5, true).Verify(cq_.get()); cli_stream->Read(&recv_response, tag(6)); - client_ok(6); + Verifier().Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); cli_stream->WritesDone(tag(7)); - client_ok(7); + Verifier().Expect(7, true).Verify(cq_.get()); srv_stream.Read(&recv_request, tag(8)); - server_fail(8); + Verifier().Expect(8, false).Verify(cq_.get()); srv_stream.Finish(Status::OK, tag(9)); - server_ok(9); + Verifier().Expect(9, true).Verify(cq_.get()); cli_stream->Finish(&recv_status, tag(10)); - client_ok(10); + Verifier().Expect(10, true).Verify(cq_.get()); EXPECT_TRUE(recv_status.ok()); } @@ -400,11 +406,11 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) { cli_ctx.AddMetadata(meta2.first, meta2.second); std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( - stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_)); + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), - srv_cq_.get(), tag(2)); - server_ok(2); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); + Verifier().Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); @@ -414,10 +420,10 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - server_ok(3); + Verifier().Expect(3, true).Verify(cq_.get()); response_reader->Finish(&recv_response, &recv_status, tag(4)); - client_ok(4); + Verifier().Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -441,19 +447,19 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) { std::pair<grpc::string, grpc::string> meta2("key2", "val2"); std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( - stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_)); + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), - srv_cq_.get(), tag(2)); - server_ok(2); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); + Verifier().Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); srv_ctx.AddInitialMetadata(meta1.first, meta1.second); srv_ctx.AddInitialMetadata(meta2.first, meta2.second); response_writer.SendInitialMetadata(tag(3)); - server_ok(3); + Verifier().Expect(3, true).Verify(cq_.get()); response_reader->ReadInitialMetadata(tag(4)); - client_ok(4); + Verifier().Expect(4, true).Verify(cq_.get()); auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); EXPECT_EQ(meta1.second, server_initial_metadata.find(meta1.first)->second); EXPECT_EQ(meta2.second, server_initial_metadata.find(meta2.first)->second); @@ -461,10 +467,10 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(5)); - server_ok(5); + Verifier().Expect(5, true).Verify(cq_.get()); response_reader->Finish(&recv_response, &recv_status, tag(6)); - client_ok(6); + Verifier().Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -488,24 +494,24 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) { std::pair<grpc::string, grpc::string> meta2("key2", "val2"); std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( - stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_)); + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), - srv_cq_.get(), tag(2)); - server_ok(2); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); + Verifier().Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); response_writer.SendInitialMetadata(tag(3)); - server_ok(3); + Verifier().Expect(3, true).Verify(cq_.get()); send_response.set_message(recv_request.message()); srv_ctx.AddTrailingMetadata(meta1.first, meta1.second); srv_ctx.AddTrailingMetadata(meta2.first, meta2.second); response_writer.Finish(send_response, Status::OK, tag(4)); - server_ok(4); + Verifier().Expect(4, true).Verify(cq_.get()); response_reader->Finish(&recv_response, &recv_status, tag(5)); - client_ok(5); + Verifier().Expect(5, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); @@ -548,11 +554,11 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { cli_ctx.AddMetadata(meta2.first, meta2.second); std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( - stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_)); + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), - srv_cq_.get(), tag(2)); - server_ok(2); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); + Verifier().Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); @@ -562,9 +568,9 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { srv_ctx.AddInitialMetadata(meta3.first, meta3.second); srv_ctx.AddInitialMetadata(meta4.first, meta4.second); response_writer.SendInitialMetadata(tag(3)); - server_ok(3); + Verifier().Expect(3, true).Verify(cq_.get()); response_reader->ReadInitialMetadata(tag(4)); - client_ok(4); + Verifier().Expect(4, true).Verify(cq_.get()); auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second); EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second); @@ -575,10 +581,10 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { srv_ctx.AddTrailingMetadata(meta6.first, meta6.second); response_writer.Finish(send_response, Status::OK, tag(5)); - server_ok(5); + Verifier().Expect(5, true).Verify(cq_.get()); response_reader->Finish(&recv_response, &recv_status, tag(6)); - client_ok(6); + Verifier().Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); |