diff options
Diffstat (limited to 'test/cpp')
21 files changed, 612 insertions, 410 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 8229bda6bf..6c7eae53a4 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -31,6 +31,7 @@ * */ +#include <cinttypes> #include <memory> #include <thread> @@ -207,12 +208,11 @@ class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption { public: void UpdateArguments(ChannelArguments* arg) GRPC_OVERRIDE {} - void UpdatePlugins( - std::map<grpc::string, std::unique_ptr<ServerBuilderPlugin>>* plugins) + void UpdatePlugins(std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins) GRPC_OVERRIDE { auto plugin = plugins->begin(); while (plugin != plugins->end()) { - if ((*plugin).second->has_sync_methods()) { + if ((*plugin)->has_sync_methods()) { plugins->erase(plugin++); } else { plugin++; @@ -235,8 +235,11 @@ class TestScenario { disable_blocking, credentials_type.c_str(), message_content.size()); } bool disable_blocking; - const grpc::string credentials_type; - const grpc::string message_content; + // Although the below grpc::string's are logically const, we can't declare + // them const because of a limitation in the way old compilers (e.g., gcc-4.4) + // manage vector insertion using a copy constructor + grpc::string credentials_type; + grpc::string message_content; }; class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> { @@ -940,7 +943,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Client sends 3 messages (tags 3, 4 and 5) for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { - send_request.set_message("Ping " + std::to_string(tag_idx)); + send_request.set_message("Ping " + grpc::to_string(tag_idx)); cli_stream->Write(send_request, tag(tag_idx)); Verifier(GetParam().disable_blocking) .Expect(tag_idx, true) @@ -1106,7 +1109,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Server sends three messages (tags 3, 4 and 5) // But if want_done tag is true, we might also see tag 11 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { - send_response.set_message("Pong " + std::to_string(tag_idx)); + send_response.set_message("Pong " + grpc::to_string(tag_idx)); srv_stream.Write(send_response, tag(tag_idx)); // Note that we'll add something to the verifier and verify that // something was seen, but it might be tag 11 and not what we @@ -1397,9 +1400,9 @@ std::vector<TestScenario> CreateTestScenarios(bool test_disable_blocking, for (auto cred = credentials_types.begin(); cred != credentials_types.end(); ++cred) { for (auto msg = messages.begin(); msg != messages.end(); msg++) { - scenarios.push_back(TestScenario(false, *cred, *msg)); + scenarios.emplace_back(false, *cred, *msg); if (test_disable_blocking) { - scenarios.push_back(TestScenario(true, *cred, *msg)); + scenarios.emplace_back(true, *cred, *msg); } } } diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index e3667cf26b..8de9d339f6 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -199,7 +199,10 @@ class TestScenario { credentials_type.c_str()); } bool use_proxy; - const grpc::string credentials_type; + // Although the below grpc::string is logically const, we can't declare + // them const because of a limitation in the way old compilers (e.g., gcc-4.4) + // manage vector insertion using a copy constructor + grpc::string credentials_type; }; class End2endTest : public ::testing::TestWithParam<TestScenario> { @@ -329,7 +332,7 @@ class End2endServerTryCancelTest : public End2endTest { // Send server_try_cancel value in the client metadata context.AddMetadata(kServerTryCancelRequest, - std::to_string(server_try_cancel)); + grpc::to_string(server_try_cancel)); auto stream = stub_->RequestStream(&context, &response); @@ -402,7 +405,7 @@ class End2endServerTryCancelTest : public End2endTest { // Send server_try_cancel in the client metadata context.AddMetadata(kServerTryCancelRequest, - std::to_string(server_try_cancel)); + grpc::to_string(server_try_cancel)); request.set_message("hello"); auto stream = stub_->ResponseStream(&context, request); @@ -413,7 +416,7 @@ class End2endServerTryCancelTest : public End2endTest { break; } EXPECT_EQ(response.message(), - request.message() + std::to_string(num_msgs_read)); + request.message() + grpc::to_string(num_msgs_read)); num_msgs_read++; } gpr_log(GPR_INFO, "Read %d messages", num_msgs_read); @@ -479,14 +482,14 @@ class End2endServerTryCancelTest : public End2endTest { // Send server_try_cancel in the client metadata context.AddMetadata(kServerTryCancelRequest, - std::to_string(server_try_cancel)); + grpc::to_string(server_try_cancel)); auto stream = stub_->BidiStream(&context); int num_msgs_read = 0; int num_msgs_sent = 0; while (num_msgs_sent < num_messages) { - request.set_message("hello " + std::to_string(num_msgs_sent)); + request.set_message("hello " + grpc::to_string(num_msgs_sent)); if (!stream->Write(request)) { break; } @@ -548,7 +551,7 @@ TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) { ClientContext context; context.AddMetadata(kServerTryCancelRequest, - std::to_string(CANCEL_BEFORE_PROCESSING)); + grpc::to_string(CANCEL_BEFORE_PROCESSING)); Status s = stub_->Echo(&context, request, &response); EXPECT_FALSE(s.ok()); EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); @@ -1431,9 +1434,9 @@ std::vector<TestScenario> CreateTestScenarios(bool use_proxy, } for (auto it = credentials_types.begin(); it != credentials_types.end(); ++it) { - scenarios.push_back(TestScenario(false, *it)); + scenarios.emplace_back(false, *it); if (use_proxy) { - scenarios.push_back(TestScenario(true, *it)); + scenarios.emplace_back(true, *it); } } return scenarios; diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc index 2c05db345b..7e0c0e8a7c 100644 --- a/test/cpp/end2end/hybrid_end2end_test.cc +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -347,47 +347,50 @@ class HybridEnd2endTest : public ::testing::Test { } grpc::testing::UnimplementedService::Service unimplemented_service_; - std::vector<std::unique_ptr<ServerCompletionQueue> > cqs_; + std::vector<std::unique_ptr<ServerCompletionQueue>> cqs_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; std::unique_ptr<Server> server_; std::ostringstream server_address_; }; TEST_F(HybridEnd2endTest, AsyncEcho) { - EchoTestService::WithAsyncMethod_Echo<TestServiceImpl> service; + typedef EchoTestService::WithAsyncMethod_Echo<TestServiceImpl> SType; + SType service; SetUpServer(&service, nullptr, nullptr); ResetStub(); - std::thread echo_handler_thread( - [this, &service] { HandleEcho(&service, cqs_[0].get(), false); }); + std::thread echo_handler_thread(HandleEcho<SType>, &service, cqs_[0].get(), + false); TestAllMethods(); echo_handler_thread.join(); } TEST_F(HybridEnd2endTest, AsyncEchoRequestStream) { - EchoTestService::WithAsyncMethod_RequestStream< - EchoTestService::WithAsyncMethod_Echo<TestServiceImpl> > - service; + typedef EchoTestService::WithAsyncMethod_RequestStream< + EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>> + SType; + SType service; SetUpServer(&service, nullptr, nullptr); ResetStub(); - std::thread echo_handler_thread( - [this, &service] { HandleEcho(&service, cqs_[0].get(), false); }); - std::thread request_stream_handler_thread( - [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); }); + std::thread echo_handler_thread(HandleEcho<SType>, &service, cqs_[0].get(), + false); + std::thread request_stream_handler_thread(HandleClientStreaming<SType>, + &service, cqs_[1].get()); TestAllMethods(); echo_handler_thread.join(); request_stream_handler_thread.join(); } TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream) { - EchoTestService::WithAsyncMethod_RequestStream< - EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl> > - service; + typedef EchoTestService::WithAsyncMethod_RequestStream< + EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>> + SType; + SType service; SetUpServer(&service, nullptr, nullptr); ResetStub(); - std::thread response_stream_handler_thread( - [this, &service] { HandleServerStreaming(&service, cqs_[0].get()); }); - std::thread request_stream_handler_thread( - [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); }); + std::thread response_stream_handler_thread(HandleServerStreaming<SType>, + &service, cqs_[0].get()); + std::thread request_stream_handler_thread(HandleClientStreaming<SType>, + &service, cqs_[1].get()); TestAllMethods(); response_stream_handler_thread.join(); request_stream_handler_thread.join(); @@ -395,16 +398,17 @@ TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream) { // Add a second service with one sync method. TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncDupService) { - EchoTestService::WithAsyncMethod_RequestStream< - EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl> > - service; + typedef EchoTestService::WithAsyncMethod_RequestStream< + EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>> + SType; + SType service; TestServiceImplDupPkg dup_service; SetUpServer(&service, &dup_service, nullptr); ResetStub(); - std::thread response_stream_handler_thread( - [this, &service] { HandleServerStreaming(&service, cqs_[0].get()); }); - std::thread request_stream_handler_thread( - [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); }); + std::thread response_stream_handler_thread(HandleServerStreaming<SType>, + &service, cqs_[0].get()); + std::thread request_stream_handler_thread(HandleClientStreaming<SType>, + &service, cqs_[1].get()); TestAllMethods(); SendEchoToDupService(); response_stream_handler_thread.join(); @@ -413,18 +417,20 @@ TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncDupService) { // Add a second service with one async method. TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_AsyncDupService) { - EchoTestService::WithAsyncMethod_RequestStream< - EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl> > - service; + typedef EchoTestService::WithAsyncMethod_RequestStream< + EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>> + SType; + SType service; duplicate::EchoTestService::AsyncService dup_service; SetUpServer(&service, &dup_service, nullptr); ResetStub(); - std::thread response_stream_handler_thread( - [this, &service] { HandleServerStreaming(&service, cqs_[0].get()); }); - std::thread request_stream_handler_thread( - [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); }); + std::thread response_stream_handler_thread(HandleServerStreaming<SType>, + &service, cqs_[0].get()); + std::thread request_stream_handler_thread(HandleClientStreaming<SType>, + &service, cqs_[1].get()); std::thread echo_handler_thread( - [this, &dup_service] { HandleEcho(&dup_service, cqs_[2].get(), true); }); + HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service, + cqs_[2].get(), true); TestAllMethods(); SendEchoToDupService(); response_stream_handler_thread.join(); @@ -437,25 +443,24 @@ TEST_F(HybridEnd2endTest, GenericEcho) { AsyncGenericService generic_service; SetUpServer(&service, nullptr, &generic_service); ResetStub(); - std::thread generic_handler_thread([this, &generic_service] { - HandleGenericCall(&generic_service, cqs_[0].get()); - }); + std::thread generic_handler_thread(HandleGenericCall, &generic_service, + cqs_[0].get()); TestAllMethods(); generic_handler_thread.join(); } TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream) { - EchoTestService::WithAsyncMethod_RequestStream< - EchoTestService::WithGenericMethod_Echo<TestServiceImpl> > - service; + typedef EchoTestService::WithAsyncMethod_RequestStream< + EchoTestService::WithGenericMethod_Echo<TestServiceImpl>> + SType; + SType service; AsyncGenericService generic_service; SetUpServer(&service, nullptr, &generic_service); ResetStub(); - std::thread generic_handler_thread([this, &generic_service] { - HandleGenericCall(&generic_service, cqs_[0].get()); - }); - std::thread request_stream_handler_thread( - [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); }); + std::thread generic_handler_thread(HandleGenericCall, &generic_service, + cqs_[0].get()); + std::thread request_stream_handler_thread(HandleClientStreaming<SType>, + &service, cqs_[1].get()); TestAllMethods(); generic_handler_thread.join(); request_stream_handler_thread.join(); @@ -463,18 +468,18 @@ TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream) { // Add a second service with one sync method. TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_SyncDupService) { - EchoTestService::WithAsyncMethod_RequestStream< - EchoTestService::WithGenericMethod_Echo<TestServiceImpl> > - service; + typedef EchoTestService::WithAsyncMethod_RequestStream< + EchoTestService::WithGenericMethod_Echo<TestServiceImpl>> + SType; + SType service; AsyncGenericService generic_service; TestServiceImplDupPkg dup_service; SetUpServer(&service, &dup_service, &generic_service); ResetStub(); - std::thread generic_handler_thread([this, &generic_service] { - HandleGenericCall(&generic_service, cqs_[0].get()); - }); - std::thread request_stream_handler_thread( - [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); }); + std::thread generic_handler_thread(HandleGenericCall, &generic_service, + cqs_[0].get()); + std::thread request_stream_handler_thread(HandleClientStreaming<SType>, + &service, cqs_[1].get()); TestAllMethods(); SendEchoToDupService(); generic_handler_thread.join(); @@ -483,20 +488,21 @@ TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_SyncDupService) { // Add a second service with one async method. TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_AsyncDupService) { - EchoTestService::WithAsyncMethod_RequestStream< - EchoTestService::WithGenericMethod_Echo<TestServiceImpl> > - service; + typedef EchoTestService::WithAsyncMethod_RequestStream< + EchoTestService::WithGenericMethod_Echo<TestServiceImpl>> + SType; + SType service; AsyncGenericService generic_service; duplicate::EchoTestService::AsyncService dup_service; SetUpServer(&service, &dup_service, &generic_service); ResetStub(); - std::thread generic_handler_thread([this, &generic_service] { - HandleGenericCall(&generic_service, cqs_[0].get()); - }); - std::thread request_stream_handler_thread( - [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); }); + std::thread generic_handler_thread(HandleGenericCall, &generic_service, + cqs_[0].get()); + std::thread request_stream_handler_thread(HandleClientStreaming<SType>, + &service, cqs_[1].get()); std::thread echo_handler_thread( - [this, &dup_service] { HandleEcho(&dup_service, cqs_[2].get(), true); }); + HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service, + cqs_[2].get(), true); TestAllMethods(); SendEchoToDupService(); generic_handler_thread.join(); @@ -505,20 +511,20 @@ TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_AsyncDupService) { } TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamResponseStream) { - EchoTestService::WithAsyncMethod_RequestStream< + typedef EchoTestService::WithAsyncMethod_RequestStream< EchoTestService::WithGenericMethod_Echo< - EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl> > > - service; + EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>> + SType; + SType service; AsyncGenericService generic_service; SetUpServer(&service, nullptr, &generic_service); ResetStub(); - std::thread generic_handler_thread([this, &generic_service] { - HandleGenericCall(&generic_service, cqs_[0].get()); - }); - std::thread request_stream_handler_thread( - [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); }); - std::thread response_stream_handler_thread( - [this, &service] { HandleServerStreaming(&service, cqs_[2].get()); }); + std::thread generic_handler_thread(HandleGenericCall, &generic_service, + cqs_[0].get()); + std::thread request_stream_handler_thread(HandleClientStreaming<SType>, + &service, cqs_[1].get()); + std::thread response_stream_handler_thread(HandleServerStreaming<SType>, + &service, cqs_[2].get()); TestAllMethods(); generic_handler_thread.join(); request_stream_handler_thread.join(); @@ -526,21 +532,20 @@ TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamResponseStream) { } TEST_F(HybridEnd2endTest, GenericEchoRequestStreamAsyncResponseStream) { - EchoTestService::WithGenericMethod_RequestStream< + typedef EchoTestService::WithGenericMethod_RequestStream< EchoTestService::WithGenericMethod_Echo< - EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl> > > - service; + EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>> + SType; + SType service; AsyncGenericService generic_service; SetUpServer(&service, nullptr, &generic_service); ResetStub(); - std::thread generic_handler_thread([this, &generic_service] { - HandleGenericCall(&generic_service, cqs_[0].get()); - }); - std::thread generic_handler_thread2([this, &generic_service] { - HandleGenericCall(&generic_service, cqs_[1].get()); - }); - std::thread response_stream_handler_thread( - [this, &service] { HandleServerStreaming(&service, cqs_[2].get()); }); + std::thread generic_handler_thread(HandleGenericCall, &generic_service, + cqs_[0].get()); + std::thread generic_handler_thread2(HandleGenericCall, &generic_service, + cqs_[1].get()); + std::thread response_stream_handler_thread(HandleServerStreaming<SType>, + &service, cqs_[2].get()); TestAllMethods(); generic_handler_thread.join(); generic_handler_thread2.join(); @@ -552,7 +557,7 @@ TEST_F(HybridEnd2endTest, GenericEchoRequestStreamAsyncResponseStream) { TEST_F(HybridEnd2endTest, GenericMethodWithoutGenericService) { EchoTestService::WithGenericMethod_RequestStream< EchoTestService::WithGenericMethod_Echo< - EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl> > > + EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>> service; SetUpServer(&service, nullptr, nullptr); EXPECT_EQ(nullptr, server_.get()); diff --git a/test/cpp/end2end/server_builder_plugin_test.cc b/test/cpp/end2end/server_builder_plugin_test.cc index 1c1095087a..75f23b64a7 100644 --- a/test/cpp/end2end/server_builder_plugin_test.cc +++ b/test/cpp/end2end/server_builder_plugin_test.cc @@ -113,15 +113,14 @@ class InsertPluginServerBuilderOption : public ServerBuilderOption { void UpdateArguments(ChannelArguments* arg) GRPC_OVERRIDE {} - void UpdatePlugins( - std::map<grpc::string, std::unique_ptr<ServerBuilderPlugin>>* plugins) + void UpdatePlugins(std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins) GRPC_OVERRIDE { plugins->clear(); std::unique_ptr<TestServerBuilderPlugin> plugin( new TestServerBuilderPlugin()); if (register_service_) plugin->SetRegisterService(); - (*plugins)[plugin->name()] = std::move(plugin); + plugins->emplace_back(std::move(plugin)); } void SetRegisterService() { register_service_ = true; } @@ -162,7 +161,7 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> { void InsertPlugin() { if (GetParam()) { // Add ServerBuilder plugin in static initialization - EXPECT_TRUE(builder_->plugins_[PLUGIN_NAME] != nullptr); + CheckPresent(); } else { // Add ServerBuilder plugin using ServerBuilder::SetOption() builder_->SetOption(std::unique_ptr<ServerBuilderOption>( @@ -173,10 +172,8 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> { void InsertPluginWithTestService() { if (GetParam()) { // Add ServerBuilder plugin in static initialization - EXPECT_TRUE(builder_->plugins_[PLUGIN_NAME] != nullptr); - auto plugin = static_cast<TestServerBuilderPlugin*>( - builder_->plugins_[PLUGIN_NAME].get()); - EXPECT_TRUE(plugin != nullptr); + auto plugin = CheckPresent(); + EXPECT_TRUE(plugin); plugin->SetRegisterService(); } else { // Add ServerBuilder plugin using ServerBuilder::SetOption() @@ -192,7 +189,7 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> { builder_->AddListeningPort(server_address, InsecureServerCredentials()); cq_ = builder_->AddCompletionQueue(); server_ = builder_->BuildAndStart(); - EXPECT_TRUE(builder_->plugins_[PLUGIN_NAME] != nullptr); + EXPECT_TRUE(CheckPresent()); } void ResetStub() { @@ -202,10 +199,8 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> { } void TearDown() GRPC_OVERRIDE { - EXPECT_TRUE(builder_->plugins_[PLUGIN_NAME] != nullptr); - auto plugin = static_cast<TestServerBuilderPlugin*>( - builder_->plugins_[PLUGIN_NAME].get()); - EXPECT_TRUE(plugin != nullptr); + auto plugin = CheckPresent(); + EXPECT_TRUE(plugin); EXPECT_TRUE(plugin->init_server_is_called()); EXPECT_TRUE(plugin->finish_is_called()); server_->Shutdown(); @@ -230,6 +225,19 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> { std::unique_ptr<Server> server_; TestServiceImpl service_; int port_; + + private: + TestServerBuilderPlugin* CheckPresent() { + auto it = builder_->plugins_.begin(); + for (; it != builder_->plugins_.end(); it++) { + if ((*it)->name() == PLUGIN_NAME) break; + } + if (it != builder_->plugins_.end()) { + return static_cast<TestServerBuilderPlugin*>(it->get()); + } else { + return nullptr; + } + } }; TEST_P(ServerBuilderPluginTest, PluginWithoutServiceTest) { diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index cbaee92228..52abd80d69 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -33,6 +33,7 @@ #include "test/cpp/end2end/test_service_impl.h" +#include <string> #include <thread> #include <grpc++/security/credentials.h> @@ -253,7 +254,7 @@ Status TestServiceImpl::ResponseStream(ServerContext* context, } for (int i = 0; i < kNumResponseStreamsMsgs; i++) { - response.set_message(request->message() + std::to_string(i)); + response.set_message(request->message() + grpc::to_string(i)); writer->Write(response); } diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc index 94541f9a45..b021b34523 100644 --- a/test/cpp/end2end/thread_stress_test.cc +++ b/test/cpp/end2end/thread_stress_test.cc @@ -230,7 +230,7 @@ class CommonStressTestSyncServer : public CommonStressTest<TestServiceImpl> { }; class CommonStressTestAsyncServer - : public CommonStressTest<::grpc::testing::EchoTestService::AsyncService> { + : public CommonStressTest<grpc::testing::EchoTestService::AsyncService> { public: void SetUp() GRPC_OVERRIDE { shutting_down_ = false; @@ -394,7 +394,7 @@ class AsyncClientEnd2endTest : public ::testing::Test { for (int i = 0; i < num_rpcs; ++i) { AsyncClientCall* call = new AsyncClientCall; EchoRequest request; - request.set_message("Hello: " + std::to_string(i)); + request.set_message("Hello: " + grpc::to_string(i)); call->response_reader = common_.GetStub()->AsyncEcho(&call->context, request, &cq_); call->response_reader->Finish(&call->response, &call->status, diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index addaf174f2..e8ae6ee572 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -40,7 +40,9 @@ #include <grpc++/client_context.h> #include <grpc/grpc.h> #include <grpc/support/log.h> +#include <grpc/support/useful.h> +#include "src/core/lib/support/string.h" #include "test/cpp/interop/client_helper.h" #include "test/cpp/interop/interop_client.h" #include "test/cpp/util/test_config.h" @@ -52,30 +54,31 @@ DEFINE_string(server_host, "127.0.0.1", "Server host to connect to"); DEFINE_string(server_host_override, "foo.test.google.fr", "Override the server host which is sent in HTTP header"); DEFINE_string(test_case, "large_unary", - "Configure different test cases. Valid options are: " - "empty_unary : empty (zero bytes) request and response; " - "large_unary : single request and (large) response; " - "large_compressed_unary : single request and compressed (large) " - "response; " - "client_streaming : request streaming with single response; " - "server_streaming : single request with response streaming; " + "Configure different test cases. Valid options are:\n\n" + "all : all test cases;\n" + "cancel_after_begin : cancel stream after starting it;\n" + "cancel_after_first_response: cancel on first response;\n" + "client_compressed_streaming : compressed request streaming with " + "client_compressed_unary : single compressed request;\n" + "client_streaming : request streaming with single response;\n" + "compute_engine_creds: large_unary with compute engine auth;\n" + "custom_metadata: server will echo custom metadata;\n" + "empty_stream : bi-di stream with no request/response;\n" + "empty_unary : empty (zero bytes) request and response;\n" + "half_duplex : half-duplex streaming;\n" + "jwt_token_creds: large_unary with JWT token auth;\n" + "large_unary : single request and (large) response;\n" + "oauth2_auth_token: raw oauth2 access token auth;\n" + "per_rpc_creds: raw oauth2 access token on a single rpc;\n" + "ping_pong : full-duplex streaming;\n" + "response streaming;\n" "server_compressed_streaming : single request with compressed " - "response streaming; " - "slow_consumer : single request with response; " - " streaming with slow client consumer; " - "half_duplex : half-duplex streaming; " - "ping_pong : full-duplex streaming; " - "cancel_after_begin : cancel stream after starting it; " - "cancel_after_first_response: cancel on first response; " - "timeout_on_sleeping_server: deadline exceeds on stream; " - "empty_stream : bi-di stream with no request/response; " - "compute_engine_creds: large_unary with compute engine auth; " - "jwt_token_creds: large_unary with JWT token auth; " - "oauth2_auth_token: raw oauth2 access token auth; " - "per_rpc_creds: raw oauth2 access token on a single rpc; " - "status_code_and_message: verify status code & message; " - "custom_metadata: server will echo custom metadata;" - "all : all of above."); + "server_compressed_unary : single compressed response;\n" + "server_streaming : single request with response streaming;\n" + "slow_consumer : single request with response streaming with " + "slow client consumer;\n" + "status_code_and_message: verify status code & message;\n" + "timeout_on_sleeping_server: deadline exceeds on stream;\n"); DEFINE_string(default_service_account, "", "Email of GCE default service account"); DEFINE_string(service_account_key_file, "", @@ -104,14 +107,18 @@ int main(int argc, char** argv) { client.DoEmpty(); } else if (FLAGS_test_case == "large_unary") { client.DoLargeUnary(); - } else if (FLAGS_test_case == "large_compressed_unary") { - client.DoLargeCompressedUnary(); + } else if (FLAGS_test_case == "server_compressed_unary") { + client.DoServerCompressedUnary(); + } else if (FLAGS_test_case == "client_compressed_unary") { + client.DoClientCompressedUnary(); } else if (FLAGS_test_case == "client_streaming") { client.DoRequestStreaming(); } else if (FLAGS_test_case == "server_streaming") { client.DoResponseStreaming(); } else if (FLAGS_test_case == "server_compressed_streaming") { - client.DoResponseCompressedStreaming(); + client.DoServerCompressedStreaming(); + } else if (FLAGS_test_case == "client_compressed_streaming") { + client.DoClientCompressedStreaming(); } else if (FLAGS_test_case == "slow_consumer") { client.DoResponseStreamingWithSlowConsumer(); } else if (FLAGS_test_case == "half_duplex") { @@ -144,9 +151,12 @@ int main(int argc, char** argv) { } else if (FLAGS_test_case == "all") { client.DoEmpty(); client.DoLargeUnary(); + client.DoClientCompressedUnary(); + client.DoServerCompressedUnary(); client.DoRequestStreaming(); client.DoResponseStreaming(); - client.DoResponseCompressedStreaming(); + client.DoClientCompressedStreaming(); + client.DoServerCompressedStreaming(); client.DoHalfDuplex(); client.DoPingPong(); client.DoCancelAfterBegin(); @@ -165,15 +175,35 @@ int main(int argc, char** argv) { } // compute_engine_creds only runs in GCE. } else { - gpr_log( - GPR_ERROR, - "Unsupported test case %s. Valid options are all|empty_unary|" - "large_unary|large_compressed_unary|client_streaming|server_streaming|" - "server_compressed_streaming|half_duplex|ping_pong|cancel_after_begin|" - "cancel_after_first_response|timeout_on_sleeping_server|empty_stream|" - "compute_engine_creds|jwt_token_creds|oauth2_auth_token|per_rpc_creds|" - "status_code_and_message|custom_metadata", - FLAGS_test_case.c_str()); + const char* testcases[] = {"all", + "cancel_after_begin", + "cancel_after_first_response", + "client_compressed_streaming", + "client_compressed_unary", + "client_streaming", + "compute_engine_creds", + "custom_metadata", + "empty_stream", + "empty_unary", + "half_duplex", + "jwt_token_creds", + "large_unary", + "oauth2_auth_token", + "oauth2_auth_token", + "per_rpc_creds", + "per_rpc_creds", + "ping_pong", + "server_compressed_streaming", + "server_compressed_unary", + "server_streaming", + "status_code_and_message", + "timeout_on_sleeping_server"}; + char* joined_testcases = + gpr_strjoin_sep(testcases, GPR_ARRAY_SIZE(testcases), "\n", NULL); + + gpr_log(GPR_ERROR, "Unsupported test case %s. Valid options are\n%s", + FLAGS_test_case.c_str(), joined_testcases); + gpr_free(joined_testcases); ret = 1; } diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 0bf1fd6f73..89f841dbe9 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,10 +31,8 @@ * */ -#include "test/cpp/interop/interop_client.h" - #include <unistd.h> - +#include <cinttypes> #include <fstream> #include <memory> @@ -51,6 +49,7 @@ #include "src/proto/grpc/testing/messages.grpc.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" #include "test/cpp/interop/client_helper.h" +#include "test/cpp/interop/interop_client.h" namespace grpc { namespace testing { @@ -58,7 +57,7 @@ namespace testing { namespace { // The same value is defined by the Java client. const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904}; -const std::vector<int> response_stream_sizes = {31415, 59, 2653, 58979}; +const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979}; const int kNumResponseMessages = 2000; const int kResponseMessageSize = 1030; const int kReceiveDelayMilliSeconds = 20; @@ -68,28 +67,23 @@ const int kLargeResponseSize = 314159; void NoopChecks(const InteropClientContextInspector& inspector, const SimpleRequest* request, const SimpleResponse* response) {} -void CompressionChecks(const InteropClientContextInspector& inspector, - const SimpleRequest* request, - const SimpleResponse* response) { +void UnaryCompressionChecks(const InteropClientContextInspector& inspector, + const SimpleRequest* request, + const SimpleResponse* response) { const grpc_compression_algorithm received_compression = inspector.GetCallCompressionAlgorithm(); - if (request->request_compressed_response() && - received_compression == GRPC_COMPRESS_NONE) { - if (request->request_compressed_response() && - received_compression == GRPC_COMPRESS_NONE) { + if (request->response_compressed().value()) { + if (received_compression == GRPC_COMPRESS_NONE) { // Requested some compression, got NONE. This is an error. gpr_log(GPR_ERROR, "Failure: Requested compression but got uncompressed response " "from server."); abort(); } - } - if (!request->request_compressed_response()) { - GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); - } else if (request->response_type() == PayloadType::COMPRESSABLE) { - // requested compression and compressable response => results should always - // be compressed. GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS); + } else { + // Didn't request compression -> make sure the response is uncompressed + GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); } } } // namespace @@ -191,11 +185,16 @@ bool InteropClient::PerformLargeUnary(SimpleRequest* request, CheckerFn custom_checks_fn) { ClientContext context; InteropClientContextInspector inspector(context); - // If the request doesn't already specify the response type, default to - // COMPRESSABLE. request->set_response_size(kLargeResponseSize); grpc::string payload(kLargeRequestSize, '\0'); request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); + if (request->has_expect_compressed()) { + if (request->expect_compressed().value()) { + context.set_compression_algorithm(GRPC_COMPRESS_GZIP); + } else { + context.set_compression_algorithm(GRPC_COMPRESS_NONE); + } + } Status s = serviceStub_.Get()->UnaryCall(&context, *request, response); if (!AssertStatusOk(s)) { @@ -205,27 +204,8 @@ bool InteropClient::PerformLargeUnary(SimpleRequest* request, custom_checks_fn(inspector, request, response); // Payload related checks. - GPR_ASSERT(response->payload().type() == request->response_type()); - switch (response->payload().type()) { - case PayloadType::COMPRESSABLE: - GPR_ASSERT(response->payload().body() == - grpc::string(kLargeResponseSize, '\0')); - break; - case PayloadType::UNCOMPRESSABLE: { - // We don't really check anything: We can't assert that the payload is - // uncompressed because it's the server's prerogative to decide on that, - // and different implementations decide differently (ie, Java always - // compresses when requested to do so, whereas C core throws away the - // compressed payload if the output is larger than the input). - // In addition, we don't compare the actual random bytes received because - // asserting that data is sent/received properly isn't the purpose of this - // test. Moreover, different implementations are also free to use - // different sets of random bytes. - } break; - default: - GPR_ASSERT(false); - } - + GPR_ASSERT(response->payload().body() == + grpc::string(kLargeResponseSize, '\0')); return true; } @@ -238,7 +218,6 @@ bool InteropClient::DoComputeEngineCreds( SimpleResponse response; request.set_fill_username(true); request.set_fill_oauth_scope(true); - request.set_response_type(PayloadType::COMPRESSABLE); if (!PerformLargeUnary(&request, &response)) { return false; @@ -312,7 +291,6 @@ bool InteropClient::DoJwtTokenCreds(const grpc::string& username) { SimpleRequest request; SimpleResponse response; request.set_fill_username(true); - request.set_response_type(PayloadType::COMPRESSABLE); if (!PerformLargeUnary(&request, &response)) { return false; @@ -328,7 +306,6 @@ bool InteropClient::DoLargeUnary() { gpr_log(GPR_DEBUG, "Sending a large unary rpc..."); SimpleRequest request; SimpleResponse response; - request.set_response_type(PayloadType::COMPRESSABLE); if (!PerformLargeUnary(&request, &response)) { return false; } @@ -336,32 +313,73 @@ bool InteropClient::DoLargeUnary() { return true; } -bool InteropClient::DoLargeCompressedUnary() { - const bool request_compression[] = {false, true}; - const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE}; - for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) { - for (size_t j = 0; j < GPR_ARRAY_SIZE(request_compression); j++) { - char* log_suffix; - gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)", - request_compression[j] ? "true" : "false", - PayloadType_Name(payload_types[i]).c_str()); - - gpr_log(GPR_DEBUG, "Sending a large compressed unary rpc %s.", - log_suffix); - SimpleRequest request; - SimpleResponse response; - request.set_response_type(payload_types[i]); - request.set_request_compressed_response(request_compression[j]); - - if (!PerformLargeUnary(&request, &response, CompressionChecks)) { - gpr_log(GPR_ERROR, "Large compressed unary failed %s", log_suffix); - gpr_free(log_suffix); - return false; - } - - gpr_log(GPR_DEBUG, "Large compressed unary done %s.", log_suffix); +bool InteropClient::DoClientCompressedUnary() { + // Probing for compression-checks support. + ClientContext probe_context; + SimpleRequest probe_req; + SimpleResponse probe_res; + + probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE); + probe_req.mutable_expect_compressed()->set_value(true); // lies! + + probe_req.set_response_size(kLargeResponseSize); + probe_req.mutable_payload()->set_body(grpc::string(kLargeRequestSize, '\0')); + + gpr_log(GPR_DEBUG, "Sending probe for compressed unary request."); + const Status s = + serviceStub_.Get()->UnaryCall(&probe_context, probe_req, &probe_res); + if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) { + // The server isn't able to evaluate incoming compression, making the rest + // of this test moot. + gpr_log(GPR_DEBUG, "Compressed unary request probe failed"); + return false; + } + gpr_log(GPR_DEBUG, "Compressed unary request probe succeeded. Proceeding."); + + const std::vector<bool> compressions = {true, false}; + for (size_t i = 0; i < compressions.size(); i++) { + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s)", + compressions[i] ? "true" : "false"); + + gpr_log(GPR_DEBUG, "Sending compressed unary request %s.", log_suffix); + SimpleRequest request; + SimpleResponse response; + request.mutable_expect_compressed()->set_value(compressions[i]); + if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) { + gpr_log(GPR_ERROR, "Compressed unary request failed %s", log_suffix); + gpr_free(log_suffix); + return false; + } + + gpr_log(GPR_DEBUG, "Compressed unary request failed %s", log_suffix); + gpr_free(log_suffix); + } + + return true; +} + +bool InteropClient::DoServerCompressedUnary() { + const std::vector<bool> compressions = {true, false}; + for (size_t i = 0; i < compressions.size(); i++) { + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s)", + compressions[i] ? "true" : "false"); + + gpr_log(GPR_DEBUG, "Sending unary request for compressed response %s.", + log_suffix); + SimpleRequest request; + SimpleResponse response; + request.mutable_response_compressed()->set_value(compressions[i]); + + if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) { + gpr_log(GPR_ERROR, "Request for compressed unary failed %s", log_suffix); gpr_free(log_suffix); + return false; } + + gpr_log(GPR_DEBUG, "Request for compressed unary failed %s", log_suffix); + gpr_free(log_suffix); } return true; @@ -388,7 +406,7 @@ bool InteropClient::DoRequestStreaming() { serviceStub_.Get()->StreamingInputCall(&context, &response)); int aggregated_payload_size = 0; - for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) { + for (size_t i = 0; i < request_stream_sizes.size(); ++i) { Payload* payload = request.mutable_payload(); payload->set_body(grpc::string(request_stream_sizes[i], '\0')); if (!stream->Write(request)) { @@ -397,7 +415,7 @@ bool InteropClient::DoRequestStreaming() { } aggregated_payload_size += request_stream_sizes[i]; } - stream->WritesDone(); + GPR_ASSERT(stream->WritesDone()); Status s = stream->Finish(); if (!AssertStatusOk(s)) { @@ -447,92 +465,129 @@ bool InteropClient::DoResponseStreaming() { return true; } -bool InteropClient::DoResponseCompressedStreaming() { - const bool request_compression[] = {false, true}; - const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE}; - for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) { - for (size_t j = 0; j < GPR_ARRAY_SIZE(request_compression); j++) { - ClientContext context; - InteropClientContextInspector inspector(context); - StreamingOutputCallRequest request; - - char* log_suffix; - gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)", - request_compression[j] ? "true" : "false", - PayloadType_Name(payload_types[i]).c_str()); - - gpr_log(GPR_DEBUG, "Receiving response streaming rpc %s.", log_suffix); - - request.set_response_type(payload_types[i]); - request.set_request_compressed_response(request_compression[j]); - - for (size_t k = 0; k < response_stream_sizes.size(); ++k) { - ResponseParameters* response_parameter = - request.add_response_parameters(); - response_parameter->set_size(response_stream_sizes[k]); - } - StreamingOutputCallResponse response; - - std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream( - serviceStub_.Get()->StreamingOutputCall(&context, request)); - - size_t k = 0; - while (stream->Read(&response)) { - // Payload related checks. - GPR_ASSERT(response.payload().type() == request.response_type()); - switch (response.payload().type()) { - case PayloadType::COMPRESSABLE: - GPR_ASSERT(response.payload().body() == - grpc::string(response_stream_sizes[k], '\0')); - break; - case PayloadType::UNCOMPRESSABLE: - break; - default: - GPR_ASSERT(false); - } - - // Compression related checks. - if (request.request_compressed_response()) { - GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > - GRPC_COMPRESS_NONE); - if (request.response_type() == PayloadType::COMPRESSABLE) { - // requested compression and compressable response => results should - // always be compressed. - GPR_ASSERT(inspector.GetMessageFlags() & - GRPC_WRITE_INTERNAL_COMPRESS); - } - } else { - // requested *no* compression. - GPR_ASSERT( - !(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); - } - - ++k; - } - - gpr_log(GPR_DEBUG, "Response streaming done %s.", log_suffix); - gpr_free(log_suffix); +bool InteropClient::DoClientCompressedStreaming() { + // Probing for compression-checks support. + ClientContext probe_context; + StreamingInputCallRequest probe_req; + StreamingInputCallResponse probe_res; + + probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE); + probe_req.mutable_expect_compressed()->set_value(true); // lies! + probe_req.mutable_payload()->set_body(grpc::string(27182, '\0')); + + gpr_log(GPR_DEBUG, "Sending probe for compressed streaming request."); + + std::unique_ptr<ClientWriter<StreamingInputCallRequest>> probe_stream( + serviceStub_.Get()->StreamingInputCall(&probe_context, &probe_res)); + + if (!probe_stream->Write(probe_req)) { + gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__); + return TransientFailureOrAbort(); + } + Status s = probe_stream->Finish(); + if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) { + // The server isn't able to evaluate incoming compression, making the rest + // of this test moot. + gpr_log(GPR_DEBUG, "Compressed streaming request probe failed"); + return false; + } + gpr_log(GPR_DEBUG, + "Compressed streaming request probe succeeded. Proceeding."); + + ClientContext context; + StreamingInputCallRequest request; + StreamingInputCallResponse response; + + context.set_compression_algorithm(GRPC_COMPRESS_GZIP); + std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream( + serviceStub_.Get()->StreamingInputCall(&context, &response)); + + request.mutable_payload()->set_body(grpc::string(27182, '\0')); + request.mutable_expect_compressed()->set_value(true); + gpr_log(GPR_DEBUG, "Sending streaming request with compression enabled"); + if (!stream->Write(request)) { + gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__); + return TransientFailureOrAbort(); + } + + WriteOptions wopts; + wopts.set_no_compression(); + request.mutable_payload()->set_body(grpc::string(45904, '\0')); + request.mutable_expect_compressed()->set_value(false); + gpr_log(GPR_DEBUG, "Sending streaming request with compression disabled"); + if (!stream->Write(request, wopts)) { + gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__); + return TransientFailureOrAbort(); + } + GPR_ASSERT(stream->WritesDone()); + + s = stream->Finish(); + if (!AssertStatusOk(s)) { + return false; + } + + return true; +} + +bool InteropClient::DoServerCompressedStreaming() { + const std::vector<bool> compressions = {true, false}; + const std::vector<int> sizes = {31415, 92653}; + + ClientContext context; + InteropClientContextInspector inspector(context); + StreamingOutputCallRequest request; + + GPR_ASSERT(compressions.size() == sizes.size()); + for (size_t i = 0; i < sizes.size(); i++) { + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s; size=%d)", + compressions[i] ? "true" : "false", sizes[i]); + + gpr_log(GPR_DEBUG, "Sending request streaming rpc %s.", log_suffix); + gpr_free(log_suffix); + + ResponseParameters* const response_parameter = + request.add_response_parameters(); + response_parameter->mutable_compressed()->set_value(compressions[i]); + response_parameter->set_size(sizes[i]); + } + std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream( + serviceStub_.Get()->StreamingOutputCall(&context, request)); - if (k < response_stream_sizes.size()) { - // stream->Read() failed before reading all the expected messages. This - // is most likely due to a connection failure. - gpr_log(GPR_ERROR, - "DoResponseCompressedStreaming(): Responses read (k=%" PRIuPTR - ") is " - "less than the expected messages (i.e " - "response_stream_sizes.size() (%" PRIuPTR ")). (i=%" PRIuPTR - ", j=%" PRIuPTR ")", - k, response_stream_sizes.size(), i, j); - return TransientFailureOrAbort(); - } - - Status s = stream->Finish(); - if (!AssertStatusOk(s)) { - return false; - } + size_t k = 0; + StreamingOutputCallResponse response; + while (stream->Read(&response)) { + // Payload size checks. + GPR_ASSERT(response.payload().body() == + grpc::string(request.response_parameters(k).size(), '\0')); + + // Compression checks. + GPR_ASSERT(request.response_parameters(k).has_compressed()); + if (request.response_parameters(k).compressed().value()) { + GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > GRPC_COMPRESS_NONE); + GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS); + } else { + // requested *no* compression. + GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); } + ++k; } + if (k < sizes.size()) { + // stream->Read() failed before reading all the expected messages. This + // is most likely due to a connection failure. + gpr_log(GPR_ERROR, "%s(): Responses read (k=%" PRIuPTR + ") is " + "less than the expected messages (i.e " + "response_stream_sizes.size() (%" PRIuPTR ")).", + __func__, k, response_stream_sizes.size()); + return TransientFailureOrAbort(); + } + + Status s = stream->Finish(); + if (!AssertStatusOk(s)) { + return false; + } return true; } @@ -633,7 +688,6 @@ bool InteropClient::DoPingPong() { stream(serviceStub_.Get()->FullDuplexCall(&context)); StreamingOutputCallRequest request; - request.set_response_type(PayloadType::COMPRESSABLE); ResponseParameters* response_parameter = request.add_response_parameters(); Payload* payload = request.mutable_payload(); StreamingOutputCallResponse response; @@ -700,7 +754,6 @@ bool InteropClient::DoCancelAfterFirstResponse() { stream(serviceStub_.Get()->FullDuplexCall(&context)); StreamingOutputCallRequest request; - request.set_response_type(PayloadType::COMPRESSABLE); ResponseParameters* response_parameter = request.add_response_parameters(); response_parameter->set_size(31415); request.mutable_payload()->set_body(grpc::string(27182, '\0')); @@ -840,7 +893,6 @@ bool InteropClient::DoCustomMetadata() { stream(serviceStub_.Get()->FullDuplexCall(&context)); StreamingOutputCallRequest request; - request.set_response_type(PayloadType::COMPRESSABLE); ResponseParameters* response_parameter = request.add_response_parameters(); response_parameter->set_size(kLargeResponseSize); grpc::string payload(kLargeRequestSize, '\0'); diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h index ae75762bb8..eb886fcb7e 100644 --- a/test/cpp/interop/interop_client.h +++ b/test/cpp/interop/interop_client.h @@ -45,9 +45,9 @@ namespace grpc { namespace testing { // Function pointer for custom checks. -using CheckerFn = - std::function<void(const InteropClientContextInspector&, - const SimpleRequest*, const SimpleResponse*)>; +typedef std::function<void(const InteropClientContextInspector&, + const SimpleRequest*, const SimpleResponse*)> + CheckerFn; class InteropClient { public: @@ -64,12 +64,14 @@ class InteropClient { bool DoEmpty(); bool DoLargeUnary(); - bool DoLargeCompressedUnary(); + bool DoServerCompressedUnary(); + bool DoClientCompressedUnary(); bool DoPingPong(); bool DoHalfDuplex(); bool DoRequestStreaming(); bool DoResponseStreaming(); - bool DoResponseCompressedStreaming(); + bool DoServerCompressedStreaming(); + bool DoClientCompressedStreaming(); bool DoResponseStreamingWithSlowConsumer(); bool DoCancelAfterBegin(); bool DoCancelAfterFirstResponse(); diff --git a/test/cpp/interop/server_main.cc b/test/cpp/interop/interop_server.cc index bbedda14d2..ebef0002a3 100644 --- a/test/cpp/interop/server_main.cc +++ b/test/cpp/interop/interop_server.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -48,6 +48,7 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> +#include "src/core/lib/transport/byte_stream.h" #include "src/proto/grpc/testing/empty.grpc.pb.h" #include "src/proto/grpc/testing/messages.grpc.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" @@ -64,10 +65,10 @@ using grpc::ServerCredentials; using grpc::ServerReader; using grpc::ServerReaderWriter; using grpc::ServerWriter; +using grpc::WriteOptions; using grpc::SslServerCredentialsOptions; using grpc::testing::InteropServerContextInspector; using grpc::testing::Payload; -using grpc::testing::PayloadType; using grpc::testing::SimpleRequest; using grpc::testing::SimpleResponse; using grpc::testing::StreamingInputCallRequest; @@ -78,7 +79,6 @@ using grpc::testing::TestService; using grpc::Status; static bool got_sigint = false; -static const char* kRandomFile = "test/cpp/interop/rnd.dat"; const char kEchoInitialMetadataKey[] = "x-grpc-test-echo-initial"; const char kEchoTrailingBinMetadataKey[] = "x-grpc-test-echo-trailing-bin"; @@ -110,34 +110,41 @@ void MaybeEchoMetadata(ServerContext* context) { } } -bool SetPayload(PayloadType response_type, int size, Payload* payload) { - payload->set_type(response_type); - switch (response_type) { - case PayloadType::COMPRESSABLE: { - std::unique_ptr<char[]> body(new char[size]()); - payload->set_body(body.get(), size); - } break; - case PayloadType::UNCOMPRESSABLE: { - std::unique_ptr<char[]> body(new char[size]()); - std::ifstream rnd_file(kRandomFile); - GPR_ASSERT(rnd_file.good()); - rnd_file.read(body.get(), size); - GPR_ASSERT(!rnd_file.eof()); // Requested more rnd bytes than available - payload->set_body(body.get(), size); - } break; - default: - GPR_ASSERT(false); - } +bool SetPayload(int size, Payload* payload) { + std::unique_ptr<char[]> body(new char[size]()); + payload->set_body(body.get(), size); return true; } -template <typename RequestType> -void SetResponseCompression(ServerContext* context, - const RequestType& request) { - if (request.request_compressed_response()) { - // Any level would do, let's go for HIGH because we are overachievers. - context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); +bool CheckExpectedCompression(const ServerContext& context, + const bool compression_expected) { + const InteropServerContextInspector inspector(context); + const grpc_compression_algorithm received_compression = + inspector.GetCallCompressionAlgorithm(); + + if (compression_expected) { + if (received_compression == GRPC_COMPRESS_NONE) { + // Expected some compression, got NONE. This is an error. + gpr_log(GPR_ERROR, + "Expected compression but got uncompressed request from client."); + return false; + } + if (!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)) { + gpr_log(GPR_ERROR, + "Failure: Requested compression in a compressable request, but " + "compression bit in message flags not set."); + return false; + } + } else { + // Didn't expect compression -> make sure the request is uncompressed + if (inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS) { + gpr_log(GPR_ERROR, + "Failure: Didn't requested compression, but compression bit in " + "message flags set."); + return false; + } } + return true; } class TestServiceImpl : public TestService::Service { @@ -151,11 +158,26 @@ class TestServiceImpl : public TestService::Service { Status UnaryCall(ServerContext* context, const SimpleRequest* request, SimpleResponse* response) { MaybeEchoMetadata(context); - SetResponseCompression(context, *request); + if (request->has_response_compressed()) { + const bool compression_requested = request->response_compressed().value(); + gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s", + compression_requested ? "enabled" : "disabled", __func__); + if (compression_requested) { + // Any level would do, let's go for HIGH because we are overachievers. + context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); + } else { + context->set_compression_level(GRPC_COMPRESS_LEVEL_NONE); + } + } + if (!CheckExpectedCompression(*context, + request->expect_compressed().value())) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Compressed request expectation not met."); + } if (request->response_size() > 0) { - if (!SetPayload(request->response_type(), request->response_size(), - response->mutable_payload())) { - return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); + if (!SetPayload(request->response_size(), response->mutable_payload())) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Error creating payload."); } } @@ -171,17 +193,36 @@ class TestServiceImpl : public TestService::Service { Status StreamingOutputCall( ServerContext* context, const StreamingOutputCallRequest* request, ServerWriter<StreamingOutputCallResponse>* writer) { - SetResponseCompression(context, *request); StreamingOutputCallResponse response; bool write_success = true; for (int i = 0; write_success && i < request->response_parameters_size(); i++) { - if (!SetPayload(request->response_type(), - request->response_parameters(i).size(), + if (!SetPayload(request->response_parameters(i).size(), response.mutable_payload())) { - return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Error creating payload."); } - write_success = writer->Write(response); + WriteOptions wopts; + if (request->response_parameters(i).has_compressed()) { + // Compress by default. Disabled on a per-message basis. + context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); + const bool compression_requested = + request->response_parameters(i).compressed().value(); + gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s", + compression_requested ? "enabled" : "disabled", __func__); + if (!compression_requested) { + wopts.set_no_compression(); + } // else, compression is already enabled via the context. + } + int time_us; + if ((time_us = request->response_parameters(i).interval_us()) > 0) { + // Sleep before response if needed + gpr_timespec sleep_time = + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(time_us, GPR_TIMESPAN)); + gpr_sleep_until(sleep_time); + } + write_success = writer->Write(response, wopts); } if (write_success) { return Status::OK; @@ -196,6 +237,11 @@ class TestServiceImpl : public TestService::Service { StreamingInputCallRequest request; int aggregated_payload_size = 0; while (reader->Read(&request)) { + if (!CheckExpectedCompression(*context, + request.expect_compressed().value())) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Compressed request expectation not met."); + } if (request.has_payload()) { aggregated_payload_size += request.payload().body().size(); } @@ -213,11 +259,18 @@ class TestServiceImpl : public TestService::Service { StreamingOutputCallResponse response; bool write_success = true; while (write_success && stream->Read(&request)) { - SetResponseCompression(context, request); if (request.response_parameters_size() != 0) { response.mutable_payload()->set_type(request.payload().type()); response.mutable_payload()->set_body( grpc::string(request.response_parameters(0).size(), '\0')); + int time_us; + if ((time_us = request.response_parameters(0).interval_us()) > 0) { + // Sleep before response if needed + gpr_timespec sleep_time = + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(time_us, GPR_TIMESPAN)); + gpr_sleep_until(sleep_time); + } write_success = stream->Write(response); } } diff --git a/test/cpp/interop/rnd.dat b/test/cpp/interop/rnd.dat Binary files differdeleted file mode 100644 index 8c7f38f9e0..0000000000 --- a/test/cpp/interop/rnd.dat +++ /dev/null diff --git a/test/cpp/interop/server_helper.cc b/test/cpp/interop/server_helper.cc index c6d891ad71..8b0b511bcb 100644 --- a/test/cpp/interop/server_helper.cc +++ b/test/cpp/interop/server_helper.cc @@ -72,6 +72,10 @@ uint32_t InteropServerContextInspector::GetEncodingsAcceptedByClient() const { return grpc_call_test_only_get_encodings_accepted_by_peer(context_.call_); } +uint32_t InteropServerContextInspector::GetMessageFlags() const { + return grpc_call_test_only_get_message_flags(context_.call_); +} + std::shared_ptr<const AuthContext> InteropServerContextInspector::GetAuthContext() const { return context_.auth_context(); diff --git a/test/cpp/interop/server_helper.h b/test/cpp/interop/server_helper.h index 12865e4032..a1da14a4c8 100644 --- a/test/cpp/interop/server_helper.h +++ b/test/cpp/interop/server_helper.h @@ -54,6 +54,7 @@ class InteropServerContextInspector { bool IsCancelled() const; grpc_compression_algorithm GetCallCompressionAlgorithm() const; uint32_t GetEncodingsAcceptedByClient() const; + uint32_t GetMessageFlags() const; private: const ::grpc::ServerContext& context_; diff --git a/test/cpp/interop/stress_interop_client.cc b/test/cpp/interop/stress_interop_client.cc index aa95682e74..1d5fc80cf2 100644 --- a/test/cpp/interop/stress_interop_client.cc +++ b/test/cpp/interop/stress_interop_client.cc @@ -138,8 +138,12 @@ bool StressTestInteropClient::RunTest(TestCaseType test_case) { is_success = interop_client_->DoLargeUnary(); break; } - case LARGE_COMPRESSED_UNARY: { - is_success = interop_client_->DoLargeCompressedUnary(); + case CLIENT_COMPRESSED_UNARY: { + is_success = interop_client_->DoClientCompressedUnary(); + break; + } + case CLIENT_COMPRESSED_STREAMING: { + is_success = interop_client_->DoClientCompressedStreaming(); break; } case CLIENT_STREAMING: { @@ -150,8 +154,12 @@ bool StressTestInteropClient::RunTest(TestCaseType test_case) { is_success = interop_client_->DoResponseStreaming(); break; } + case SERVER_COMPRESSED_UNARY: { + is_success = interop_client_->DoServerCompressedUnary(); + break; + } case SERVER_COMPRESSED_STREAMING: { - is_success = interop_client_->DoResponseCompressedStreaming(); + is_success = interop_client_->DoServerCompressedStreaming(); break; } case SLOW_CONSUMER: { diff --git a/test/cpp/interop/stress_interop_client.h b/test/cpp/interop/stress_interop_client.h index aa93b58b4a..cf6a713473 100644 --- a/test/cpp/interop/stress_interop_client.h +++ b/test/cpp/interop/stress_interop_client.h @@ -51,29 +51,33 @@ using std::vector; enum TestCaseType { UNKNOWN_TEST = -1, - EMPTY_UNARY = 0, - LARGE_UNARY = 1, - LARGE_COMPRESSED_UNARY = 2, - CLIENT_STREAMING = 3, - SERVER_STREAMING = 4, - SERVER_COMPRESSED_STREAMING = 5, - SLOW_CONSUMER = 6, - HALF_DUPLEX = 7, - PING_PONG = 8, - CANCEL_AFTER_BEGIN = 9, - CANCEL_AFTER_FIRST_RESPONSE = 10, - TIMEOUT_ON_SLEEPING_SERVER = 11, - EMPTY_STREAM = 12, - STATUS_CODE_AND_MESSAGE = 13, - CUSTOM_METADATA = 14 + EMPTY_UNARY, + LARGE_UNARY, + CLIENT_COMPRESSED_UNARY, + CLIENT_COMPRESSED_STREAMING, + CLIENT_STREAMING, + SERVER_STREAMING, + SERVER_COMPRESSED_UNARY, + SERVER_COMPRESSED_STREAMING, + SLOW_CONSUMER, + HALF_DUPLEX, + PING_PONG, + CANCEL_AFTER_BEGIN, + CANCEL_AFTER_FIRST_RESPONSE, + TIMEOUT_ON_SLEEPING_SERVER, + EMPTY_STREAM, + STATUS_CODE_AND_MESSAGE, + CUSTOM_METADATA }; const vector<pair<TestCaseType, grpc::string>> kTestCaseList = { {EMPTY_UNARY, "empty_unary"}, {LARGE_UNARY, "large_unary"}, - {LARGE_COMPRESSED_UNARY, "large_compressed_unary"}, + {CLIENT_COMPRESSED_UNARY, "client_compressed_unary"}, + {CLIENT_COMPRESSED_STREAMING, "client_compressed_streaming"}, {CLIENT_STREAMING, "client_streaming"}, {SERVER_STREAMING, "server_streaming"}, + {SERVER_COMPRESSED_UNARY, "server_compressed_unary"}, {SERVER_COMPRESSED_STREAMING, "server_compressed_streaming"}, {SLOW_CONSUMER, "slow_consumer"}, {HALF_DUPLEX, "half_duplex"}, diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 2a89eb8018..047bd16408 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -125,13 +125,15 @@ class Client { if (reset) { Histogram* to_merge = new Histogram[threads_.size()]; for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->Swap(&to_merge[i]); - latencies.Merge(to_merge[i]); + threads_[i]->BeginSwap(&to_merge[i]); } - delete[] to_merge; - std::unique_ptr<UsageTimer> timer(new UsageTimer); timer_.swap(timer); + for (size_t i = 0; i < threads_.size(); i++) { + threads_[i]->EndSwap(); + latencies.Merge(to_merge[i]); + } + delete[] to_merge; timer_result = timer->Mark(); } else { // merge snapshots of each thread histogram @@ -213,6 +215,7 @@ class Client { public: Thread(Client* client, size_t idx) : done_(false), + new_stats_(nullptr), client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {} @@ -225,9 +228,16 @@ class Client { impl_.join(); } - void Swap(Histogram* n) { + void BeginSwap(Histogram* n) { std::lock_guard<std::mutex> g(mu_); - n->Swap(&histogram_); + new_stats_ = n; + } + + void EndSwap() { + std::unique_lock<std::mutex> g(mu_); + while (new_stats_ != nullptr) { + cv_.wait(g); + }; } void MergeStatsInto(Histogram* hist) { @@ -241,11 +251,10 @@ class Client { void ThreadFunc() { for (;;) { - // lock since the thread should only be doing one thing at a time - std::lock_guard<std::mutex> g(mu_); // run the loop body const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_); - // see if we're done + // lock, see if we're done + std::lock_guard<std::mutex> g(mu_); if (!thread_still_ok) { gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); done_ = true; @@ -253,11 +262,19 @@ class Client { if (done_) { return; } + // check if we're resetting stats, swap out the histogram if so + if (new_stats_) { + new_stats_->Swap(&histogram_); + new_stats_ = nullptr; + cv_.notify_one(); + } } } std::mutex mu_; + std::condition_variable cv_; bool done_; + Histogram* new_stats_; Histogram histogram_; Client* client_; const size_t idx_; diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 6ad4c320b5..3d98ab0939 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -249,7 +249,8 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { public: explicit AsyncUnaryClient(const ClientConfig& config) - : AsyncClient(config, SetupCtx, BenchmarkStubCreator) { + : AsyncClient<BenchmarkService::Stub, SimpleRequest>( + config, SetupCtx, BenchmarkStubCreator) { StartThreads(num_async_threads_); } ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } @@ -376,7 +377,8 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { public: explicit AsyncStreamingClient(const ClientConfig& config) - : AsyncClient(config, SetupCtx, BenchmarkStubCreator) { + : AsyncClient<BenchmarkService::Stub, SimpleRequest>( + config, SetupCtx, BenchmarkStubCreator) { StartThreads(num_async_threads_); } @@ -511,7 +513,8 @@ class GenericAsyncStreamingClient GRPC_FINAL : public AsyncClient<grpc::GenericStub, ByteBuffer> { public: explicit GenericAsyncStreamingClient(const ClientConfig& config) - : AsyncClient(config, SetupCtx, GenericStubCreator) { + : AsyncClient<grpc::GenericStub, ByteBuffer>(config, SetupCtx, + GenericStubCreator) { StartThreads(num_async_threads_); } diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 83dbdc3e59..08bf045883 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -31,6 +31,7 @@ * */ +#include <cinttypes> #include <deque> #include <list> #include <thread> diff --git a/test/cpp/util/metrics_server.cc b/test/cpp/util/metrics_server.cc index cc6b39b753..1c7cd6382a 100644 --- a/test/cpp/util/metrics_server.cc +++ b/test/cpp/util/metrics_server.cc @@ -99,7 +99,7 @@ std::shared_ptr<QpsGauge> MetricsServiceImpl::CreateQpsGauge( std::lock_guard<std::mutex> lock(mu_); std::shared_ptr<QpsGauge> qps_gauge(new QpsGauge()); - const auto p = qps_gauges_.emplace(name, qps_gauge); + const auto p = qps_gauges_.insert(std::make_pair(name, qps_gauge)); // p.first is an iterator pointing to <name, shared_ptr<QpsGauge>> pair. // p.second is a boolean which is set to 'true' if the QpsGauge is @@ -114,7 +114,7 @@ std::shared_ptr<QpsGauge> MetricsServiceImpl::CreateQpsGauge( std::unique_ptr<grpc::Server> MetricsServiceImpl::StartServer(int port) { gpr_log(GPR_INFO, "Building metrics server.."); - const grpc::string address = "0.0.0.0:" + std::to_string(port); + const grpc::string address = "0.0.0.0:" + grpc::to_string(port); ServerBuilder builder; builder.AddListeningPort(address, grpc::InsecureServerCredentials()); diff --git a/test/cpp/util/proto_reflection_descriptor_database.cc b/test/cpp/util/proto_reflection_descriptor_database.cc index 6907d97bd5..25b720aee0 100644 --- a/test/cpp/util/proto_reflection_descriptor_database.cc +++ b/test/cpp/util/proto_reflection_descriptor_database.cc @@ -298,7 +298,7 @@ void ProtoReflectionDescriptorDatabase::AddFileFromResponse( const std::shared_ptr<ProtoReflectionDescriptorDatabase::ClientStream> ProtoReflectionDescriptorDatabase::GetStream() { - if (stream_ == nullptr) { + if (!stream_) { stream_ = stub_->ServerReflectionInfo(&ctx_); } return stream_; diff --git a/test/cpp/util/test_credentials_provider.cc b/test/cpp/util/test_credentials_provider.cc index 9c09a73115..6e68f59e6a 100644 --- a/test/cpp/util/test_credentials_provider.cc +++ b/test/cpp/util/test_credentials_provider.cc @@ -41,15 +41,9 @@ #include "test/core/end2end/data/ssl_test_data.h" +namespace grpc { namespace { -using grpc::ChannelArguments; -using grpc::ChannelCredentials; -using grpc::InsecureChannelCredentials; -using grpc::InsecureServerCredentials; -using grpc::ServerCredentials; -using grpc::SslCredentialsOptions; -using grpc::SslServerCredentialsOptions; using grpc::testing::CredentialTypeProvider; // Provide test credentials. Thread-safe. @@ -69,19 +63,27 @@ class CredentialsProvider { class DefaultCredentialsProvider : public CredentialsProvider { public: - ~DefaultCredentialsProvider() override {} + ~DefaultCredentialsProvider() GRPC_OVERRIDE {} - void AddSecureType( - const grpc::string& type, - std::unique_ptr<CredentialTypeProvider> type_provider) override { + void AddSecureType(const grpc::string& type, + std::unique_ptr<CredentialTypeProvider> type_provider) + GRPC_OVERRIDE { // This clobbers any existing entry for type, except the defaults, which // can't be clobbered. grpc::unique_lock<grpc::mutex> lock(mu_); - added_secure_types_[type] = std::move(type_provider); + auto it = std::find(added_secure_type_names_.begin(), + added_secure_type_names_.end(), type); + if (it == added_secure_type_names_.end()) { + added_secure_type_names_.push_back(type); + added_secure_type_providers_.push_back(std::move(type_provider)); + } else { + added_secure_type_providers_[it - added_secure_type_names_.begin()] = + std::move(type_provider); + } } std::shared_ptr<ChannelCredentials> GetChannelCredentials( - const grpc::string& type, ChannelArguments* args) override { + const grpc::string& type, ChannelArguments* args) GRPC_OVERRIDE { if (type == grpc::testing::kInsecureCredentialsType) { return InsecureChannelCredentials(); } else if (type == grpc::testing::kTlsCredentialsType) { @@ -90,17 +92,19 @@ class DefaultCredentialsProvider : public CredentialsProvider { return SslCredentials(ssl_opts); } else { grpc::unique_lock<grpc::mutex> lock(mu_); - auto it(added_secure_types_.find(type)); - if (it == added_secure_types_.end()) { + auto it(std::find(added_secure_type_names_.begin(), + added_secure_type_names_.end(), type)); + if (it == added_secure_type_names_.end()) { gpr_log(GPR_ERROR, "Unsupported credentials type %s.", type.c_str()); return nullptr; } - return it->second->GetChannelCredentials(args); + return added_secure_type_providers_[it - added_secure_type_names_.begin()] + ->GetChannelCredentials(args); } } std::shared_ptr<ServerCredentials> GetServerCredentials( - const grpc::string& type) override { + const grpc::string& type) GRPC_OVERRIDE { if (type == grpc::testing::kInsecureCredentialsType) { return InsecureServerCredentials(); } else if (type == grpc::testing::kTlsCredentialsType) { @@ -112,28 +116,32 @@ class DefaultCredentialsProvider : public CredentialsProvider { return SslServerCredentials(ssl_opts); } else { grpc::unique_lock<grpc::mutex> lock(mu_); - auto it(added_secure_types_.find(type)); - if (it == added_secure_types_.end()) { + auto it(std::find(added_secure_type_names_.begin(), + added_secure_type_names_.end(), type)); + if (it == added_secure_type_names_.end()) { gpr_log(GPR_ERROR, "Unsupported credentials type %s.", type.c_str()); return nullptr; } - return it->second->GetServerCredentials(); + return added_secure_type_providers_[it - added_secure_type_names_.begin()] + ->GetServerCredentials(); } } - std::vector<grpc::string> GetSecureCredentialsTypeList() override { + std::vector<grpc::string> GetSecureCredentialsTypeList() GRPC_OVERRIDE { std::vector<grpc::string> types; types.push_back(grpc::testing::kTlsCredentialsType); grpc::unique_lock<grpc::mutex> lock(mu_); - for (const auto& type_pair : added_secure_types_) { - types.push_back(type_pair.first); + for (auto it = added_secure_type_names_.begin(); + it != added_secure_type_names_.end(); it++) { + types.push_back(*it); } return types; } private: grpc::mutex mu_; - std::unordered_map<grpc::string, std::unique_ptr<CredentialTypeProvider> > - added_secure_types_; + std::vector<grpc::string> added_secure_type_names_; + std::vector<std::unique_ptr<CredentialTypeProvider>> + added_secure_type_providers_; }; gpr_once g_once_init_provider = GPR_ONCE_INIT; @@ -148,7 +156,6 @@ CredentialsProvider* GetProvider() { } // namespace -namespace grpc { namespace testing { void AddSecureType(const grpc::string& type, |