diff options
author | Yuchen Zeng <zyc@google.com> | 2016-05-17 14:06:44 -0700 |
---|---|---|
committer | Yuchen Zeng <zyc@google.com> | 2016-05-17 14:06:44 -0700 |
commit | 749005efcc657cc5199c67688b734fb3de5852fe (patch) | |
tree | a4ba7895a51b8bce38a0387d01b7f4176607bb1a /test/cpp | |
parent | acf94785b7e44200d680831dac392b1e7a3a0c21 (diff) | |
parent | d5ed5a98db64703d8997b270b9e2fd95bf712403 (diff) |
Merge remote-tracking branch 'upstream/master' into reflection_separate_lib
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 372 | ||||
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 28 | ||||
-rw-r--r-- | test/cpp/end2end/server_builder_plugin_test.cc | 256 | ||||
-rw-r--r-- | test/cpp/end2end/test_service_impl.cc | 8 | ||||
-rw-r--r-- | test/cpp/end2end/test_service_impl.h | 1 | ||||
-rw-r--r-- | test/cpp/grpclb/grpclb_api_test.cc | 11 | ||||
-rw-r--r-- | test/cpp/interop/client.cc | 13 | ||||
-rw-r--r-- | test/cpp/interop/interop_client.cc | 337 | ||||
-rw-r--r-- | test/cpp/interop/interop_client.h | 62 | ||||
-rw-r--r-- | test/cpp/interop/stress_interop_client.cc | 59 | ||||
-rw-r--r-- | test/cpp/interop/stress_interop_client.h | 28 | ||||
-rw-r--r-- | test/cpp/interop/stress_test.cc | 21 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 12 | ||||
-rwxr-xr-x | test/cpp/qps/gen_build_yaml.py | 10 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 6 |
15 files changed, 964 insertions, 260 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 7e4d6046d6..45f5eb1ddd 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -51,6 +51,7 @@ #include "test/core/util/port.h" #include "test/core/util/test_config.h" #include "test/cpp/util/string_ref_helper.h" +#include "test/cpp/util/test_credentials_provider.h" #ifdef GPR_POSIX_SOCKET #include "src/core/lib/iomgr/ev_posix.h" @@ -58,6 +59,7 @@ using grpc::testing::EchoRequest; using grpc::testing::EchoResponse; +using grpc::testing::kTlsCredentialsType; using std::chrono::system_clock; GPR_TLS_DECL(g_is_async_end2end_test); @@ -197,22 +199,67 @@ class Verifier { bool spin_; }; -class AsyncEnd2endTest : public ::testing::TestWithParam<bool> { +// This class disables the server builder plugins that may add sync services to +// the server. If there are sync services, UnimplementedRpc test will triger +// the sync unkown rpc routine on the server side, rather than the async one +// that needs to be tested here. +class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption { + public: + void UpdateArguments(ChannelArguments* arg) GRPC_OVERRIDE {} + + void UpdatePlugins( + std::map<grpc::string, std::unique_ptr<ServerBuilderPlugin>>* plugins) + GRPC_OVERRIDE { + auto plugin = plugins->begin(); + while (plugin != plugins->end()) { + if ((*plugin).second->has_sync_methods()) { + plugins->erase(plugin++); + } else { + plugin++; + } + } + } +}; + +class TestScenario { + public: + TestScenario(bool non_block, const grpc::string& creds_type, + const grpc::string& content) + : disable_blocking(non_block), + credentials_type(creds_type), + message_content(content) {} + void Log() const { + gpr_log(GPR_INFO, + "Scenario: disable_blocking %d, credentials %s, message size %d", + disable_blocking, credentials_type.c_str(), message_content.size()); + } + bool disable_blocking; + const grpc::string credentials_type; + const grpc::string message_content; +}; + +class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> { protected: - AsyncEnd2endTest() {} + AsyncEnd2endTest() { GetParam().Log(); } void SetUp() GRPC_OVERRIDE { - poll_overrider_.reset(new PollingOverrider(!GetParam())); + poll_overrider_.reset(new PollingOverrider(!GetParam().disable_blocking)); - int port = grpc_pick_unused_port_or_die(); - server_address_ << "localhost:" << port; + port_ = grpc_pick_unused_port_or_die(); + server_address_ << "localhost:" << port_; // Setup server ServerBuilder builder; - builder.AddListeningPort(server_address_.str(), - grpc::InsecureServerCredentials()); + auto server_creds = GetServerCredentials(GetParam().credentials_type); + builder.AddListeningPort(server_address_.str(), server_creds); builder.RegisterService(&service_); cq_ = builder.AddCompletionQueue(); + + // TODO(zyc): make a test option to choose wheather sync plugins should be + // deleted + std::unique_ptr<ServerBuilderOption> sync_plugin_disabler( + new ServerBuilderSyncPluginDisabler()); + builder.SetOption(move(sync_plugin_disabler)); server_ = builder.BuildAndStart(); gpr_tls_set(&g_is_async_end2end_test, 1); @@ -227,11 +274,15 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> { ; poll_overrider_.reset(); gpr_tls_set(&g_is_async_end2end_test, 0); + grpc_recycle_unused_port(port_); } void ResetStub() { + ChannelArguments args; + auto channel_creds = + GetChannelCredentials(GetParam().credentials_type, &args); std::shared_ptr<Channel> channel = - CreateChannel(server_address_.str(), InsecureChannelCredentials()); + CreateCustomChannel(server_address_.str(), channel_creds, args); stub_ = grpc::testing::EchoTestService::NewStub(channel); } @@ -247,22 +298,23 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> { ServerContext srv_ctx; grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -274,6 +326,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> { std::unique_ptr<Server> server_; grpc::testing::EchoTestService::AsyncService service_; std::ostringstream server_address_; + int port_; std::unique_ptr<PollingOverrider> poll_overrider_; }; @@ -302,7 +355,7 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) { ServerContext srv_ctx; grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); @@ -310,23 +363,22 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) { std::chrono::system_clock::now()); std::chrono::system_clock::time_point time_limit( std::chrono::system_clock::now() + std::chrono::seconds(10)); - Verifier(GetParam()).Verify(cq_.get(), time_now); - Verifier(GetParam()).Verify(cq_.get(), time_now); + Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); + Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get(), time_limit); + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Verify(cq_.get(), time_limit); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - Verifier(GetParam()) - .Expect(3, true) - .Verify(cq_.get(), std::chrono::system_clock::time_point::max()); - response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier(GetParam()) + Verifier(GetParam().disable_blocking) + .Expect(3, true) .Expect(4, true) .Verify(cq_.get(), std::chrono::system_clock::time_point::max()); @@ -347,41 +399,48 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { ServerContext srv_ctx; ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream( stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Expect(1, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Expect(1, true) + .Verify(cq_.get()); cli_stream->Write(send_request, tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(4)); - Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); cli_stream->Write(send_request, tag(5)); - Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(6)); - Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); cli_stream->WritesDone(tag(7)); - Verifier(GetParam()).Expect(7, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(8)); - Verifier(GetParam()).Expect(8, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(7, true) + .Expect(8, false) + .Verify(cq_.get()); send_response.set_message(recv_request.message()); srv_stream.Finish(send_response, Status::OK, tag(9)); - Verifier(GetParam()).Expect(9, true).Verify(cq_.get()); - cli_stream->Finish(&recv_status, tag(10)); - Verifier(GetParam()).Expect(10, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(9, true) + .Expect(10, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -400,39 +459,45 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) { ServerContext srv_ctx; ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(1, true) + .Expect(2, true) + .Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); srv_stream.Write(send_response, tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); - cli_stream->Read(&recv_response, tag(4)); - Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); srv_stream.Write(send_response, tag(5)); - Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); - cli_stream->Read(&recv_response, tag(6)); - Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); srv_stream.Finish(Status::OK, tag(7)); - Verifier(GetParam()).Expect(7, true).Verify(cq_.get()); - cli_stream->Read(&recv_response, tag(8)); - Verifier(GetParam()).Expect(8, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(7, true) + .Expect(8, false) + .Verify(cq_.get()); cli_stream->Finish(&recv_status, tag(9)); - Verifier(GetParam()).Expect(9, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); EXPECT_TRUE(recv_status.ok()); } @@ -450,41 +515,48 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) { ServerContext srv_ctx; ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(1, true) + .Expect(2, true) + .Verify(cq_.get()); cli_stream->Write(send_request, tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(4)); - Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); srv_stream.Write(send_response, tag(5)); - Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); - cli_stream->Read(&recv_response, tag(6)); - Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); cli_stream->WritesDone(tag(7)); - Verifier(GetParam()).Expect(7, true).Verify(cq_.get()); - srv_stream.Read(&recv_request, tag(8)); - Verifier(GetParam()).Expect(8, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(7, true) + .Expect(8, false) + .Verify(cq_.get()); srv_stream.Finish(Status::OK, tag(9)); - Verifier(GetParam()).Expect(9, true).Verify(cq_.get()); - cli_stream->Finish(&recv_status, tag(10)); - Verifier(GetParam()).Expect(10, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(9, true) + .Expect(10, true) + .Verify(cq_.get()); EXPECT_TRUE(recv_status.ok()); } @@ -503,7 +575,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { ServerContext srv_ctx; grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::pair<grpc::string, grpc::string> meta1("key1", "val1"); std::pair<grpc::string, grpc::string> meta2("key2", "val2"); std::pair<grpc::string, grpc::string> meta3("g.r.d-bin", "xyz"); @@ -516,7 +588,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); EXPECT_EQ(meta1.second, @@ -529,11 +601,11 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -552,7 +624,7 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) { ServerContext srv_ctx; grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::pair<grpc::string, grpc::string> meta1("key1", "val1"); std::pair<grpc::string, grpc::string> meta2("key2", "val2"); @@ -561,15 +633,15 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); srv_ctx.AddInitialMetadata(meta1.first, meta1.second); srv_ctx.AddInitialMetadata(meta2.first, meta2.second); response_writer.SendInitialMetadata(tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); response_reader->ReadInitialMetadata(tag(4)); - Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); EXPECT_EQ(meta1.second, ToString(server_initial_metadata.find(meta1.first)->second)); @@ -579,10 +651,11 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(5)); - Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(6)); - Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -601,7 +674,7 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) { ServerContext srv_ctx; grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::pair<grpc::string, grpc::string> meta1("key1", "val1"); std::pair<grpc::string, grpc::string> meta2("key2", "val2"); @@ -610,20 +683,22 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); response_writer.SendInitialMetadata(tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); send_response.set_message(recv_request.message()); srv_ctx.AddTrailingMetadata(meta1.first, meta1.second); srv_ctx.AddTrailingMetadata(meta2.first, meta2.second); response_writer.Finish(send_response, Status::OK, tag(4)); + response_reader->Finish(&recv_response, &recv_status, tag(5)); - Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(4, true) + .Expect(5, true) + .Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(5)); - Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); @@ -647,7 +722,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) { ServerContext srv_ctx; grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::pair<grpc::string, grpc::string> meta1("key1", "val1"); std::pair<grpc::string, grpc::string> meta2( "key2-bin", @@ -671,7 +746,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); EXPECT_EQ(meta1.second, @@ -683,9 +758,9 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) { srv_ctx.AddInitialMetadata(meta3.first, meta3.second); srv_ctx.AddInitialMetadata(meta4.first, meta4.second); response_writer.SendInitialMetadata(tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); response_reader->ReadInitialMetadata(tag(4)); - Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); EXPECT_EQ(meta3.second, ToString(server_initial_metadata.find(meta3.first)->second)); @@ -697,11 +772,13 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) { srv_ctx.AddTrailingMetadata(meta5.first, meta5.second); srv_ctx.AddTrailingMetadata(meta6.first, meta6.second); response_writer.Finish(send_response, Status::OK, tag(5)); + response_reader->Finish(&recv_response, &recv_status, tag(6)); - Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); - response_reader->Finish(&recv_response, &recv_status, tag(6)); - Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); @@ -726,7 +803,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) { ServerContext srv_ctx; grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); @@ -734,15 +811,15 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); cli_ctx.TryCancel(); - Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get()); EXPECT_TRUE(srv_ctx.IsCancelled()); response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier(GetParam()).Expect(4, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(4, false).Verify(cq_.get()); EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code()); } @@ -761,7 +838,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) { ServerContext srv_ctx; grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); @@ -769,25 +846,29 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); - Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); - EXPECT_FALSE(srv_ctx.IsCancelled()); - response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Expect(5, true) + .Verify(cq_.get()); + EXPECT_FALSE(srv_ctx.IsCancelled()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); } TEST_P(AsyncEnd2endTest, UnimplementedRpc) { + ChannelArguments args; + auto channel_creds = + GetChannelCredentials(GetParam().credentials_type, &args); std::shared_ptr<Channel> channel = - CreateChannel(server_address_.str(), InsecureChannelCredentials()); + CreateCustomChannel(server_address_.str(), channel_creds, args); std::unique_ptr<grpc::testing::UnimplementedService::Stub> stub; stub = grpc::testing::UnimplementedService::NewStub(channel); EchoRequest send_request; @@ -795,12 +876,12 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) { Status recv_status; ClientContext cli_ctx; - send_request.set_message("Hello"); + send_request.set_message(GetParam().message_content); std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get())); response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier(GetParam()).Expect(4, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(4, false).Verify(cq_.get()); EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code()); EXPECT_EQ("", recv_status.error_message()); @@ -847,23 +928,25 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Initiate the 'RequestStream' call on client std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream( stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); - Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get()); // On the server, request to be notified of 'RequestStream' calls // and receive the 'RequestStream' call just made by the client srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); // Client sends 3 messages (tags 3, 4 and 5) for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { send_request.set_message("Ping " + std::to_string(tag_idx)); cli_stream->Write(send_request, tag(tag_idx)); - Verifier(GetParam()).Expect(tag_idx, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(tag_idx, true) + .Verify(cq_.get()); } cli_stream->WritesDone(tag(6)); - Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get()); bool expected_server_cq_result = true; bool ignore_cq_result = false; @@ -871,7 +954,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { srv_ctx.TryCancel(); - Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get()); EXPECT_TRUE(srv_ctx.IsCancelled()); // Since cancellation is done before server reads any results, we know @@ -881,7 +964,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { std::thread* server_try_cancel_thd = NULL; - auto verif = Verifier(GetParam()); + auto verif = Verifier(GetParam().disable_blocking); if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = @@ -939,13 +1022,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Server sends the final message and cancelled status (but the RPC is // already cancelled at this point. So we expect the operation to fail) srv_stream.Finish(send_response, Status::CANCELLED, tag(9)); - Verifier(GetParam()).Expect(9, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get()); // Client will see the cancellation cli_stream->Finish(&recv_status, tag(10)); // TODO(sreek): The expectation here should be true. This is a bug (github // issue #4972) - Verifier(GetParam()).Expect(10, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(10, false).Verify(cq_.get()); EXPECT_FALSE(recv_status.ok()); EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code()); } @@ -979,13 +1062,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Initiate the 'ResponseStream' call on the client std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); - Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get()); // On the server, request to be notified of 'ResponseStream' calls and // receive the call just made by the client srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); bool expected_cq_result = true; @@ -994,7 +1077,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { srv_ctx.TryCancel(); - Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get()); EXPECT_TRUE(srv_ctx.IsCancelled()); // We know for sure that all cq results will be false from this point @@ -1004,7 +1087,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { std::thread* server_try_cancel_thd = NULL; - auto verif = Verifier(GetParam()); + auto verif = Verifier(GetParam().disable_blocking); if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = @@ -1064,7 +1147,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Client attemts to read the three messages from the server for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { cli_stream->Read(&recv_response, tag(tag_idx)); - Verifier(GetParam()) + Verifier(GetParam().disable_blocking) .Expect(tag_idx, expected_cq_result) .Verify(cq_.get(), ignore_cq_result); } @@ -1075,11 +1158,11 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Server finishes the stream (but the RPC is already cancelled) srv_stream.Finish(Status::CANCELLED, tag(9)); - Verifier(GetParam()).Expect(9, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get()); // Client will see the cancellation cli_stream->Finish(&recv_status, tag(10)); - Verifier(GetParam()).Expect(10, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get()); EXPECT_FALSE(recv_status.ok()); EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code()); } @@ -1114,19 +1197,19 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Initiate the call from the client side std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); - Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get()); // On the server, request to be notified of the 'BidiStream' call and // receive the call just made by the client srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); // Client sends the first and the only message send_request.set_message("Ping"); cli_stream->Write(send_request, tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); bool expected_cq_result = true; bool ignore_cq_result = false; @@ -1134,7 +1217,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { srv_ctx.TryCancel(); - Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get()); EXPECT_TRUE(srv_ctx.IsCancelled()); // We know for sure that all cq results will be false from this point @@ -1144,7 +1227,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { std::thread* server_try_cancel_thd = NULL; - auto verif = Verifier(GetParam()); + auto verif = Verifier(GetParam().disable_blocking); if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = @@ -1244,10 +1327,10 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // know that cq results are supposed to return false on server. srv_stream.Finish(Status::CANCELLED, tag(9)); - Verifier(GetParam()).Expect(9, false).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get()); cli_stream->Finish(&recv_status, tag(10)); - Verifier(GetParam()).Expect(10, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get()); EXPECT_FALSE(recv_status.ok()); EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code()); } @@ -1289,11 +1372,48 @@ TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) { TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING); } +std::vector<TestScenario> CreateTestScenarios(bool test_disable_blocking, + bool test_secure, + int test_big_limit) { + std::vector<TestScenario> scenarios; + std::vector<grpc::string> credentials_types; + std::vector<grpc::string> messages; + + credentials_types.push_back(kInsecureCredentialsType); + auto sec_list = GetSecureCredentialsTypeList(); + for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) { + credentials_types.push_back(*sec); + } + + messages.push_back("Hello"); + for (int sz = 1; sz < test_big_limit; sz *= 2) { + grpc::string big_msg; + for (int i = 0; i < sz * 1024; i++) { + char c = 'a' + (i % 26); + big_msg += c; + } + messages.push_back(big_msg); + } + + for (auto cred = credentials_types.begin(); cred != credentials_types.end(); + ++cred) { + for (auto msg = messages.begin(); msg != messages.end(); msg++) { + scenarios.push_back(TestScenario(false, *cred, *msg)); + if (test_disable_blocking) { + scenarios.push_back(TestScenario(true, *cred, *msg)); + } + } + } + return scenarios; +} + INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest, - ::testing::Values(false, true)); + ::testing::ValuesIn(CreateTestScenarios(true, true, + 1024))); INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel, AsyncEnd2endServerTryCancelTest, - ::testing::Values(false)); + ::testing::ValuesIn(CreateTestScenarios(false, false, + 0))); } // namespace } // namespace testing diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 0c9313f88f..e3408bff75 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -975,6 +975,34 @@ TEST_P(End2endTest, NonExistingService) { EXPECT_EQ("", s.error_message()); } +// Ask the server to send back a serialized proto in trailer. +// This is an example of setting error details. +TEST_P(End2endTest, BinaryTrailerTest) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + request.mutable_param()->set_echo_metadata(true); + DebugInfo* info = request.mutable_param()->mutable_debug_info(); + info->add_stack_entries("stack_entry_1"); + info->add_stack_entries("stack_entry_2"); + info->add_stack_entries("stack_entry_3"); + info->set_detail("detailed debug info"); + grpc::string expected_string = info->SerializeAsString(); + request.set_message("Hello"); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); + auto trailers = context.GetServerTrailingMetadata(); + EXPECT_EQ(1u, trailers.count(kDebugInfoTrailerKey)); + auto iter = trailers.find(kDebugInfoTrailerKey); + EXPECT_EQ(expected_string, iter->second); + // Parse the returned trailer into a DebugInfo proto. + DebugInfo returned_info; + EXPECT_TRUE(returned_info.ParseFromString(ToString(iter->second))); +} + ////////////////////////////////////////////////////////////////////////// // Test with and without a proxy. class ProxyEnd2endTest : public End2endTest { diff --git a/test/cpp/end2end/server_builder_plugin_test.cc b/test/cpp/end2end/server_builder_plugin_test.cc new file mode 100644 index 0000000000..87e3709d7d --- /dev/null +++ b/test/cpp/end2end/server_builder_plugin_test.cc @@ -0,0 +1,256 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/channel.h> +#include <grpc++/client_context.h> +#include <grpc++/create_channel.h> +#include <grpc++/impl/server_builder_option.h> +#include <grpc++/impl/server_builder_plugin.h> +#include <grpc++/impl/server_initializer.h> +#include <grpc++/security/credentials.h> +#include <grpc++/security/server_credentials.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/server_context.h> +#include <grpc/grpc.h> +#include <gtest/gtest.h> + +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" +#include "test/cpp/end2end/test_service_impl.h" + +#define PLUGIN_NAME "TestServerBuilderPlugin" + +namespace grpc { +namespace testing { + +class TestServerBuilderPlugin : public ServerBuilderPlugin { + public: + TestServerBuilderPlugin() : service_(new TestServiceImpl()) { + init_server_is_called_ = false; + finish_is_called_ = false; + change_arguments_is_called_ = false; + } + + grpc::string name() GRPC_OVERRIDE { return PLUGIN_NAME; } + + void InitServer(ServerInitializer* si) GRPC_OVERRIDE { + init_server_is_called_ = true; + if (register_service_) { + si->RegisterService(service_); + } + } + + void Finish(ServerInitializer* si) GRPC_OVERRIDE { finish_is_called_ = true; } + + void ChangeArguments(const grpc::string& name, void* value) GRPC_OVERRIDE { + change_arguments_is_called_ = true; + } + + bool has_async_methods() const GRPC_OVERRIDE { + if (register_service_) { + return service_->has_async_methods(); + } + return false; + } + + bool has_sync_methods() const GRPC_OVERRIDE { + if (register_service_) { + return service_->has_synchronous_methods(); + } + return false; + } + + void SetRegisterService() { register_service_ = true; } + + bool init_server_is_called() { return init_server_is_called_; } + bool finish_is_called() { return finish_is_called_; } + bool change_arguments_is_called() { return change_arguments_is_called_; } + + private: + bool init_server_is_called_; + bool finish_is_called_; + bool change_arguments_is_called_; + bool register_service_; + std::shared_ptr<TestServiceImpl> service_; +}; + +class InsertPluginServerBuilderOption : public ServerBuilderOption { + public: + InsertPluginServerBuilderOption() { register_service_ = false; } + + void UpdateArguments(ChannelArguments* arg) GRPC_OVERRIDE {} + + void UpdatePlugins( + std::map<grpc::string, std::unique_ptr<ServerBuilderPlugin>>* plugins) + GRPC_OVERRIDE { + plugins->clear(); + + std::unique_ptr<TestServerBuilderPlugin> plugin( + new TestServerBuilderPlugin()); + if (register_service_) plugin->SetRegisterService(); + (*plugins)[plugin->name()] = std::move(plugin); + } + + void SetRegisterService() { register_service_ = true; } + + private: + bool register_service_; +}; + +std::unique_ptr<ServerBuilderPlugin> CreateTestServerBuilderPlugin() { + return std::unique_ptr<ServerBuilderPlugin>(new TestServerBuilderPlugin()); +} + +void AddTestServerBuilderPlugin() { + static bool already_here = false; + if (already_here) return; + already_here = true; + ::grpc::ServerBuilder::InternalAddPluginFactory( + &CreateTestServerBuilderPlugin); +} + +// Force AddServerBuilderPlugin() to be called at static initialization time. +struct StaticTestPluginInitializer { + StaticTestPluginInitializer() { AddTestServerBuilderPlugin(); } +} static_plugin_initializer_test_; + +// When the param boolean is true, the ServerBuilder plugin will be added at the +// time of static initialization. When it's false, the ServerBuilder plugin will +// be added using ServerBuilder::SetOption(). +class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> { + public: + ServerBuilderPluginTest() {} + + void SetUp() GRPC_OVERRIDE { + port_ = grpc_pick_unused_port_or_die(); + builder_.reset(new ServerBuilder()); + } + + void InsertPlugin() { + if (GetParam()) { + // Add ServerBuilder plugin in static initialization + EXPECT_TRUE(builder_->plugins_[PLUGIN_NAME] != nullptr); + } else { + // Add ServerBuilder plugin using ServerBuilder::SetOption() + builder_->SetOption(std::unique_ptr<ServerBuilderOption>( + new InsertPluginServerBuilderOption())); + } + } + + void InsertPluginWithTestService() { + if (GetParam()) { + // Add ServerBuilder plugin in static initialization + EXPECT_TRUE(builder_->plugins_[PLUGIN_NAME] != nullptr); + auto plugin = static_cast<TestServerBuilderPlugin*>( + builder_->plugins_[PLUGIN_NAME].get()); + EXPECT_TRUE(plugin != nullptr); + plugin->SetRegisterService(); + } else { + // Add ServerBuilder plugin using ServerBuilder::SetOption() + std::unique_ptr<InsertPluginServerBuilderOption> option( + new InsertPluginServerBuilderOption()); + option->SetRegisterService(); + builder_->SetOption(std::move(option)); + } + } + + void StartServer() { + grpc::string server_address = "localhost:" + to_string(port_); + builder_->AddListeningPort(server_address, InsecureServerCredentials()); + server_ = builder_->BuildAndStart(); + EXPECT_TRUE(builder_->plugins_[PLUGIN_NAME] != nullptr); + } + + void ResetStub() { + string target = "dns:localhost:" + to_string(port_); + channel_ = CreateChannel(target, InsecureChannelCredentials()); + stub_ = grpc::testing::EchoTestService::NewStub(channel_); + } + + void TearDown() GRPC_OVERRIDE { + EXPECT_TRUE(builder_->plugins_[PLUGIN_NAME] != nullptr); + auto plugin = static_cast<TestServerBuilderPlugin*>( + builder_->plugins_[PLUGIN_NAME].get()); + EXPECT_TRUE(plugin != nullptr); + EXPECT_TRUE(plugin->init_server_is_called()); + EXPECT_TRUE(plugin->finish_is_called()); + } + + string to_string(const int number) { + std::stringstream strs; + strs << number; + return strs.str(); + } + + protected: + std::shared_ptr<Channel> channel_; + std::unique_ptr<ServerBuilder> builder_; + std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; + std::unique_ptr<Server> server_; + TestServiceImpl service_; + int port_; +}; + +TEST_P(ServerBuilderPluginTest, PluginWithoutServiceTest) { + InsertPlugin(); + StartServer(); +} + +TEST_P(ServerBuilderPluginTest, PluginWithServiceTest) { + InsertPluginWithTestService(); + StartServer(); + ResetStub(); + + EchoRequest request; + EchoResponse response; + request.set_message("Hello hello hello hello"); + ClientContext context; + context.set_compression_algorithm(GRPC_COMPRESS_GZIP); + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); +} + +INSTANTIATE_TEST_CASE_P(ServerBuilderPluginTest, ServerBuilderPluginTest, + ::testing::Values(false, true)); + +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index 2f5dd6d49e..cbaee92228 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -135,6 +135,14 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, context->AddTrailingMetadata(ToString(iter->first), ToString(iter->second)); } + // Terminate rpc with error and debug info in trailer. + if (request->param().debug_info().stack_entries_size() || + !request->param().debug_info().detail().empty()) { + grpc::string serialized_debug_info = + request->param().debug_info().SerializeAsString(); + context->AddTrailingMetadata(kDebugInfoTrailerKey, serialized_debug_info); + return Status::CANCELLED; + } } if (request->has_param() && (request->param().expected_client_identity().length() > 0 || diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h index 1ab6ced9e0..c89f88c900 100644 --- a/test/cpp/end2end/test_service_impl.h +++ b/test/cpp/end2end/test_service_impl.h @@ -47,6 +47,7 @@ namespace testing { const int kNumResponseStreamsMsgs = 3; const char* const kServerCancelAfterReads = "cancel_after_reads"; const char* const kServerTryCancelRequest = "server_try_cancel"; +const char* const kDebugInfoTrailerKey = "debug-info-bin"; typedef enum { DO_NOT_CANCEL = 0, diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc index 92f93c869c..bf77878e0a 100644 --- a/test/cpp/grpclb/grpclb_api_test.cc +++ b/test/cpp/grpclb/grpclb_api_test.cc @@ -35,13 +35,13 @@ #include <string> #include "src/core/ext/lb_policy/grpclb/load_balancer_api.h" -#include "src/proto/grpc/lb/v0/load_balancer.pb.h" // C++ version +#include "src/proto/grpc/lb/v1/load_balancer.pb.h" // C++ version namespace grpc { namespace { -using grpc::lb::v0::LoadBalanceRequest; -using grpc::lb::v0::LoadBalanceResponse; +using grpc::lb::v1::LoadBalanceRequest; +using grpc::lb::v1::LoadBalanceResponse; class GrpclbTest : public ::testing::Test {}; @@ -60,9 +60,7 @@ TEST_F(GrpclbTest, CreateRequest) { TEST_F(GrpclbTest, ParseResponse) { LoadBalanceResponse response; - const std::string client_config_str = "I'm a client config"; auto* initial_response = response.mutable_initial_response(); - initial_response->set_client_config(client_config_str); auto* client_stats_report_interval = initial_response->mutable_client_stats_report_interval(); client_stats_report_interval->set_seconds(123); @@ -73,10 +71,7 @@ TEST_F(GrpclbTest, ParseResponse) { gpr_slice_from_copied_string(encoded_response.c_str()); grpc_grpclb_response* c_response = grpc_grpclb_response_parse(encoded_slice); EXPECT_TRUE(c_response->has_initial_response); - EXPECT_TRUE(c_response->initial_response.has_client_config); EXPECT_FALSE(c_response->initial_response.has_load_balancer_delegate); - EXPECT_TRUE(strcmp(c_response->initial_response.client_config, - client_config_str.c_str()) == 0); EXPECT_EQ(c_response->initial_response.client_stats_report_interval.seconds, 123); EXPECT_EQ(c_response->initial_response.client_stats_report_interval.nanos, diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index 9af6a88044..7727824979 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -81,6 +81,14 @@ DEFINE_string(default_service_account, "", DEFINE_string(service_account_key_file, "", "Path to service account json key file."); DEFINE_string(oauth_scope, "", "Scope for OAuth tokens."); +DEFINE_bool(do_not_abort_on_transient_failures, false, + "If set to 'true', abort() is not called in case of transient " + "failures (i.e failures that are temporary and will likely go away " + "on retrying; like a temporary connection failure) and an error " + "message is printed instead. Note that this flag just controls " + "whether abort() is called or not. It does not control whether the " + "test is retried in case of transient failures (and currently the " + "interop tests are not retried even if this flag is set to true)"); using grpc::testing::CreateChannelForTestCase; using grpc::testing::GetServiceAccountJsonKey; @@ -89,8 +97,9 @@ int main(int argc, char** argv) { grpc::testing::InitTest(&argc, &argv, true); gpr_log(GPR_INFO, "Testing these cases: %s", FLAGS_test_case.c_str()); int ret = 0; - grpc::testing::InteropClient client( - CreateChannelForTestCase(FLAGS_test_case)); + grpc::testing::InteropClient client(CreateChannelForTestCase(FLAGS_test_case), + true, + FLAGS_do_not_abort_on_transient_failures); if (FLAGS_test_case == "empty_unary") { client.DoEmpty(); } else if (FLAGS_test_case == "large_unary") { diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 22293d211f..cba52b111f 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -134,23 +134,43 @@ void InteropClient::Reset(std::shared_ptr<Channel> channel) { serviceStub_.Reset(channel); } -InteropClient::InteropClient(std::shared_ptr<Channel> channel) - : serviceStub_(channel, true) {} - InteropClient::InteropClient(std::shared_ptr<Channel> channel, - bool new_stub_every_test_case) - : serviceStub_(channel, new_stub_every_test_case) {} + bool new_stub_every_test_case, + bool do_not_abort_on_transient_failures) + : serviceStub_(channel, new_stub_every_test_case), + do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {} -void InteropClient::AssertOkOrPrintErrorStatus(const Status& s) { +bool InteropClient::AssertStatusOk(const Status& s) { if (s.ok()) { - return; + return true; } - gpr_log(GPR_ERROR, "Error status code: %d, message: %s", s.error_code(), - s.error_message().c_str()); - GPR_ASSERT(0); + + // Note: At this point, s.error_code is definitely not StatusCode::OK (we + // already checked for s.ok() above). So, the following will call abort() + // (unless s.error_code() corresponds to a transient failure and + // 'do_not_abort_on_transient_failures' is true) + return AssertStatusCode(s, StatusCode::OK); } -void InteropClient::DoEmpty() { +bool InteropClient::AssertStatusCode(const Status& s, + StatusCode expected_code) { + if (s.error_code() == expected_code) { + return true; + } + + gpr_log(GPR_ERROR, "Error status code: %d (expected: %d), message: %s", + s.error_code(), expected_code, s.error_message().c_str()); + + // In case of transient transient/retryable failures (like a broken + // connection) we may or may not abort (see TransientFailureOrAbort()) + if (s.error_code() == grpc::StatusCode::UNAVAILABLE) { + return TransientFailureOrAbort(); + } + + abort(); +} + +bool InteropClient::DoEmpty() { gpr_log(GPR_DEBUG, "Sending an empty rpc..."); Empty request = Empty::default_instance(); @@ -158,17 +178,21 @@ void InteropClient::DoEmpty() { ClientContext context; Status s = serviceStub_.Get()->EmptyCall(&context, request, &response); - AssertOkOrPrintErrorStatus(s); + + if (!AssertStatusOk(s)) { + return false; + } gpr_log(GPR_DEBUG, "Empty rpc done."); + return true; } -void InteropClient::PerformLargeUnary(SimpleRequest* request, +bool InteropClient::PerformLargeUnary(SimpleRequest* request, SimpleResponse* response) { - PerformLargeUnary(request, response, NoopChecks); + return PerformLargeUnary(request, response, NoopChecks); } -void InteropClient::PerformLargeUnary(SimpleRequest* request, +bool InteropClient::PerformLargeUnary(SimpleRequest* request, SimpleResponse* response, CheckerFn custom_checks_fn) { ClientContext context; @@ -180,7 +204,9 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request, request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); Status s = serviceStub_.Get()->UnaryCall(&context, *request, response); - AssertOkOrPrintErrorStatus(s); + if (!AssertStatusOk(s)) { + return false; + } custom_checks_fn(inspector, request, response); @@ -203,9 +229,11 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request, default: GPR_ASSERT(false); } + + return true; } -void InteropClient::DoComputeEngineCreds( +bool InteropClient::DoComputeEngineCreds( const grpc::string& default_service_account, const grpc::string& oauth_scope) { gpr_log(GPR_DEBUG, @@ -215,7 +243,11 @@ void InteropClient::DoComputeEngineCreds( request.set_fill_username(true); request.set_fill_oauth_scope(true); request.set_response_type(PayloadType::COMPRESSABLE); - PerformLargeUnary(&request, &response); + + if (!PerformLargeUnary(&request, &response)) { + return false; + } + gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str()); gpr_log(GPR_DEBUG, "Got oauth_scope %s", response.oauth_scope().c_str()); GPR_ASSERT(!response.username().empty()); @@ -224,9 +256,10 @@ void InteropClient::DoComputeEngineCreds( const char* oauth_scope_str = response.oauth_scope().c_str(); GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos); gpr_log(GPR_DEBUG, "Large unary with compute engine creds done."); + return true; } -void InteropClient::DoOauth2AuthToken(const grpc::string& username, +bool InteropClient::DoOauth2AuthToken(const grpc::string& username, const grpc::string& oauth_scope) { gpr_log(GPR_DEBUG, "Sending a unary rpc with raw oauth2 access token credentials ..."); @@ -239,16 +272,20 @@ void InteropClient::DoOauth2AuthToken(const grpc::string& username, Status s = serviceStub_.Get()->UnaryCall(&context, request, &response); - AssertOkOrPrintErrorStatus(s); + if (!AssertStatusOk(s)) { + return false; + } + GPR_ASSERT(!response.username().empty()); GPR_ASSERT(!response.oauth_scope().empty()); GPR_ASSERT(username == response.username()); const char* oauth_scope_str = response.oauth_scope().c_str(); GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos); gpr_log(GPR_DEBUG, "Unary with oauth2 access token credentials done."); + return true; } -void InteropClient::DoPerRpcCreds(const grpc::string& json_key) { +bool InteropClient::DoPerRpcCreds(const grpc::string& json_key) { gpr_log(GPR_DEBUG, "Sending a unary rpc with per-rpc JWT access token ..."); SimpleRequest request; SimpleResponse response; @@ -263,35 +300,47 @@ void InteropClient::DoPerRpcCreds(const grpc::string& json_key) { Status s = serviceStub_.Get()->UnaryCall(&context, request, &response); - AssertOkOrPrintErrorStatus(s); + if (!AssertStatusOk(s)) { + return false; + } + GPR_ASSERT(!response.username().empty()); GPR_ASSERT(json_key.find(response.username()) != grpc::string::npos); gpr_log(GPR_DEBUG, "Unary with per-rpc JWT access token done."); + return true; } -void InteropClient::DoJwtTokenCreds(const grpc::string& username) { +bool InteropClient::DoJwtTokenCreds(const grpc::string& username) { gpr_log(GPR_DEBUG, "Sending a large unary rpc with JWT token credentials ..."); SimpleRequest request; SimpleResponse response; request.set_fill_username(true); request.set_response_type(PayloadType::COMPRESSABLE); - PerformLargeUnary(&request, &response); + + if (!PerformLargeUnary(&request, &response)) { + return false; + } + GPR_ASSERT(!response.username().empty()); GPR_ASSERT(username.find(response.username()) != grpc::string::npos); gpr_log(GPR_DEBUG, "Large unary with JWT token creds done."); + return true; } -void InteropClient::DoLargeUnary() { +bool InteropClient::DoLargeUnary() { gpr_log(GPR_DEBUG, "Sending a large unary rpc..."); SimpleRequest request; SimpleResponse response; request.set_response_type(PayloadType::COMPRESSABLE); - PerformLargeUnary(&request, &response); + if (!PerformLargeUnary(&request, &response)) { + return false; + } gpr_log(GPR_DEBUG, "Large unary done."); + return true; } -void InteropClient::DoLargeCompressedUnary() { +bool InteropClient::DoLargeCompressedUnary() { const CompressionType compression_types[] = {NONE, GZIP, DEFLATE}; const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM}; for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) { @@ -307,14 +356,32 @@ void InteropClient::DoLargeCompressedUnary() { SimpleResponse response; request.set_response_type(payload_types[i]); request.set_response_compression(compression_types[j]); - PerformLargeUnary(&request, &response, CompressionChecks); + + if (!PerformLargeUnary(&request, &response, CompressionChecks)) { + gpr_log(GPR_ERROR, "Large compressed unary failed %s", log_suffix); + gpr_free(log_suffix); + return false; + } + gpr_log(GPR_DEBUG, "Large compressed unary done %s.", log_suffix); gpr_free(log_suffix); } } + + return true; +} + +// Either abort() (unless do_not_abort_on_transient_failures_ is true) or return +// false +bool InteropClient::TransientFailureOrAbort() { + if (do_not_abort_on_transient_failures_) { + return false; + } + + abort(); } -void InteropClient::DoRequestStreaming() { +bool InteropClient::DoRequestStreaming() { gpr_log(GPR_DEBUG, "Sending request steaming rpc ..."); ClientContext context; @@ -328,19 +395,25 @@ void InteropClient::DoRequestStreaming() { for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) { Payload* payload = request.mutable_payload(); payload->set_body(grpc::string(request_stream_sizes[i], '\0')); - GPR_ASSERT(stream->Write(request)); + if (!stream->Write(request)) { + gpr_log(GPR_ERROR, "DoRequestStreaming(): stream->Write() failed"); + return TransientFailureOrAbort(); + } aggregated_payload_size += request_stream_sizes[i]; } stream->WritesDone(); + Status s = stream->Finish(); + if (!AssertStatusOk(s)) { + return false; + } GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size); - AssertOkOrPrintErrorStatus(s); - gpr_log(GPR_DEBUG, "Request streaming done."); + return true; } -void InteropClient::DoResponseStreaming() { - gpr_log(GPR_DEBUG, "Receiving response steaming rpc ..."); +bool InteropClient::DoResponseStreaming() { + gpr_log(GPR_DEBUG, "Receiving response streaming rpc ..."); ClientContext context; StreamingOutputCallRequest request; @@ -358,13 +431,27 @@ void InteropClient::DoResponseStreaming() { grpc::string(response_stream_sizes[i], '\0')); ++i; } - GPR_ASSERT(response_stream_sizes.size() == i); + + if (i < response_stream_sizes.size()) { + // stream->Read() failed before reading all the expected messages. This is + // most likely due to connection failure. + gpr_log(GPR_ERROR, + "DoResponseStreaming(): Read fewer streams (%d) than " + "response_stream_sizes.size() (%d)", + i, response_stream_sizes.size()); + return TransientFailureOrAbort(); + } + Status s = stream->Finish(); - AssertOkOrPrintErrorStatus(s); + if (!AssertStatusOk(s)) { + return false; + } + gpr_log(GPR_DEBUG, "Response streaming done."); + return true; } -void InteropClient::DoResponseCompressedStreaming() { +bool InteropClient::DoResponseCompressedStreaming() { const CompressionType compression_types[] = {NONE, GZIP, DEFLATE}; const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM}; for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) { @@ -378,7 +465,7 @@ void InteropClient::DoResponseCompressedStreaming() { CompressionType_Name(compression_types[j]).c_str(), PayloadType_Name(payload_types[i]).c_str()); - gpr_log(GPR_DEBUG, "Receiving response steaming rpc %s.", log_suffix); + gpr_log(GPR_DEBUG, "Receiving response streaming rpc %s.", log_suffix); request.set_response_type(payload_types[i]); request.set_response_compression(compression_types[j]); @@ -432,18 +519,32 @@ void InteropClient::DoResponseCompressedStreaming() { ++k; } - GPR_ASSERT(response_stream_sizes.size() == k); - Status s = stream->Finish(); - - AssertOkOrPrintErrorStatus(s); gpr_log(GPR_DEBUG, "Response streaming done %s.", log_suffix); gpr_free(log_suffix); + + if (k < response_stream_sizes.size()) { + // stream->Read() failed before reading all the expected messages. This + // is most likely due to a connection failure. + gpr_log(GPR_ERROR, + "DoResponseCompressedStreaming(): Responses read (k=%d) is " + "less than the expected messages (i.e " + "response_stream_sizes.size() (%d)). (i=%d, j=%d)", + k, response_stream_sizes.size(), i, j); + return TransientFailureOrAbort(); + } + + Status s = stream->Finish(); + if (!AssertStatusOk(s)) { + return false; + } } } + + return true; } -void InteropClient::DoResponseStreamingWithSlowConsumer() { - gpr_log(GPR_DEBUG, "Receiving response steaming rpc with slow consumer ..."); +bool InteropClient::DoResponseStreamingWithSlowConsumer() { + gpr_log(GPR_DEBUG, "Receiving response streaming rpc with slow consumer ..."); ClientContext context; StreamingOutputCallRequest request; @@ -464,14 +565,26 @@ void InteropClient::DoResponseStreamingWithSlowConsumer() { usleep(kReceiveDelayMilliSeconds * 1000); ++i; } - GPR_ASSERT(kNumResponseMessages == i); + + if (i < kNumResponseMessages) { + gpr_log(GPR_ERROR, + "DoResponseStreamingWithSlowConsumer(): Responses read (i=%d) is " + "less than the expected messages (i.e kNumResponseMessages = %d)", + i, kNumResponseMessages); + + return TransientFailureOrAbort(); + } + Status s = stream->Finish(); + if (!AssertStatusOk(s)) { + return false; + } - AssertOkOrPrintErrorStatus(s); gpr_log(GPR_DEBUG, "Response streaming done."); + return true; } -void InteropClient::DoHalfDuplex() { +bool InteropClient::DoHalfDuplex() { gpr_log(GPR_DEBUG, "Sending half-duplex streaming rpc ..."); ClientContext context; @@ -483,7 +596,11 @@ void InteropClient::DoHalfDuplex() { ResponseParameters* response_parameter = request.add_response_parameters(); for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) { response_parameter->set_size(response_stream_sizes[i]); - GPR_ASSERT(stream->Write(request)); + + if (!stream->Write(request)) { + gpr_log(GPR_ERROR, "DoHalfDuplex(): stream->Write() failed. i=%d", i); + return TransientFailureOrAbort(); + } } stream->WritesDone(); @@ -494,13 +611,27 @@ void InteropClient::DoHalfDuplex() { grpc::string(response_stream_sizes[i], '\0')); ++i; } - GPR_ASSERT(response_stream_sizes.size() == i); + + if (i < response_stream_sizes.size()) { + // stream->Read() failed before reading all the expected messages. This is + // most likely due to a connection failure + gpr_log(GPR_ERROR, + "DoHalfDuplex(): Responses read (i=%d) are less than the expected " + "number of messages response_stream_sizes.size() (%d)", + i, response_stream_sizes.size()); + return TransientFailureOrAbort(); + } + Status s = stream->Finish(); - AssertOkOrPrintErrorStatus(s); + if (!AssertStatusOk(s)) { + return false; + } + gpr_log(GPR_DEBUG, "Half-duplex streaming rpc done."); + return true; } -void InteropClient::DoPingPong() { +bool InteropClient::DoPingPong() { gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ..."); ClientContext context; @@ -513,24 +644,40 @@ void InteropClient::DoPingPong() { ResponseParameters* response_parameter = request.add_response_parameters(); Payload* payload = request.mutable_payload(); StreamingOutputCallResponse response; + for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) { response_parameter->set_size(response_stream_sizes[i]); payload->set_body(grpc::string(request_stream_sizes[i], '\0')); - GPR_ASSERT(stream->Write(request)); - GPR_ASSERT(stream->Read(&response)); + + if (!stream->Write(request)) { + gpr_log(GPR_ERROR, "DoPingPong(): stream->Write() failed. i: %d", i); + return TransientFailureOrAbort(); + } + + if (!stream->Read(&response)) { + gpr_log(GPR_ERROR, "DoPingPong(): stream->Read() failed. i:%d", i); + return TransientFailureOrAbort(); + } + GPR_ASSERT(response.payload().body() == grpc::string(response_stream_sizes[i], '\0')); } stream->WritesDone(); + GPR_ASSERT(!stream->Read(&response)); + Status s = stream->Finish(); - AssertOkOrPrintErrorStatus(s); + if (!AssertStatusOk(s)) { + return false; + } + gpr_log(GPR_DEBUG, "Ping pong streaming done."); + return true; } -void InteropClient::DoCancelAfterBegin() { - gpr_log(GPR_DEBUG, "Sending request steaming rpc ..."); +bool InteropClient::DoCancelAfterBegin() { + gpr_log(GPR_DEBUG, "Sending request streaming rpc ..."); ClientContext context; StreamingInputCallRequest request; @@ -542,11 +689,16 @@ void InteropClient::DoCancelAfterBegin() { gpr_log(GPR_DEBUG, "Trying to cancel..."); context.TryCancel(); Status s = stream->Finish(); - GPR_ASSERT(s.error_code() == StatusCode::CANCELLED); + + if (!AssertStatusCode(s, StatusCode::CANCELLED)) { + return false; + } + gpr_log(GPR_DEBUG, "Canceling streaming done."); + return true; } -void InteropClient::DoCancelAfterFirstResponse() { +bool InteropClient::DoCancelAfterFirstResponse() { gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ..."); ClientContext context; @@ -560,17 +712,27 @@ void InteropClient::DoCancelAfterFirstResponse() { response_parameter->set_size(31415); request.mutable_payload()->set_body(grpc::string(27182, '\0')); StreamingOutputCallResponse response; - GPR_ASSERT(stream->Write(request)); - GPR_ASSERT(stream->Read(&response)); + + if (!stream->Write(request)) { + gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Write() failed"); + return TransientFailureOrAbort(); + } + + if (!stream->Read(&response)) { + gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Read failed"); + return TransientFailureOrAbort(); + } GPR_ASSERT(response.payload().body() == grpc::string(31415, '\0')); + gpr_log(GPR_DEBUG, "Trying to cancel..."); context.TryCancel(); Status s = stream->Finish(); gpr_log(GPR_DEBUG, "Canceling pingpong streaming done."); + return true; } -void InteropClient::DoTimeoutOnSleepingServer() { +bool InteropClient::DoTimeoutOnSleepingServer() { gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc with a short deadline..."); @@ -587,11 +749,15 @@ void InteropClient::DoTimeoutOnSleepingServer() { stream->Write(request); Status s = stream->Finish(); - GPR_ASSERT(s.error_code() == StatusCode::DEADLINE_EXCEEDED); + if (!AssertStatusCode(s, StatusCode::DEADLINE_EXCEEDED)) { + return false; + } + gpr_log(GPR_DEBUG, "Pingpong streaming timeout done."); + return true; } -void InteropClient::DoEmptyStream() { +bool InteropClient::DoEmptyStream() { gpr_log(GPR_DEBUG, "Starting empty_stream."); ClientContext context; @@ -601,12 +767,17 @@ void InteropClient::DoEmptyStream() { stream->WritesDone(); StreamingOutputCallResponse response; GPR_ASSERT(stream->Read(&response) == false); + Status s = stream->Finish(); - AssertOkOrPrintErrorStatus(s); + if (!AssertStatusOk(s)) { + return false; + } + gpr_log(GPR_DEBUG, "empty_stream done."); + return true; } -void InteropClient::DoStatusWithMessage() { +bool InteropClient::DoStatusWithMessage() { gpr_log(GPR_DEBUG, "Sending RPC with a request for status code 2 and message"); @@ -620,12 +791,16 @@ void InteropClient::DoStatusWithMessage() { Status s = serviceStub_.Get()->UnaryCall(&context, request, &response); - GPR_ASSERT(s.error_code() == grpc::StatusCode::UNKNOWN); + if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN)) { + return false; + } + GPR_ASSERT(s.error_message() == test_msg); gpr_log(GPR_DEBUG, "Done testing Status and Message"); + return true; } -void InteropClient::DoCustomMetadata() { +bool InteropClient::DoCustomMetadata() { const grpc::string kEchoInitialMetadataKey("x-grpc-test-echo-initial"); const grpc::string kInitialMetadataValue("test_initial_metadata_value"); const grpc::string kEchoTrailingBinMetadataKey( @@ -645,7 +820,10 @@ void InteropClient::DoCustomMetadata() { request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); Status s = serviceStub_.Get()->UnaryCall(&context, request, &response); - AssertOkOrPrintErrorStatus(s); + if (!AssertStatusOk(s)) { + return false; + } + const auto& server_initial_metadata = context.GetServerInitialMetadata(); auto iter = server_initial_metadata.find(kEchoInitialMetadataKey); GPR_ASSERT(iter != server_initial_metadata.end()); @@ -675,14 +853,29 @@ void InteropClient::DoCustomMetadata() { grpc::string payload(kLargeRequestSize, '\0'); request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); StreamingOutputCallResponse response; - GPR_ASSERT(stream->Write(request)); + + if (!stream->Write(request)) { + gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Write() failed"); + return TransientFailureOrAbort(); + } + stream->WritesDone(); - GPR_ASSERT(stream->Read(&response)); + + if (!stream->Read(&response)) { + gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Read() failed"); + return TransientFailureOrAbort(); + } + GPR_ASSERT(response.payload().body() == grpc::string(kLargeResponseSize, '\0')); + GPR_ASSERT(!stream->Read(&response)); + Status s = stream->Finish(); - AssertOkOrPrintErrorStatus(s); + if (!AssertStatusOk(s)) { + return false; + } + const auto& server_initial_metadata = context.GetServerInitialMetadata(); auto iter = server_initial_metadata.find(kEchoInitialMetadataKey); GPR_ASSERT(iter != server_initial_metadata.end()); @@ -695,6 +888,8 @@ void InteropClient::DoCustomMetadata() { gpr_log(GPR_DEBUG, "Done testing stream with custom metadata"); } + + return true; } } // namespace testing diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h index a3794fd93f..ae75762bb8 100644 --- a/test/cpp/interop/interop_client.h +++ b/test/cpp/interop/interop_client.h @@ -51,41 +51,42 @@ using CheckerFn = class InteropClient { public: - explicit InteropClient(std::shared_ptr<Channel> channel); - explicit InteropClient( - std::shared_ptr<Channel> channel, - bool new_stub_every_test_case); // If new_stub_every_test_case is true, - // a new TestService::Stub object is - // created for every test case below + /// If new_stub_every_test_case is true, a new TestService::Stub object is + /// created for every test case + /// If do_not_abort_on_transient_failures is true, abort() is not called in + /// case of transient failures (like connection failures) + explicit InteropClient(std::shared_ptr<Channel> channel, + bool new_stub_every_test_case, + bool do_not_abort_on_transient_failures); ~InteropClient() {} void Reset(std::shared_ptr<Channel> channel); - void DoEmpty(); - void DoLargeUnary(); - void DoLargeCompressedUnary(); - void DoPingPong(); - void DoHalfDuplex(); - void DoRequestStreaming(); - void DoResponseStreaming(); - void DoResponseCompressedStreaming(); - void DoResponseStreamingWithSlowConsumer(); - void DoCancelAfterBegin(); - void DoCancelAfterFirstResponse(); - void DoTimeoutOnSleepingServer(); - void DoEmptyStream(); - void DoStatusWithMessage(); - void DoCustomMetadata(); + bool DoEmpty(); + bool DoLargeUnary(); + bool DoLargeCompressedUnary(); + bool DoPingPong(); + bool DoHalfDuplex(); + bool DoRequestStreaming(); + bool DoResponseStreaming(); + bool DoResponseCompressedStreaming(); + bool DoResponseStreamingWithSlowConsumer(); + bool DoCancelAfterBegin(); + bool DoCancelAfterFirstResponse(); + bool DoTimeoutOnSleepingServer(); + bool DoEmptyStream(); + bool DoStatusWithMessage(); + bool DoCustomMetadata(); // Auth tests. // username is a string containing the user email - void DoJwtTokenCreds(const grpc::string& username); - void DoComputeEngineCreds(const grpc::string& default_service_account, + bool DoJwtTokenCreds(const grpc::string& username); + bool DoComputeEngineCreds(const grpc::string& default_service_account, const grpc::string& oauth_scope); // username the GCE default service account email - void DoOauth2AuthToken(const grpc::string& username, + bool DoOauth2AuthToken(const grpc::string& username, const grpc::string& oauth_scope); // username is a string containing the user email - void DoPerRpcCreds(const grpc::string& json_key); + bool DoPerRpcCreds(const grpc::string& json_key); private: class ServiceStub { @@ -105,13 +106,18 @@ class InteropClient { // Get() call }; - void PerformLargeUnary(SimpleRequest* request, SimpleResponse* response); + bool PerformLargeUnary(SimpleRequest* request, SimpleResponse* response); /// Run \a custom_check_fn as an additional check. - void PerformLargeUnary(SimpleRequest* request, SimpleResponse* response, + bool PerformLargeUnary(SimpleRequest* request, SimpleResponse* response, CheckerFn custom_checks_fn); - void AssertOkOrPrintErrorStatus(const Status& s); + bool AssertStatusOk(const Status& s); + bool AssertStatusCode(const Status& s, StatusCode expected_code); + bool TransientFailureOrAbort(); ServiceStub serviceStub_; + + /// If true, abort() is not called for transient failures + bool do_not_abort_on_transient_failures_; }; } // namespace testing diff --git a/test/cpp/interop/stress_interop_client.cc b/test/cpp/interop/stress_interop_client.cc index f287a5aa3b..aa95682e74 100644 --- a/test/cpp/interop/stress_interop_client.cc +++ b/test/cpp/interop/stress_interop_client.cc @@ -84,11 +84,12 @@ StressTestInteropClient::StressTestInteropClient( int test_id, const grpc::string& server_address, std::shared_ptr<Channel> channel, const WeightedRandomTestSelector& test_selector, long test_duration_secs, - long sleep_duration_ms) + long sleep_duration_ms, bool do_not_abort_on_transient_failures) : test_id_(test_id), server_address_(server_address), channel_(channel), - interop_client_(new InteropClient(channel, false)), + interop_client_(new InteropClient(channel, false, + do_not_abort_on_transient_failures)), test_selector_(test_selector), test_duration_secs_(test_duration_secs), sleep_duration_ms_(sleep_duration_ms) {} @@ -126,31 +127,67 @@ void StressTestInteropClient::MainLoop(std::shared_ptr<QpsGauge> qps_gauge) { } } -// TODO(sree): Add all interop tests -void StressTestInteropClient::RunTest(TestCaseType test_case) { +bool StressTestInteropClient::RunTest(TestCaseType test_case) { + bool is_success = false; switch (test_case) { case EMPTY_UNARY: { - interop_client_->DoEmpty(); + is_success = interop_client_->DoEmpty(); break; } case LARGE_UNARY: { - interop_client_->DoLargeUnary(); + is_success = interop_client_->DoLargeUnary(); break; } case LARGE_COMPRESSED_UNARY: { - interop_client_->DoLargeCompressedUnary(); + is_success = interop_client_->DoLargeCompressedUnary(); break; } case CLIENT_STREAMING: { - interop_client_->DoRequestStreaming(); + is_success = interop_client_->DoRequestStreaming(); break; } case SERVER_STREAMING: { - interop_client_->DoResponseStreaming(); + is_success = interop_client_->DoResponseStreaming(); + break; + } + case SERVER_COMPRESSED_STREAMING: { + is_success = interop_client_->DoResponseCompressedStreaming(); + break; + } + case SLOW_CONSUMER: { + is_success = interop_client_->DoResponseStreamingWithSlowConsumer(); + break; + } + case HALF_DUPLEX: { + is_success = interop_client_->DoHalfDuplex(); + break; + } + case PING_PONG: { + is_success = interop_client_->DoPingPong(); + break; + } + case CANCEL_AFTER_BEGIN: { + is_success = interop_client_->DoCancelAfterBegin(); + break; + } + case CANCEL_AFTER_FIRST_RESPONSE: { + is_success = interop_client_->DoCancelAfterFirstResponse(); + break; + } + case TIMEOUT_ON_SLEEPING_SERVER: { + is_success = interop_client_->DoTimeoutOnSleepingServer(); break; } case EMPTY_STREAM: { - interop_client_->DoEmptyStream(); + is_success = interop_client_->DoEmptyStream(); + break; + } + case STATUS_CODE_AND_MESSAGE: { + is_success = interop_client_->DoStatusWithMessage(); + break; + } + case CUSTOM_METADATA: { + is_success = interop_client_->DoCustomMetadata(); break; } default: { @@ -159,6 +196,8 @@ void StressTestInteropClient::RunTest(TestCaseType test_case) { break; } } + + return is_success; } } // namespace testing diff --git a/test/cpp/interop/stress_interop_client.h b/test/cpp/interop/stress_interop_client.h index cb0cd98821..aa93b58b4a 100644 --- a/test/cpp/interop/stress_interop_client.h +++ b/test/cpp/interop/stress_interop_client.h @@ -49,7 +49,6 @@ namespace testing { using std::pair; using std::vector; -// TODO(sreek): Add more test cases here in future enum TestCaseType { UNKNOWN_TEST = -1, EMPTY_UNARY = 0, @@ -57,7 +56,16 @@ enum TestCaseType { LARGE_COMPRESSED_UNARY = 2, CLIENT_STREAMING = 3, SERVER_STREAMING = 4, - EMPTY_STREAM = 5 + SERVER_COMPRESSED_STREAMING = 5, + SLOW_CONSUMER = 6, + HALF_DUPLEX = 7, + PING_PONG = 8, + CANCEL_AFTER_BEGIN = 9, + CANCEL_AFTER_FIRST_RESPONSE = 10, + TIMEOUT_ON_SLEEPING_SERVER = 11, + EMPTY_STREAM = 12, + STATUS_CODE_AND_MESSAGE = 13, + CUSTOM_METADATA = 14 }; const vector<pair<TestCaseType, grpc::string>> kTestCaseList = { @@ -66,7 +74,16 @@ const vector<pair<TestCaseType, grpc::string>> kTestCaseList = { {LARGE_COMPRESSED_UNARY, "large_compressed_unary"}, {CLIENT_STREAMING, "client_streaming"}, {SERVER_STREAMING, "server_streaming"}, - {EMPTY_STREAM, "empty_stream"}}; + {SERVER_COMPRESSED_STREAMING, "server_compressed_streaming"}, + {SLOW_CONSUMER, "slow_consumer"}, + {HALF_DUPLEX, "half_duplex"}, + {PING_PONG, "ping_pong"}, + {CANCEL_AFTER_BEGIN, "cancel_after_begin"}, + {CANCEL_AFTER_FIRST_RESPONSE, "cancel_after_first_response"}, + {TIMEOUT_ON_SLEEPING_SERVER, "timeout_on_sleeping_server"}, + {EMPTY_STREAM, "empty_stream"}, + {STATUS_CODE_AND_MESSAGE, "status_code_and_message"}, + {CUSTOM_METADATA, "custom_metadata"}}; class WeightedRandomTestSelector { public: @@ -87,14 +104,15 @@ class StressTestInteropClient { StressTestInteropClient(int test_id, const grpc::string& server_address, std::shared_ptr<Channel> channel, const WeightedRandomTestSelector& test_selector, - long test_duration_secs, long sleep_duration_ms); + long test_duration_secs, long sleep_duration_ms, + bool do_not_abort_on_transient_failures); // The main function. Use this as the thread entry point. // qps_gauge is the QpsGauge to record the requests per second metric void MainLoop(std::shared_ptr<QpsGauge> qps_gauge); private: - void RunTest(TestCaseType test_case); + bool RunTest(TestCaseType test_case); int test_id_; const grpc::string& server_address_; diff --git a/test/cpp/interop/stress_test.cc b/test/cpp/interop/stress_test.cc index d9e3fd25c5..7787931900 100644 --- a/test/cpp/interop/stress_test.cc +++ b/test/cpp/interop/stress_test.cc @@ -89,7 +89,16 @@ DEFINE_string(test_cases, "", " large_compressed_unary\n" " client_streaming\n" " server_streaming\n" + " server_compressed_streaming\n" + " slow_consumer\n" + " half_duplex\n" + " ping_pong\n" + " cancel_after_begin\n" + " cancel_after_first_response\n" + " timeout_on_sleeping_server\n" " empty_stream\n" + " status_code_and_message\n" + " custom_metadata\n" " Example: \"empty_unary:20,large_unary:10,empty_stream:70\"\n" " The above will execute 'empty_unary', 20% of the time," " 'large_unary', 10% of the time and 'empty_stream' the remaining" @@ -101,6 +110,10 @@ DEFINE_int32(log_level, GPR_LOG_SEVERITY_INFO, "The choices are: 0 (GPR_LOG_SEVERITY_DEBUG), 1 " "(GPR_LOG_SEVERITY_INFO) and 2 (GPR_LOG_SEVERITY_ERROR)"); +DEFINE_bool(do_not_abort_on_transient_failures, true, + "If set to 'true', abort() is not called in case of transient " + "failures like temporary connection failures."); + using grpc::testing::kTestCaseList; using grpc::testing::MetricsService; using grpc::testing::MetricsServiceImpl; @@ -189,6 +202,12 @@ void LogParameterInfo(const std::vector<grpc::string>& addresses, gpr_log(GPR_INFO, "test_cases : %s", FLAGS_test_cases.c_str()); gpr_log(GPR_INFO, "sleep_duration_ms: %d", FLAGS_sleep_duration_ms); gpr_log(GPR_INFO, "test_duration_secs: %d", FLAGS_test_duration_secs); + gpr_log(GPR_INFO, "num_channels_per_server: %d", + FLAGS_num_channels_per_server); + gpr_log(GPR_INFO, "num_stubs_per_channel: %d", FLAGS_num_stubs_per_channel); + gpr_log(GPR_INFO, "log_level: %d", FLAGS_log_level); + gpr_log(GPR_INFO, "do_not_abort_on_transient_failures: %s", + FLAGS_do_not_abort_on_transient_failures ? "true" : "false"); int num = 0; for (auto it = addresses.begin(); it != addresses.end(); it++) { @@ -272,7 +291,7 @@ int main(int argc, char** argv) { stub_idx++) { StressTestInteropClient* client = new StressTestInteropClient( ++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs, - FLAGS_sleep_duration_ms); + FLAGS_sleep_duration_ms, FLAGS_do_not_abort_on_transient_failures); bool is_already_created = false; // QpsGauge name diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index e72cef2811..c32160a7d4 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -84,7 +84,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { std::function< std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*)> start_req, + CompletionQueue*)> + start_req, std::function<void(grpc::Status, ResponseType*)> on_done) : context_(), stub_(stub), @@ -165,7 +166,8 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { AsyncClient(const ClientConfig& config, std::function<ClientRpcContext*( StubType*, std::function<gpr_timespec()> next_issue, - const RequestType&)> setup_ctx, + const RequestType&)> + setup_ctx, std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub) : ClientImpl<StubType, RequestType>(config, create_stub), @@ -278,7 +280,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { std::function<std::unique_ptr< grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, - void*)> start_req, + void*)> + start_req, std::function<void(grpc::Status, ResponseType*)> on_done) : context_(), stub_(stub), @@ -405,7 +408,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { std::function<gpr_timespec()> next_issue, std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( grpc::GenericStub*, grpc::ClientContext*, - const grpc::string& method_name, CompletionQueue*, void*)> start_req, + const grpc::string& method_name, CompletionQueue*, void*)> + start_req, std::function<void(grpc::Status, ByteBuffer*)> on_done) : context_(), stub_(stub), diff --git a/test/cpp/qps/gen_build_yaml.py b/test/cpp/qps/gen_build_yaml.py index 9d6bf2ab73..6b3329b046 100755 --- a/test/cpp/qps/gen_build_yaml.py +++ b/test/cpp/qps/gen_build_yaml.py @@ -43,12 +43,16 @@ sys.path.append(run_tests_root) import performance.scenario_config as scenario_config +def _scenario_json_string(scenario_json): + return json.dumps(scenario_config.remove_nonproto_fields(scenario_json)) + print yaml.dump({ 'tests': [ { 'name': 'json_run_localhost', - 'shortname': 'json_run_localhost:%s' % js['name'], - 'args': ['--scenario_json', pipes.quote(json.dumps(js))], + 'shortname': 'json_run_localhost:%s' % scenario_json['name'], + 'args': ['--scenario_json', + pipes.quote(_scenario_json_string(scenario_json))], 'ci_platforms': ['linux', 'mac', 'posix', 'windows'], 'platforms': ['linux', 'mac', 'posix', 'windows'], 'flaky': False, @@ -58,6 +62,6 @@ print yaml.dump({ 'cpu_cost': 1000.0, 'exclude_configs': [] } - for js in scenario_config.CXXLanguage().scenarios() + for scenario_json in scenario_config.CXXLanguage().scenarios() ] }) diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index a68f1ae7b6..1234542687 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -73,7 +73,8 @@ class AsyncQpsServerTest : public Server { CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_function, std::function<grpc::Status(const PayloadConfig &, const RequestType *, - ResponseType *)> process_rpc) + ResponseType *)> + process_rpc) : Server(config) { char *server_address = NULL; @@ -190,7 +191,8 @@ class AsyncQpsServerTest : public Server { ServerRpcContextUnaryImpl( std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncResponseWriter<ResponseType> *, - void *)> request_method, + void *)> + request_method, std::function<grpc::Status(const RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), |