diff options
author | Yuchen Zeng <zyc@google.com> | 2016-05-06 15:50:15 -0700 |
---|---|---|
committer | Yuchen Zeng <zyc@google.com> | 2016-05-06 15:50:15 -0700 |
commit | 26a1785cfc5a2118b5fbb4c2e7e89edd6a241a61 (patch) | |
tree | 851f2b92ccd86254bbb0d973462932a1d555fab6 /test/cpp | |
parent | 7d099a5c907d9ab164416ea875ae07a3074adedb (diff) | |
parent | c916c1ce17fe273dff881e1c87e17ad991c2078a (diff) |
Resolve conflicts
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 338 | ||||
-rw-r--r-- | test/cpp/qps/driver.cc | 3 |
2 files changed, 217 insertions, 124 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 0de6c74c47..e0649bd694 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -51,6 +51,7 @@ #include "test/core/util/port.h" #include "test/core/util/test_config.h" #include "test/cpp/util/string_ref_helper.h" +#include "test/cpp/util/test_credentials_provider.h" #ifdef GPR_POSIX_SOCKET #include "src/core/lib/iomgr/ev_posix.h" @@ -58,6 +59,7 @@ using grpc::testing::EchoRequest; using grpc::testing::EchoResponse; +using grpc::testing::kTlsCredentialsType; using std::chrono::system_clock; GPR_TLS_DECL(g_is_async_end2end_test); @@ -219,20 +221,37 @@ class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption { } }; -class AsyncEnd2endTest : public ::testing::TestWithParam<bool> { +class TestScenario { + public: + TestScenario(bool non_block, const grpc::string& creds_type, + const grpc::string& content) + : disable_blocking(non_block), + credentials_type(creds_type), + message_content(content) {} + void Log() const { + gpr_log(GPR_INFO, + "Scenario: disable_blocking %d, credentials %s, message size %d", + disable_blocking, credentials_type.c_str(), message_content.size()); + } + bool disable_blocking; + const grpc::string credentials_type; + const grpc::string message_content; +}; + +class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> { protected: - AsyncEnd2endTest() {} + AsyncEnd2endTest() { GetParam().Log(); } void SetUp() GRPC_OVERRIDE { - poll_overrider_.reset(new PollingOverrider(!GetParam())); + poll_overrider_.reset(new PollingOverrider(!GetParam().disable_blocking)); int port = grpc_pick_unused_port_or_die(); server_address_ << "localhost:" << port; // Setup server ServerBuilder builder; - builder.AddListeningPort(server_address_.str(), - grpc::InsecureServerCredentials()); + auto server_creds = GetServerCredentials(GetParam().credentials_type); + builder.AddListeningPort(server_address_.str(), server_creds); builder.RegisterService(&service_); cq_ = builder.AddCompletionQueue(); @@ -258,8 +277,11 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> { } void ResetStub() { + ChannelArguments args; + auto channel_creds = + GetChannelCredentials(GetParam().credentials_type, &args); std::shared_ptr<Channel> channel = - CreateChannel(server_address_.str(), InsecureChannelCredentials()); + CreateCustomChannel(server_address_.str(), channel_creds, args); stub_ = grpc::testing::EchoTestService::NewStub(channel); } @@ -275,22 +297,23 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> { ServerContext srv_ctx; grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -330,7 +353,7 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) { ServerContext srv_ctx; grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); @@ -338,23 +361,22 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) { std::chrono::system_clock::now()); std::chrono::system_clock::time_point time_limit( std::chrono::system_clock::now() + std::chrono::seconds(10)); - Verifier(GetParam()).Verify(cq_.get(), time_now); - Verifier(GetParam()).Verify(cq_.get(), time_now); + Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); + Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get(), time_limit); + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Verify(cq_.get(), time_limit); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - Verifier(GetParam()) - .Expect(3, true) - .Verify(cq_.get(), std::chrono::system_clock::time_point::max()); - response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier(GetParam()) + Verifier(GetParam().disable_blocking) + .Expect(3, true) .Expect(4, true) .Verify(cq_.get(), std::chrono::system_clock::time_point::max()); @@ -375,41 +397,48 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { ServerContext srv_ctx; ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream( stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Expect(1, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Expect(1, true) + .Verify(cq_.get()); cli_stream->Write(send_request, tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(4)); - Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); cli_stream->Write(send_request, tag(5)); - Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(6)); - Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); cli_stream->WritesDone(tag(7)); - Verifier(GetParam()).Expect(7, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(8)); - Verifier(GetParam()).Expect(8, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(7, true) + .Expect(8, false) + .Verify(cq_.get()); send_response.set_message(recv_request.message()); srv_stream.Finish(send_response, Status::OK, tag(9)); - Verifier(GetParam()).Expect(9, true).Verify(cq_.get()); - cli_stream->Finish(&recv_status, tag(10)); - Verifier(GetParam()).Expect(10, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(9, true) + .Expect(10, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -428,39 +457,45 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) { ServerContext srv_ctx; ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(1, true) + .Expect(2, true) + .Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); srv_stream.Write(send_response, tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); - cli_stream->Read(&recv_response, tag(4)); - Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); srv_stream.Write(send_response, tag(5)); - Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); - cli_stream->Read(&recv_response, tag(6)); - Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); srv_stream.Finish(Status::OK, tag(7)); - Verifier(GetParam()).Expect(7, true).Verify(cq_.get()); - cli_stream->Read(&recv_response, tag(8)); - Verifier(GetParam()).Expect(8, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(7, true) + .Expect(8, false) + .Verify(cq_.get()); cli_stream->Finish(&recv_status, tag(9)); - Verifier(GetParam()).Expect(9, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); EXPECT_TRUE(recv_status.ok()); } @@ -478,41 +513,48 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) { ServerContext srv_ctx; ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(1, true) + .Expect(2, true) + .Verify(cq_.get()); cli_stream->Write(send_request, tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(4)); - Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); srv_stream.Write(send_response, tag(5)); - Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); - cli_stream->Read(&recv_response, tag(6)); - Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); cli_stream->WritesDone(tag(7)); - Verifier(GetParam()).Expect(7, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(8)); - Verifier(GetParam()).Expect(8, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(7, true) + .Expect(8, false) + .Verify(cq_.get()); srv_stream.Finish(Status::OK, tag(9)); - Verifier(GetParam()).Expect(9, true).Verify(cq_.get()); - cli_stream->Finish(&recv_status, tag(10)); - Verifier(GetParam()).Expect(10, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(9, true) + .Expect(10, true) + .Verify(cq_.get()); EXPECT_TRUE(recv_status.ok()); } @@ -531,7 +573,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { ServerContext srv_ctx; grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::pair<grpc::string, grpc::string> meta1("key1", "val1"); std::pair<grpc::string, grpc::string> meta2("key2", "val2"); std::pair<grpc::string, grpc::string> meta3("g.r.d-bin", "xyz"); @@ -544,7 +586,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); EXPECT_EQ(meta1.second, @@ -557,11 +599,11 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -580,7 +622,7 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) { ServerContext srv_ctx; grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::pair<grpc::string, grpc::string> meta1("key1", "val1"); std::pair<grpc::string, grpc::string> meta2("key2", "val2"); @@ -589,15 +631,15 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); srv_ctx.AddInitialMetadata(meta1.first, meta1.second); srv_ctx.AddInitialMetadata(meta2.first, meta2.second); response_writer.SendInitialMetadata(tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); response_reader->ReadInitialMetadata(tag(4)); - Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); EXPECT_EQ(meta1.second, ToString(server_initial_metadata.find(meta1.first)->second)); @@ -607,10 +649,11 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(5)); - Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(6)); - Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -629,7 +672,7 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) { ServerContext srv_ctx; grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::pair<grpc::string, grpc::string> meta1("key1", "val1"); std::pair<grpc::string, grpc::string> meta2("key2", "val2"); @@ -638,20 +681,22 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); response_writer.SendInitialMetadata(tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); send_response.set_message(recv_request.message()); srv_ctx.AddTrailingMetadata(meta1.first, meta1.second); srv_ctx.AddTrailingMetadata(meta2.first, meta2.second); response_writer.Finish(send_response, Status::OK, tag(4)); + response_reader->Finish(&recv_response, &recv_status, tag(5)); - Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(4, true) + .Expect(5, true) + .Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(5)); - Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); @@ -675,7 +720,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) { ServerContext srv_ctx; grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::pair<grpc::string, grpc::string> meta1("key1", "val1"); std::pair<grpc::string, grpc::string> meta2( "key2-bin", @@ -699,7 +744,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); EXPECT_EQ(meta1.second, @@ -711,9 +756,9 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) { srv_ctx.AddInitialMetadata(meta3.first, meta3.second); srv_ctx.AddInitialMetadata(meta4.first, meta4.second); response_writer.SendInitialMetadata(tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); response_reader->ReadInitialMetadata(tag(4)); - Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); EXPECT_EQ(meta3.second, ToString(server_initial_metadata.find(meta3.first)->second)); @@ -725,11 +770,13 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) { srv_ctx.AddTrailingMetadata(meta5.first, meta5.second); srv_ctx.AddTrailingMetadata(meta6.first, meta6.second); response_writer.Finish(send_response, Status::OK, tag(5)); + response_reader->Finish(&recv_response, &recv_status, tag(6)); - Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(6)); - Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); @@ -754,7 +801,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) { ServerContext srv_ctx; grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); @@ -762,15 +809,15 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); cli_ctx.TryCancel(); - Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get()); EXPECT_TRUE(srv_ctx.IsCancelled()); response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier(GetParam()).Expect(4, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(4, false).Verify(cq_.get()); EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code()); } @@ -789,7 +836,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) { ServerContext srv_ctx; grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); @@ -797,25 +844,29 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); - Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); - EXPECT_FALSE(srv_ctx.IsCancelled()); - response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Expect(5, true) + .Verify(cq_.get()); + EXPECT_FALSE(srv_ctx.IsCancelled()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); } TEST_P(AsyncEnd2endTest, UnimplementedRpc) { + ChannelArguments args; + auto channel_creds = + GetChannelCredentials(GetParam().credentials_type, &args); std::shared_ptr<Channel> channel = - CreateChannel(server_address_.str(), InsecureChannelCredentials()); + CreateCustomChannel(server_address_.str(), channel_creds, args); std::unique_ptr<grpc::testing::UnimplementedService::Stub> stub; stub = grpc::testing::UnimplementedService::NewStub(channel); EchoRequest send_request; @@ -823,12 +874,12 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) { Status recv_status; ClientContext cli_ctx; - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get())); response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier(GetParam()).Expect(4, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(4, false).Verify(cq_.get()); EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code()); EXPECT_EQ("", recv_status.error_message()); @@ -875,23 +926,25 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Initiate the 'RequestStream' call on client std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream( stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); - Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get()); // On the server, request to be notified of 'RequestStream' calls // and receive the 'RequestStream' call just made by the client srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); // Client sends 3 messages (tags 3, 4 and 5) for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { send_request.set_message("Ping " + std::to_string(tag_idx)); cli_stream->Write(send_request, tag(tag_idx)); - Verifier(GetParam()).Expect(tag_idx, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(tag_idx, true) + .Verify(cq_.get()); } cli_stream->WritesDone(tag(6)); - Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get()); bool expected_server_cq_result = true; bool ignore_cq_result = false; @@ -899,7 +952,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { srv_ctx.TryCancel(); - Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get()); EXPECT_TRUE(srv_ctx.IsCancelled()); // Since cancellation is done before server reads any results, we know @@ -909,7 +962,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { std::thread* server_try_cancel_thd = NULL; - auto verif = Verifier(GetParam()); + auto verif = Verifier(GetParam().disable_blocking); if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = @@ -967,13 +1020,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Server sends the final message and cancelled status (but the RPC is // already cancelled at this point. So we expect the operation to fail) srv_stream.Finish(send_response, Status::CANCELLED, tag(9)); - Verifier(GetParam()).Expect(9, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get()); // Client will see the cancellation cli_stream->Finish(&recv_status, tag(10)); // TODO(sreek): The expectation here should be true. This is a bug (github // issue #4972) - Verifier(GetParam()).Expect(10, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(10, false).Verify(cq_.get()); EXPECT_FALSE(recv_status.ok()); EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code()); } @@ -1007,13 +1060,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Initiate the 'ResponseStream' call on the client std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); - Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get()); // On the server, request to be notified of 'ResponseStream' calls and // receive the call just made by the client srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); bool expected_cq_result = true; @@ -1022,7 +1075,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { srv_ctx.TryCancel(); - Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get()); EXPECT_TRUE(srv_ctx.IsCancelled()); // We know for sure that all cq results will be false from this point @@ -1032,7 +1085,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { std::thread* server_try_cancel_thd = NULL; - auto verif = Verifier(GetParam()); + auto verif = Verifier(GetParam().disable_blocking); if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = @@ -1092,7 +1145,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Client attemts to read the three messages from the server for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { cli_stream->Read(&recv_response, tag(tag_idx)); - Verifier(GetParam()) + Verifier(GetParam().disable_blocking) .Expect(tag_idx, expected_cq_result) .Verify(cq_.get(), ignore_cq_result); } @@ -1103,11 +1156,11 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Server finishes the stream (but the RPC is already cancelled) srv_stream.Finish(Status::CANCELLED, tag(9)); - Verifier(GetParam()).Expect(9, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get()); // Client will see the cancellation cli_stream->Finish(&recv_status, tag(10)); - Verifier(GetParam()).Expect(10, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get()); EXPECT_FALSE(recv_status.ok()); EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code()); } @@ -1142,19 +1195,19 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Initiate the call from the client side std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); - Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get()); // On the server, request to be notified of the 'BidiStream' call and // receive the call just made by the client srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); // Client sends the first and the only message send_request.set_message("Ping"); cli_stream->Write(send_request, tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); bool expected_cq_result = true; bool ignore_cq_result = false; @@ -1162,7 +1215,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { srv_ctx.TryCancel(); - Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get()); EXPECT_TRUE(srv_ctx.IsCancelled()); // We know for sure that all cq results will be false from this point @@ -1172,7 +1225,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { std::thread* server_try_cancel_thd = NULL; - auto verif = Verifier(GetParam()); + auto verif = Verifier(GetParam().disable_blocking); if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = @@ -1272,10 +1325,10 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // know that cq results are supposed to return false on server. srv_stream.Finish(Status::CANCELLED, tag(9)); - Verifier(GetParam()).Expect(9, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get()); cli_stream->Finish(&recv_status, tag(10)); - Verifier(GetParam()).Expect(10, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get()); EXPECT_FALSE(recv_status.ok()); EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code()); } @@ -1317,11 +1370,48 @@ TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) { TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING); } +std::vector<TestScenario> CreateTestScenarios(bool test_disable_blocking, + bool test_secure, + int test_big_limit) { + std::vector<TestScenario> scenarios; + std::vector<grpc::string> credentials_types; + std::vector<grpc::string> messages; + + credentials_types.push_back(kInsecureCredentialsType); + auto sec_list = GetSecureCredentialsTypeList(); + for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) { + credentials_types.push_back(*sec); + } + + messages.push_back("Hello"); + for (int sz = 1; sz < test_big_limit; sz *= 2) { + grpc::string big_msg; + for (int i = 0; i < sz * 1024; i++) { + char c = 'a' + (i % 26); + big_msg += c; + } + messages.push_back(big_msg); + } + + for (auto cred = credentials_types.begin(); cred != credentials_types.end(); + ++cred) { + for (auto msg = messages.begin(); msg != messages.end(); msg++) { + scenarios.push_back(TestScenario(false, *cred, *msg)); + if (test_disable_blocking) { + scenarios.push_back(TestScenario(true, *cred, *msg)); + } + } + } + return scenarios; +} + INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest, - ::testing::Values(false, true)); + ::testing::ValuesIn(CreateTestScenarios(true, true, + 1024))); INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel, AsyncEnd2endServerTryCancelTest, - ::testing::Values(false)); + ::testing::ValuesIn(CreateTestScenarios(false, false, + 0))); } // namespace } // namespace testing diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 2583ceb819..04b2b453f9 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -83,6 +83,7 @@ static std::unordered_map<string, std::deque<int>> get_hosts_and_cores( auto stub = WorkerService::NewStub( CreateChannel(*it, InsecureChannelCredentials())); grpc::ClientContext ctx; + ctx.set_fail_fast(false); CoreRequest dummy; CoreResponse cores; grpc::Status s = stub->CoreCount(&ctx, dummy, &cores); @@ -166,6 +167,7 @@ namespace runsc { static ClientContext* AllocContext(list<ClientContext>* contexts) { contexts->emplace_back(); auto context = &contexts->back(); + context->set_fail_fast(false); return context; } @@ -435,6 +437,7 @@ void RunQuit() { CreateChannel(workers[i], InsecureChannelCredentials())); Void dummy; grpc::ClientContext ctx; + ctx.set_fail_fast(false); GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok()); } } |