diff options
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 15 | ||||
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 2 | ||||
-rw-r--r-- | test/cpp/end2end/filter_end2end_test.cc | 353 | ||||
-rw-r--r-- | test/cpp/end2end/proto_server_reflection_test.cc | 24 | ||||
-rw-r--r-- | test/cpp/end2end/server_builder_plugin_test.cc | 7 | ||||
-rw-r--r-- | test/cpp/grpclb/grpclb_api_test.cc | 18 | ||||
-rw-r--r-- | test/cpp/grpclb/grpclb_test.cc | 689 | ||||
-rw-r--r-- | test/cpp/interop/interop_server.cc | 23 | ||||
-rw-r--r-- | test/cpp/interop/interop_server_bootstrap.cc | 54 | ||||
-rw-r--r-- | test/cpp/interop/server_helper.h | 6 | ||||
-rw-r--r-- | test/cpp/qps/client.h | 21 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 31 | ||||
-rw-r--r-- | test/cpp/qps/limit_cores.cc | 4 | ||||
-rw-r--r-- | test/cpp/util/grpc_cli.cc | 89 | ||||
-rw-r--r-- | test/cpp/util/proto_file_parser.cc | 47 | ||||
-rw-r--r-- | test/cpp/util/proto_file_parser.h | 11 | ||||
-rw-r--r-- | test/cpp/util/proto_reflection_descriptor_database.cc | 24 | ||||
-rw-r--r-- | test/cpp/util/proto_reflection_descriptor_database.h | 23 |
18 files changed, 1308 insertions, 133 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 4a8936d281..ac79fe8274 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -200,6 +200,10 @@ class Verifier { bool spin_; }; +bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) { + return plugin->has_sync_methods(); +} + // This class disables the server builder plugins that may add sync services to // the server. If there are sync services, UnimplementedRpc test will triger // the sync unkown rpc routine on the server side, rather than the async one @@ -210,14 +214,9 @@ class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption { void UpdatePlugins(std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins) GRPC_OVERRIDE { - auto plugin = plugins->begin(); - while (plugin != plugins->end()) { - if ((*plugin)->has_sync_methods()) { - plugins->erase(plugin++); - } else { - plugin++; - } - } + plugins->erase(std::remove_if(plugins->begin(), plugins->end(), + plugin_has_sync_methods), + plugins->end()); } }; diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 0f87ae3e44..46a58d3ac3 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -1414,7 +1414,7 @@ TEST_P(SecureEnd2endTest, ClientAuthContext) { std::shared_ptr<const AuthContext> auth_ctx = context.auth_context(); std::vector<grpc::string_ref> tst = auth_ctx->FindPropertyValues("transport_security_type"); - EXPECT_EQ(1u, tst.size()); + ASSERT_EQ(1u, tst.size()); EXPECT_EQ(GetParam().credentials_type, ToString(tst[0])); if (GetParam().credentials_type == kTlsCredentialsType) { EXPECT_EQ("x509_subject_alternative_name", diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc new file mode 100644 index 0000000000..853720fd0d --- /dev/null +++ b/test/cpp/end2end/filter_end2end_test.cc @@ -0,0 +1,353 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <memory> +#include <mutex> + +#include <grpc++/channel.h> +#include <grpc++/client_context.h> +#include <grpc++/create_channel.h> +#include <grpc++/generic/async_generic_service.h> +#include <grpc++/generic/generic_stub.h> +#include <grpc++/impl/codegen/proto_utils.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/server_context.h> +#include <grpc++/support/config.h> +#include <grpc++/support/slice.h> +#include <grpc/grpc.h> +#include <grpc/support/thd.h> +#include <grpc/support/time.h> +#include <gtest/gtest.h> + +#include "src/cpp/common/channel_filter.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" +#include "test/cpp/util/byte_buffer_proto_helper.h" + +using grpc::testing::EchoRequest; +using grpc::testing::EchoResponse; +using std::chrono::system_clock; + +namespace grpc { +namespace testing { +namespace { + +void* tag(int i) { return (void*)(intptr_t)i; } + +void verify_ok(CompletionQueue* cq, int i, bool expect_ok) { + bool ok; + void* got_tag; + EXPECT_TRUE(cq->Next(&got_tag, &ok)); + EXPECT_EQ(expect_ok, ok); + EXPECT_EQ(tag(i), got_tag); +} + +namespace { + +int global_num_connections = 0; +int global_num_calls = 0; +mutex global_mu; + +void IncrementConnectionCounter() { + unique_lock<mutex> lock(global_mu); + ++global_num_connections; +} + +void ResetConnectionCounter() { + unique_lock<mutex> lock(global_mu); + global_num_connections = 0; +} + +int GetConnectionCounterValue() { + unique_lock<mutex> lock(global_mu); + return global_num_connections; +} + +void IncrementCallCounter() { + unique_lock<mutex> lock(global_mu); + ++global_num_calls; +} + +void ResetCallCounter() { + unique_lock<mutex> lock(global_mu); + global_num_calls = 0; +} + +int GetCallCounterValue() { + unique_lock<mutex> lock(global_mu); + return global_num_calls; +} + +} // namespace + +class ChannelDataImpl : public ChannelData { + public: + ChannelDataImpl(const grpc_channel_args& args, const char* peer) + : ChannelData(args, peer) { + IncrementConnectionCounter(); + } +}; + +class CallDataImpl : public CallData { + public: + explicit CallDataImpl(const ChannelDataImpl& channel_data) + : CallData(channel_data) {} + + void StartTransportStreamOp(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + TransportStreamOp* op) GRPC_OVERRIDE { + // Incrementing the counter could be done from the ctor, but we want + // to test that the individual methods are actually called correctly. + if (op->recv_initial_metadata() != nullptr) IncrementCallCounter(); + grpc_call_next_op(exec_ctx, elem, op->op()); + } +}; + +class FilterEnd2endTest : public ::testing::Test { + protected: + FilterEnd2endTest() : server_host_("localhost") {} + + void SetUp() GRPC_OVERRIDE { + int port = grpc_pick_unused_port_or_die(); + server_address_ << server_host_ << ":" << port; + // Setup server + ServerBuilder builder; + builder.AddListeningPort(server_address_.str(), + InsecureServerCredentials()); + builder.RegisterAsyncGenericService(&generic_service_); + srv_cq_ = builder.AddCompletionQueue(); + server_ = builder.BuildAndStart(); + } + + void TearDown() GRPC_OVERRIDE { + server_->Shutdown(); + void* ignored_tag; + bool ignored_ok; + cli_cq_.Shutdown(); + srv_cq_->Shutdown(); + while (cli_cq_.Next(&ignored_tag, &ignored_ok)) + ; + while (srv_cq_->Next(&ignored_tag, &ignored_ok)) + ; + } + + void ResetStub() { + std::shared_ptr<Channel> channel = + CreateChannel(server_address_.str(), InsecureChannelCredentials()); + generic_stub_.reset(new GenericStub(channel)); + ResetConnectionCounter(); + ResetCallCounter(); + } + + void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); } + void client_ok(int i) { verify_ok(&cli_cq_, i, true); } + void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); } + void client_fail(int i) { verify_ok(&cli_cq_, i, false); } + + void SendRpc(int num_rpcs) { + const grpc::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo"); + for (int i = 0; i < num_rpcs; i++) { + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + GenericServerContext srv_ctx; + GenericServerAsyncReaderWriter stream(&srv_ctx); + + // The string needs to be long enough to test heap-based slice. + send_request.set_message("Hello world. Hello world. Hello world."); + std::unique_ptr<GenericClientAsyncReaderWriter> call = + generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1)); + client_ok(1); + std::unique_ptr<ByteBuffer> send_buffer = + SerializeToByteBuffer(&send_request); + call->Write(*send_buffer, tag(2)); + // Send ByteBuffer can be destroyed after calling Write. + send_buffer.reset(); + client_ok(2); + call->WritesDone(tag(3)); + client_ok(3); + + generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(), + srv_cq_.get(), tag(4)); + + verify_ok(srv_cq_.get(), 4, true); + EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length())); + EXPECT_EQ(kMethodName, srv_ctx.method()); + ByteBuffer recv_buffer; + stream.Read(&recv_buffer, tag(5)); + server_ok(5); + EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request)); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + send_buffer = SerializeToByteBuffer(&send_response); + stream.Write(*send_buffer, tag(6)); + send_buffer.reset(); + server_ok(6); + + stream.Finish(Status::OK, tag(7)); + server_ok(7); + + recv_buffer.Clear(); + call->Read(&recv_buffer, tag(8)); + client_ok(8); + EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response)); + + call->Finish(&recv_status, tag(9)); + client_ok(9); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); + } + } + + CompletionQueue cli_cq_; + std::unique_ptr<ServerCompletionQueue> srv_cq_; + std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; + std::unique_ptr<grpc::GenericStub> generic_stub_; + std::unique_ptr<Server> server_; + AsyncGenericService generic_service_; + const grpc::string server_host_; + std::ostringstream server_address_; +}; + +TEST_F(FilterEnd2endTest, SimpleRpc) { + ResetStub(); + EXPECT_EQ(0, GetConnectionCounterValue()); + EXPECT_EQ(0, GetCallCounterValue()); + SendRpc(1); + EXPECT_EQ(1, GetConnectionCounterValue()); + EXPECT_EQ(1, GetCallCounterValue()); +} + +TEST_F(FilterEnd2endTest, SequentialRpcs) { + ResetStub(); + EXPECT_EQ(0, GetConnectionCounterValue()); + EXPECT_EQ(0, GetCallCounterValue()); + SendRpc(10); + EXPECT_EQ(1, GetConnectionCounterValue()); + EXPECT_EQ(10, GetCallCounterValue()); +} + +// One ping, one pong. +TEST_F(FilterEnd2endTest, SimpleBidiStreaming) { + ResetStub(); + EXPECT_EQ(0, GetConnectionCounterValue()); + EXPECT_EQ(0, GetCallCounterValue()); + + const grpc::string kMethodName( + "/grpc.cpp.test.util.EchoTestService/BidiStream"); + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + GenericServerContext srv_ctx; + GenericServerAsyncReaderWriter srv_stream(&srv_ctx); + + cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP); + send_request.set_message("Hello"); + std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream = + generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1)); + client_ok(1); + + generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(), + srv_cq_.get(), tag(2)); + + verify_ok(srv_cq_.get(), 2, true); + EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length())); + EXPECT_EQ(kMethodName, srv_ctx.method()); + + std::unique_ptr<ByteBuffer> send_buffer = + SerializeToByteBuffer(&send_request); + cli_stream->Write(*send_buffer, tag(3)); + send_buffer.reset(); + client_ok(3); + + ByteBuffer recv_buffer; + srv_stream.Read(&recv_buffer, tag(4)); + server_ok(4); + EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request)); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + send_buffer = SerializeToByteBuffer(&send_response); + srv_stream.Write(*send_buffer, tag(5)); + send_buffer.reset(); + server_ok(5); + + cli_stream->Read(&recv_buffer, tag(6)); + client_ok(6); + EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response)); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->WritesDone(tag(7)); + client_ok(7); + + srv_stream.Read(&recv_buffer, tag(8)); + server_fail(8); + + srv_stream.Finish(Status::OK, tag(9)); + server_ok(9); + + cli_stream->Finish(&recv_status, tag(10)); + client_ok(10); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); + + EXPECT_EQ(1, GetCallCounterValue()); + EXPECT_EQ(1, GetConnectionCounterValue()); +} + +void RegisterFilter() { + grpc::RegisterChannelFilter<ChannelDataImpl, CallDataImpl>( + "test-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr); +} + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + grpc::testing::RegisterFilter(); + return RUN_ALL_TESTS(); +} diff --git a/test/cpp/end2end/proto_server_reflection_test.cc b/test/cpp/end2end/proto_server_reflection_test.cc index f8fc39b553..efbb0e1f8e 100644 --- a/test/cpp/end2end/proto_server_reflection_test.cc +++ b/test/cpp/end2end/proto_server_reflection_test.cc @@ -31,7 +31,6 @@ * */ -#include <google/protobuf/descriptor.h> #include <grpc++/channel.h> #include <grpc++/client_context.h> #include <grpc++/create_channel.h> @@ -59,7 +58,7 @@ class ProtoServerReflectionTest : public ::testing::Test { void SetUp() GRPC_OVERRIDE { port_ = grpc_pick_unused_port_or_die(); - ref_desc_pool_ = google::protobuf::DescriptorPool::generated_pool(); + ref_desc_pool_ = protobuf::DescriptorPool::generated_pool(); ServerBuilder builder; grpc::string server_address = "localhost:" + to_string(port_); @@ -73,7 +72,7 @@ class ProtoServerReflectionTest : public ::testing::Test { CreateChannel(target, InsecureChannelCredentials()); stub_ = grpc::testing::EchoTestService::NewStub(channel); desc_db_.reset(new ProtoReflectionDescriptorDatabase(channel)); - desc_pool_.reset(new google::protobuf::DescriptorPool(desc_db_.get())); + desc_pool_.reset(new protobuf::DescriptorPool(desc_db_.get())); } string to_string(const int number) { @@ -83,15 +82,15 @@ class ProtoServerReflectionTest : public ::testing::Test { } void CompareService(const grpc::string& service) { - const google::protobuf::ServiceDescriptor* service_desc = + const protobuf::ServiceDescriptor* service_desc = desc_pool_->FindServiceByName(service); - const google::protobuf::ServiceDescriptor* ref_service_desc = + const protobuf::ServiceDescriptor* ref_service_desc = ref_desc_pool_->FindServiceByName(service); EXPECT_TRUE(service_desc != nullptr); EXPECT_TRUE(ref_service_desc != nullptr); EXPECT_EQ(service_desc->DebugString(), ref_service_desc->DebugString()); - const google::protobuf::FileDescriptor* file_desc = service_desc->file(); + const protobuf::FileDescriptor* file_desc = service_desc->file(); if (known_files_.find(file_desc->package() + "/" + file_desc->name()) != known_files_.end()) { EXPECT_EQ(file_desc->DebugString(), @@ -105,9 +104,9 @@ class ProtoServerReflectionTest : public ::testing::Test { } void CompareMethod(const grpc::string& method) { - const google::protobuf::MethodDescriptor* method_desc = + const protobuf::MethodDescriptor* method_desc = desc_pool_->FindMethodByName(method); - const google::protobuf::MethodDescriptor* ref_method_desc = + const protobuf::MethodDescriptor* ref_method_desc = ref_desc_pool_->FindMethodByName(method); EXPECT_TRUE(method_desc != nullptr); EXPECT_TRUE(ref_method_desc != nullptr); @@ -122,9 +121,8 @@ class ProtoServerReflectionTest : public ::testing::Test { return; } - const google::protobuf::Descriptor* desc = - desc_pool_->FindMessageTypeByName(type); - const google::protobuf::Descriptor* ref_desc = + const protobuf::Descriptor* desc = desc_pool_->FindMessageTypeByName(type); + const protobuf::Descriptor* ref_desc = ref_desc_pool_->FindMessageTypeByName(type); EXPECT_TRUE(desc != nullptr); EXPECT_TRUE(ref_desc != nullptr); @@ -135,10 +133,10 @@ class ProtoServerReflectionTest : public ::testing::Test { std::unique_ptr<Server> server_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; std::unique_ptr<ProtoReflectionDescriptorDatabase> desc_db_; - std::unique_ptr<google::protobuf::DescriptorPool> desc_pool_; + std::unique_ptr<protobuf::DescriptorPool> desc_pool_; std::unordered_set<string> known_files_; std::unordered_set<string> known_types_; - const google::protobuf::DescriptorPool* ref_desc_pool_; + const protobuf::DescriptorPool* ref_desc_pool_; int port_; reflection::ProtoServerReflectionPlugin plugin_; }; diff --git a/test/cpp/end2end/server_builder_plugin_test.cc b/test/cpp/end2end/server_builder_plugin_test.cc index 778a2be573..b967a5d1e9 100644 --- a/test/cpp/end2end/server_builder_plugin_test.cc +++ b/test/cpp/end2end/server_builder_plugin_test.cc @@ -191,7 +191,7 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> { // we run some tests without a service, and for those we need to supply a // frequently polled completion queue cq_ = builder_->AddCompletionQueue(); - cq_thread_ = grpc::thread(std::bind(&ServerBuilderPluginTest::RunCQ, this)); + cq_thread_ = new grpc::thread(&ServerBuilderPluginTest::RunCQ, this); server_ = builder_->BuildAndStart(); EXPECT_TRUE(CheckPresent()); } @@ -209,7 +209,8 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> { EXPECT_TRUE(plugin->finish_is_called()); server_->Shutdown(); cq_->Shutdown(); - cq_thread_.join(); + cq_thread_->join(); + delete cq_thread_; } string to_string(const int number) { @@ -224,7 +225,7 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> { std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; std::unique_ptr<ServerCompletionQueue> cq_; std::unique_ptr<Server> server_; - grpc::thread cq_thread_; + grpc::thread* cq_thread_; TestServiceImpl service_; int port_; diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc index bf77878e0a..33de1ee93c 100644 --- a/test/cpp/grpclb/grpclb_api_test.cc +++ b/test/cpp/grpclb/grpclb_api_test.cc @@ -58,26 +58,24 @@ TEST_F(GrpclbTest, CreateRequest) { grpc_grpclb_request_destroy(c_req); } -TEST_F(GrpclbTest, ParseResponse) { +TEST_F(GrpclbTest, ParseInitialResponse) { LoadBalanceResponse response; auto* initial_response = response.mutable_initial_response(); auto* client_stats_report_interval = initial_response->mutable_client_stats_report_interval(); client_stats_report_interval->set_seconds(123); client_stats_report_interval->set_nanos(456); - const std::string encoded_response = response.SerializeAsString(); gpr_slice encoded_slice = gpr_slice_from_copied_string(encoded_response.c_str()); - grpc_grpclb_response* c_response = grpc_grpclb_response_parse(encoded_slice); - EXPECT_TRUE(c_response->has_initial_response); - EXPECT_FALSE(c_response->initial_response.has_load_balancer_delegate); - EXPECT_EQ(c_response->initial_response.client_stats_report_interval.seconds, - 123); - EXPECT_EQ(c_response->initial_response.client_stats_report_interval.nanos, - 456); + + grpc_grpclb_initial_response* c_initial_response = + grpc_grpclb_initial_response_parse(encoded_slice); + EXPECT_FALSE(c_initial_response->has_load_balancer_delegate); + EXPECT_EQ(c_initial_response->client_stats_report_interval.seconds, 123); + EXPECT_EQ(c_initial_response->client_stats_report_interval.nanos, 456); gpr_slice_unref(encoded_slice); - grpc_grpclb_response_destroy(c_response); + grpc_grpclb_initial_response_destroy(c_initial_response); } TEST_F(GrpclbTest, ParseResponseServerList) { diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc new file mode 100644 index 0000000000..b2fdce2963 --- /dev/null +++ b/test/cpp/grpclb/grpclb_test.cc @@ -0,0 +1,689 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <cinttypes> +#include <cstdarg> +#include <cstdint> +#include <cstring> +#include <string> + +extern "C" { +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> +#include <grpc/support/time.h> + +#include "src/core/ext/client_config/client_channel.h" +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/support/string.h" +#include "src/core/lib/support/tmpfile.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/server.h" +#include "test/core/end2end/cq_verifier.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" +} + +#include "src/proto/grpc/lb/v1/load_balancer.pb.h" + +#define NUM_BACKENDS 4 + +// TODO(dgq): Other scenarios in need of testing: +// - Send an empty serverlist update and verify that the client request blocks +// until a new serverlist with actual contents is available. +// - Send identical serverlist update +// - Test reception of invalid serverlist +// - Test pinging +// - Test against a non-LB server. That server should return UNIMPLEMENTED and +// the call should fail. +// - Random LB server closing the stream unexpectedly. + +namespace grpc { +namespace { + +typedef struct client_fixture { + grpc_channel *client; + char *server_uri; + grpc_completion_queue *cq; +} client_fixture; + +typedef struct server_fixture { + grpc_server *server; + grpc_call *server_call; + grpc_completion_queue *cq; + char *servers_hostport; + int port; + gpr_thd_id tid; + int num_calls_serviced; +} server_fixture; + +typedef struct test_fixture { + server_fixture lb_server; + server_fixture lb_backends[NUM_BACKENDS]; + client_fixture client; + int lb_server_update_delay_ms; +} test_fixture; + +static void *tag(intptr_t t) { return (void *)t; } + +static gpr_slice build_response_payload_slice( + const char *host, int *ports, size_t nports, + int64_t expiration_interval_secs, int32_t expiration_interval_nanos) { + // server_list { + // servers { + // ip_address: "127.0.0.1" + // port: ... + // load_balance_token: "token..." + // } + // ... + // } + grpc::lb::v1::LoadBalanceResponse response; + auto *serverlist = response.mutable_server_list(); + + if (expiration_interval_secs > 0 || expiration_interval_nanos > 0) { + auto *expiration_interval = serverlist->mutable_expiration_interval(); + if (expiration_interval_secs > 0) { + expiration_interval->set_seconds(expiration_interval_secs); + } + if (expiration_interval_nanos > 0) { + expiration_interval->set_nanos(expiration_interval_nanos); + } + } + for (size_t i = 0; i < nports; i++) { + auto *server = serverlist->add_servers(); + server->set_ip_address(host); + server->set_port(ports[i]); + // The following long long int cast is meant to work around the + // disfunctional implementation of std::to_string in gcc 4.4, which doesn't + // have a version for int but does have one for long long int. + server->set_load_balance_token("token" + + std::to_string((long long int)ports[i])); + } + + gpr_log(GPR_INFO, "generating response: %s", + response.ShortDebugString().c_str()); + + const gpr_slice response_slice = + gpr_slice_from_copied_string(response.SerializeAsString().c_str()); + return response_slice; +} + +static void drain_cq(grpc_completion_queue *cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), + NULL); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void sleep_ms(int delay_ms) { + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(delay_ms, GPR_TIMESPAN))); +} + +static void start_lb_server(server_fixture *sf, int *ports, size_t nports, + int update_delay_ms) { + grpc_call *s; + cq_verifier *cqv = cq_verifier_create(sf->cq); + grpc_op ops[6]; + grpc_op *op; + grpc_metadata_array request_metadata_recv; + grpc_call_details call_details; + grpc_call_error error; + int was_cancelled = 2; + grpc_byte_buffer *request_payload_recv; + grpc_byte_buffer *response_payload; + + memset(ops, 0, sizeof(ops)); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + + error = grpc_server_request_call(sf->server, &s, &call_details, + &request_metadata_recv, sf->cq, sf->cq, + tag(200)); + GPR_ASSERT(GRPC_CALL_OK == error); + gpr_log(GPR_INFO, "LB Server[%s] up", sf->servers_hostport); + cq_expect_completion(cqv, tag(200), 1); + cq_verify(cqv); + gpr_log(GPR_INFO, "LB Server[%s] after tag 200", sf->servers_hostport); + + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(201), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + gpr_log(GPR_INFO, "LB Server[%s] after tag 201", sf->servers_hostport); + + // receive request for backends + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &request_payload_recv; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(202), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + cq_expect_completion(cqv, tag(202), 1); + cq_verify(cqv); + gpr_log(GPR_INFO, "LB Server[%s] after RECV_MSG", sf->servers_hostport); + // TODO(dgq): validate request. + grpc_byte_buffer_destroy(request_payload_recv); + gpr_slice response_payload_slice; + for (int i = 0; i < 2; i++) { + if (i == 0) { + // First half of the ports. + response_payload_slice = + build_response_payload_slice("127.0.0.1", ports, nports / 2, -1, -1); + } else { + // Second half of the ports. + sleep_ms(update_delay_ms); + response_payload_slice = + build_response_payload_slice("127.0.0.1", ports + (nports / 2), + (nports + 1) / 2 /* ceil */, -1, -1); + } + + response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1); + op = ops; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = response_payload; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(203), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + cq_expect_completion(cqv, tag(203), 1); + cq_verify(cqv); + gpr_log(GPR_INFO, "LB Server[%s] after SEND_MESSAGE, iter %d", + sf->servers_hostport, i); + + grpc_byte_buffer_destroy(response_payload); + gpr_slice_unref(response_payload_slice); + } + gpr_log(GPR_INFO, "LB Server[%s] shutting down", sf->servers_hostport); + + op = ops; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_OK; + op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(204), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + cq_expect_completion(cqv, tag(201), 1); + cq_expect_completion(cqv, tag(204), 1); + cq_verify(cqv); + gpr_log(GPR_INFO, "LB Server[%s] after tag 204. All done. LB server out", + sf->servers_hostport); + + grpc_call_destroy(s); + + cq_verifier_destroy(cqv); + + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); +} + +static void start_backend_server(server_fixture *sf) { + grpc_call *s; + cq_verifier *cqv; + grpc_op ops[6]; + grpc_op *op; + grpc_metadata_array request_metadata_recv; + grpc_call_details call_details; + grpc_call_error error; + int was_cancelled; + grpc_byte_buffer *request_payload_recv; + grpc_byte_buffer *response_payload; + grpc_event ev; + + while (true) { + memset(ops, 0, sizeof(ops)); + cqv = cq_verifier_create(sf->cq); + was_cancelled = 2; + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + + error = grpc_server_request_call(sf->server, &s, &call_details, + &request_metadata_recv, sf->cq, sf->cq, + tag(100)); + GPR_ASSERT(GRPC_CALL_OK == error); + gpr_log(GPR_INFO, "Server[%s] up", sf->servers_hostport); + ev = grpc_completion_queue_next(sf->cq, + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(60), NULL); + if (!ev.success) { + gpr_log(GPR_INFO, "Server[%s] being torn down", sf->servers_hostport); + cq_verifier_destroy(cqv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + return; + } + GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); + gpr_log(GPR_INFO, "Server[%s] after tag 100", sf->servers_hostport); + + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(101), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + gpr_log(GPR_INFO, "Server[%s] after tag 101", sf->servers_hostport); + + bool exit = false; + gpr_slice response_payload_slice = + gpr_slice_from_copied_string("hello you"); + while (!exit) { + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &request_payload_recv; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + ev = grpc_completion_queue_next( + sf->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), NULL); + if (ev.type == GRPC_OP_COMPLETE && ev.success) { + GPR_ASSERT(ev.tag = tag(102)); + if (request_payload_recv == NULL) { + exit = true; + gpr_log(GPR_INFO, + "Server[%s] recv \"close\" from client, exiting. Call #%d", + sf->servers_hostport, sf->num_calls_serviced); + } + } else { + gpr_log(GPR_INFO, "Server[%s] forced to shutdown. Call #%d", + sf->servers_hostport, sf->num_calls_serviced); + exit = true; + } + gpr_log(GPR_INFO, "Server[%s] after tag 102. Call #%d", + sf->servers_hostport, sf->num_calls_serviced); + + if (!exit) { + response_payload = + grpc_raw_byte_buffer_create(&response_payload_slice, 1); + op = ops; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = response_payload; + op->flags = 0; + op->reserved = NULL; + op++; + error = + grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + ev = grpc_completion_queue_next( + sf->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), NULL); + if (ev.type == GRPC_OP_COMPLETE && ev.success) { + GPR_ASSERT(ev.tag = tag(103)); + } else { + gpr_log(GPR_INFO, "Server[%s] forced to shutdown. Call #%d", + sf->servers_hostport, sf->num_calls_serviced); + exit = true; + } + gpr_log(GPR_INFO, "Server[%s] after tag 103. Call #%d", + sf->servers_hostport, sf->num_calls_serviced); + grpc_byte_buffer_destroy(response_payload); + } + + grpc_byte_buffer_destroy(request_payload_recv); + } + ++sf->num_calls_serviced; + + gpr_log(GPR_INFO, "Server[%s] OUT OF THE LOOP", sf->servers_hostport); + gpr_slice_unref(response_payload_slice); + + op = ops; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_OK; + op->data.send_status_from_server.status_details = "Backend server out a-ok"; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(104), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + cq_expect_completion(cqv, tag(101), 1); + cq_expect_completion(cqv, tag(104), 1); + cq_verify(cqv); + gpr_log(GPR_INFO, "Server[%s] DONE. After servicing %d calls", + sf->servers_hostport, sf->num_calls_serviced); + + grpc_call_destroy(s); + cq_verifier_destroy(cqv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + } +} + +static void perform_request(client_fixture *cf) { + grpc_call *c; + cq_verifier *cqv = cq_verifier_create(cf->cq); + grpc_op ops[6]; + grpc_op *op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_status_code status; + grpc_call_error error; + char *details = NULL; + size_t details_capacity = 0; + grpc_byte_buffer *request_payload; + grpc_byte_buffer *response_payload_recv; + int i; + + memset(ops, 0, sizeof(ops)); + gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world"); + + c = grpc_channel_create_call(cf->client, NULL, GRPC_PROPAGATE_DEFAULTS, + cf->cq, "/foo", "foo.test.google.fr:1234", + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1000), NULL); + gpr_log(GPR_INFO, "Call 0x%" PRIxPTR " created", (intptr_t)c); + GPR_ASSERT(c); + char *peer; + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + for (i = 0; i < 4; i++) { + request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); + + op = ops; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = request_payload; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &response_payload_recv; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + peer = grpc_call_get_peer(c); + cq_expect_completion(cqv, tag(2), 1); + cq_verify(cqv); + gpr_free(peer); + + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(response_payload_recv); + } + + gpr_slice_unref(request_payload_slice); + + op = ops; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(3), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + cq_expect_completion(cqv, tag(1), 1); + cq_expect_completion(cqv, tag(3), 1); + cq_verify(cqv); + peer = grpc_call_get_peer(c); + gpr_log(GPR_INFO, "Client DONE WITH SERVER %s ", peer); + gpr_free(peer); + + grpc_call_destroy(c); + + cq_verify_empty_timeout(cqv, 1); + cq_verifier_destroy(cqv); + + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + gpr_free(details); +} + +static void setup_client(const char *server_hostport, client_fixture *cf) { + cf->cq = grpc_completion_queue_create(NULL); + cf->server_uri = gpr_strdup(server_hostport); + cf->client = grpc_insecure_channel_create(cf->server_uri, NULL, NULL); +} + +static void teardown_client(client_fixture *cf) { + grpc_completion_queue_shutdown(cf->cq); + drain_cq(cf->cq); + grpc_completion_queue_destroy(cf->cq); + cf->cq = NULL; + grpc_channel_destroy(cf->client); + cf->client = NULL; + gpr_free(cf->server_uri); +} + +static void setup_server(const char *host, server_fixture *sf) { + int assigned_port; + + sf->cq = grpc_completion_queue_create(NULL); + const char *colon_idx = strchr(host, ':'); + if (colon_idx) { + const char *port_str = colon_idx + 1; + sf->port = atoi(port_str); + sf->servers_hostport = gpr_strdup(host); + } else { + sf->port = grpc_pick_unused_port_or_die(); + gpr_join_host_port(&sf->servers_hostport, host, sf->port); + } + + sf->server = grpc_server_create(NULL, NULL); + grpc_server_register_completion_queue(sf->server, sf->cq, NULL); + GPR_ASSERT((assigned_port = grpc_server_add_insecure_http2_port( + sf->server, sf->servers_hostport)) > 0); + GPR_ASSERT(sf->port == assigned_port); + grpc_server_start(sf->server); +} + +static void teardown_server(server_fixture *sf) { + if (!sf->server) return; + + gpr_log(GPR_INFO, "Server[%s] shutting down", sf->servers_hostport); + grpc_server_shutdown_and_notify(sf->server, sf->cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck( + sf->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL) + .type == GRPC_OP_COMPLETE); + grpc_server_destroy(sf->server); + gpr_thd_join(sf->tid); + + sf->server = NULL; + grpc_completion_queue_shutdown(sf->cq); + drain_cq(sf->cq); + grpc_completion_queue_destroy(sf->cq); + + gpr_log(GPR_INFO, "Server[%s] bye bye", sf->servers_hostport); + gpr_free(sf->servers_hostport); +} + +static void fork_backend_server(void *arg) { + server_fixture *sf = static_cast<server_fixture *>(arg); + start_backend_server(sf); +} + +static void fork_lb_server(void *arg) { + test_fixture *tf = static_cast<test_fixture *>(arg); + int ports[NUM_BACKENDS]; + for (int i = 0; i < NUM_BACKENDS; i++) { + ports[i] = tf->lb_backends[i].port; + } + start_lb_server(&tf->lb_server, ports, NUM_BACKENDS, + tf->lb_server_update_delay_ms); +} + +static void setup_test_fixture(test_fixture *tf, + int lb_server_update_delay_ms) { + tf->lb_server_update_delay_ms = lb_server_update_delay_ms; + + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + + for (int i = 0; i < NUM_BACKENDS; ++i) { + setup_server("127.0.0.1", &tf->lb_backends[i]); + gpr_thd_new(&tf->lb_backends[i].tid, fork_backend_server, + &tf->lb_backends[i], &options); + } + + setup_server("127.0.0.1", &tf->lb_server); + gpr_thd_new(&tf->lb_server.tid, fork_lb_server, &tf->lb_server, &options); + + char *server_uri; + gpr_asprintf(&server_uri, "ipv4:%s?lb_policy=grpclb&lb_enabled=1", + tf->lb_server.servers_hostport); + setup_client(server_uri, &tf->client); + gpr_free(server_uri); +} + +static void teardown_test_fixture(test_fixture *tf) { + teardown_client(&tf->client); + for (int i = 0; i < NUM_BACKENDS; ++i) { + teardown_server(&tf->lb_backends[i]); + } + teardown_server(&tf->lb_server); +} + +// The LB server will send two updates: batch 1 and batch 2. Each batch +// contains +// two addresses, both of a valid and running backend server. Batch 1 is +// readily +// available and provided as soon as the client establishes the streaming +// call. +// Batch 2 is sent after a delay of \a lb_server_update_delay_ms +// milliseconds. +static test_fixture test_update(int lb_server_update_delay_ms) { + gpr_log(GPR_INFO, "start %s(%d)", __func__, lb_server_update_delay_ms); + test_fixture tf; + memset(&tf, 0, sizeof(tf)); + setup_test_fixture(&tf, lb_server_update_delay_ms); + perform_request( + &tf.client); // "consumes" 1st backend server of 1st serverlist + perform_request( + &tf.client); // "consumes" 2nd backend server of 1st serverlist + + perform_request( + &tf.client); // "consumes" 1st backend server of 2nd serverlist + perform_request( + &tf.client); // "consumes" 2nd backend server of 2nd serverlist + + teardown_test_fixture(&tf); + gpr_log(GPR_INFO, "end %s(%d)", __func__, lb_server_update_delay_ms); + return tf; +} + +} // namespace +} // namespace grpc + +int main(int argc, char **argv) { + grpc_test_init(argc, argv); + grpc_init(); + + grpc::test_fixture tf_result; + // Clients take a bit over one second to complete a call (the last part of the + // call sleeps for 1 second while verifying the client's completion queue is + // empty). Therefore: + // + // If the LB server waits 800ms before sending an update, it will arrive + // before the first client request is done, skipping the second server from + // batch 1 altogether: the 2nd client request will go to the 1st server of + // batch 2 (ie, the third one out of the four total servers). + tf_result = grpc::test_update(800); + GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1); + GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 0); + GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 2); + GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1); + + // If the LB server waits 1500ms, the update arrives after having picked the + // 2nd server from batch 1 but before the next pick for the first server of + // batch 2. All server are used. + tf_result = grpc::test_update(1500); + GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1); + GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1); + GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 1); + GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1); + + // If the LB server waits > 2000ms, the update arrives after the first two + // request are done and the third pick is performed, which returns, in RR + // fashion, the 1st server of the 1st update. Therefore, the second server of + // batch 1 is hit at least one, whereas the first server of batch 2 is never + // hit. + tf_result = grpc::test_update(2500); + GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced >= 1); + GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced > 0); + GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced > 0); + GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 0); + + grpc_shutdown(); + return 0; +} diff --git a/test/cpp/interop/interop_server.cc b/test/cpp/interop/interop_server.cc index ebef0002a3..e5878bb248 100644 --- a/test/cpp/interop/interop_server.cc +++ b/test/cpp/interop/interop_server.cc @@ -31,7 +31,6 @@ * */ -#include <signal.h> #include <unistd.h> #include <fstream> @@ -78,8 +77,6 @@ using grpc::testing::StreamingOutputCallResponse; using grpc::testing::TestService; using grpc::Status; -static bool got_sigint = false; - const char kEchoInitialMetadataKey[] = "x-grpc-test-echo-initial"; const char kEchoTrailingBinMetadataKey[] = "x-grpc-test-echo-trailing-bin"; const char kEchoUserAgentKey[] = "x-grpc-test-echo-useragent"; @@ -311,7 +308,9 @@ class TestServiceImpl : public TestService::Service { } }; -void RunServer() { +void grpc::testing::interop::RunServer( + std::shared_ptr<ServerCredentials> creds) { + GPR_ASSERT(FLAGS_port != 0); std::ostringstream server_address; server_address << "0.0.0.0:" << FLAGS_port; TestServiceImpl service; @@ -321,24 +320,10 @@ void RunServer() { ServerBuilder builder; builder.RegisterService(&service); - std::shared_ptr<ServerCredentials> creds = - grpc::testing::CreateInteropServerCredentials(); builder.AddListeningPort(server_address.str(), creds); std::unique_ptr<Server> server(builder.BuildAndStart()); gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str()); - while (!got_sigint) { + while (!g_got_sigint) { sleep(5); } } - -static void sigint_handler(int x) { got_sigint = true; } - -int main(int argc, char** argv) { - grpc::testing::InitTest(&argc, &argv, true); - signal(SIGINT, sigint_handler); - - GPR_ASSERT(FLAGS_port != 0); - RunServer(); - - return 0; -} diff --git a/test/cpp/interop/interop_server_bootstrap.cc b/test/cpp/interop/interop_server_bootstrap.cc new file mode 100644 index 0000000000..424f7ca7f0 --- /dev/null +++ b/test/cpp/interop/interop_server_bootstrap.cc @@ -0,0 +1,54 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <signal.h> +#include <unistd.h> + +#include "test/cpp/interop/server_helper.h" +#include "test/cpp/util/test_config.h" + +bool grpc::testing::interop::g_got_sigint = false; + +static void sigint_handler(int x) { + grpc::testing::interop::g_got_sigint = true; +} + +int main(int argc, char** argv) { + grpc::testing::InitTest(&argc, &argv, true); + signal(SIGINT, sigint_handler); + + grpc::testing::interop::RunServer( + grpc::testing::CreateInteropServerCredentials()); + + return 0; +} diff --git a/test/cpp/interop/server_helper.h b/test/cpp/interop/server_helper.h index a1da14a4c8..fc4ea8b3e8 100644 --- a/test/cpp/interop/server_helper.h +++ b/test/cpp/interop/server_helper.h @@ -60,6 +60,12 @@ class InteropServerContextInspector { const ::grpc::ServerContext& context_; }; +namespace interop { + +extern bool g_got_sigint; +void RunServer(std::shared_ptr<ServerCredentials> creds); + +} // namespace interop } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 4045e13460..fada4ba767 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -169,6 +169,7 @@ class Client { // Must call AwaitThreadsCompletion before destructor to avoid a race // between destructor and invocation of virtual ThreadFunc void AwaitThreadsCompletion() { + gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(true)); DestroyMultithreading(); std::unique_lock<std::mutex> g(thread_completion_mu_); while (threads_remaining_ != 0) { @@ -178,8 +179,10 @@ class Client { protected: bool closed_loop_; + gpr_atm thread_pool_done_; void StartThreads(size_t num_threads) { + gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false)); threads_remaining_ = num_threads; for (size_t i = 0; i < num_threads; i++) { threads_.emplace_back(new Thread(this, i)); @@ -241,18 +244,9 @@ class Client { class Thread { public: Thread(Client* client, size_t idx) - : done_(false), - client_(client), - idx_(idx), - impl_(&Thread::ThreadFunc, this) {} + : client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {} - ~Thread() { - { - std::lock_guard<std::mutex> g(mu_); - done_ = true; - } - impl_.join(); - } + ~Thread() { impl_.join(); } void BeginSwap(Histogram* n) { std::lock_guard<std::mutex> g(mu_); @@ -282,9 +276,9 @@ class Client { } if (!thread_still_ok) { gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); - done_ = true; } - if (done_) { + if (!thread_still_ok || + static_cast<bool>(gpr_atm_acq_load(&client_->thread_pool_done_))) { client_->CompleteThread(); return; } @@ -292,7 +286,6 @@ class Client { } std::mutex mu_; - bool done_; Histogram histogram_; Client* client_; const size_t idx_; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 25c7823553..8062424a1f 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -79,10 +79,29 @@ class SynchronousClient virtual ~SynchronousClient(){}; protected: - void WaitToIssue(int thread_idx) { + // WaitToIssue returns false if we realize that we need to break out + bool WaitToIssue(int thread_idx) { if (!closed_loop_) { - gpr_sleep_until(NextIssueTime(thread_idx)); + const gpr_timespec next_issue_time = NextIssueTime(thread_idx); + // Avoid sleeping for too long continuously because we might + // need to terminate before then. This is an issue since + // exponential distribution can occasionally produce bad outliers + while (true) { + const gpr_timespec one_sec_delay = + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_seconds(1, GPR_TIMESPAN)); + if (gpr_time_cmp(next_issue_time, one_sec_delay) <= 0) { + gpr_sleep_until(next_issue_time); + return true; + } else { + gpr_sleep_until(one_sec_delay); + if (gpr_atm_acq_load(&thread_pool_done_) != static_cast<gpr_atm>(0)) { + return false; + } + } + } } + return true; } size_t num_threads_; @@ -101,7 +120,9 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { ~SynchronousUnaryClient() {} bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE { - WaitToIssue(thread_idx); + if (!WaitToIssue(thread_idx)) { + return true; + } auto* stub = channels_[thread_idx % channels_.size()].get_stub(); double start = UsageTimer::Now(); GPR_TIMER_SCOPE("SynchronousUnaryClient::ThreadFunc", 0); @@ -144,7 +165,9 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { } bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE { - WaitToIssue(thread_idx); + if (!WaitToIssue(thread_idx)) { + return true; + } GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0); double start = UsageTimer::Now(); if (stream_[thread_idx]->Write(request_) && diff --git a/test/cpp/qps/limit_cores.cc b/test/cpp/qps/limit_cores.cc index 59ed369067..b5c222542b 100644 --- a/test/cpp/qps/limit_cores.cc +++ b/test/cpp/qps/limit_cores.cc @@ -68,9 +68,9 @@ int LimitCores(const int* cores, int cores_size) { cores_set++; } } - GPR_ASSERT(sched_setaffinity(0, size, cpup) == 0); + bool affinity_set = (sched_setaffinity(0, size, cpup) == 0); CPU_FREE(cpup); - return cores_set; + return affinity_set ? cores_set : num_cores; } } // namespace testing diff --git a/test/cpp/util/grpc_cli.cc b/test/cpp/util/grpc_cli.cc index c52e48bae6..53529da782 100644 --- a/test/cpp/util/grpc_cli.cc +++ b/test/cpp/util/grpc_cli.cc @@ -34,18 +34,21 @@ /* A command line tool to talk to a grpc server. Example of talking to grpc interop server: - grpc_cli call localhost:50051 UnaryCall src/proto/grpc/testing/test.proto \ - "response_size:10" --enable_ssl=false + grpc_cli call localhost:50051 UnaryCall "response_size:10" \ + --protofiles=src/proto/grpc/testing/test.proto --enable_ssl=false Options: - 1. --proto_path, if your proto file is not under current working directory, + 1. --protofiles, use this flag to provide a proto file if the server does + does not have the reflection service. + 2. --proto_path, if your proto file is not under current working directory, use this flag to provide a search root. It should work similar to the - counterpart in protoc. - 2. --metadata specifies metadata to be sent to the server, such as: + counterpart in protoc. This option is valid only when protofiles is + provided. + 3. --metadata specifies metadata to be sent to the server, such as: --metadata="MyHeaderKey1:Value1:MyHeaderKey2:Value2" - 3. --enable_ssl, whether to use tls. - 4. --use_auth, if set to true, attach a GoogleDefaultCredentials to the call - 3. --input_binary_file, a file containing the serialized request. The file + 4. --enable_ssl, whether to use tls. + 5. --use_auth, if set to true, attach a GoogleDefaultCredentials to the call + 6. --input_binary_file, a file containing the serialized request. The file can be generated by calling something like: protoc --proto_path=src/proto/grpc/testing/ \ --encode=grpc.testing.SimpleRequest \ @@ -53,7 +56,7 @@ < input.txt > input.bin If this is used and no proto file is provided in the argument list, the method string has to be exact in the form of /package.service/method. - 4. --output_binary_file, a file to write binary format response into, it can + 7. --output_binary_file, a file to write binary format response into, it can be later decoded using protoc: protoc --proto_path=src/proto/grpc/testing/ \ --decode=grpc.testing.SimpleResponse \ @@ -61,6 +64,7 @@ < output.bin > output.txt */ +#include <unistd.h> #include <fstream> #include <iostream> #include <sstream> @@ -86,6 +90,8 @@ DEFINE_string(output_binary_file, "", DEFINE_string(metadata, "", "Metadata to send to server, in the form of key1:val1:key2:val2"); DEFINE_string(proto_path, ".", "Path to look for the proto file."); +// TODO(zyc): support a list of input proto files +DEFINE_string(protofiles, "", "Name of the proto file."); void ParseMetadataFlag( std::multimap<grpc::string, grpc::string>* client_metadata) { @@ -129,35 +135,61 @@ void PrintMetadata(const T& m, const grpc::string& message) { int main(int argc, char** argv) { grpc::testing::InitTest(&argc, &argv, true); - if (argc < 4 || argc == 5 || grpc::string(argv[1]) != "call") { + if (argc < 4 || grpc::string(argv[1]) != "call") { std::cout << "Usage: grpc_cli call server_host:port method_name " << "[proto file] [text format request] [<options>]" << std::endl; + return 1; } - grpc::string file_name; grpc::string request_text; grpc::string server_address(argv[2]); grpc::string method_name(argv[3]); std::unique_ptr<grpc::testing::ProtoFileParser> parser; grpc::string serialized_request_proto; - if (argc == 6) { - file_name = argv[4]; - // TODO(yangg) read from stdin as well? - request_text = argv[5]; + if (argc == 5) { + request_text = argv[4]; + } + + std::shared_ptr<grpc::ChannelCredentials> creds; + if (!FLAGS_enable_ssl) { + creds = grpc::InsecureChannelCredentials(); + } else { + if (FLAGS_use_auth) { + creds = grpc::GoogleDefaultCredentials(); + } else { + creds = grpc::SslCredentials(grpc::SslCredentialsOptions()); + } } + std::shared_ptr<grpc::Channel> channel = + grpc::CreateChannel(server_address, creds); if (request_text.empty() && FLAGS_input_binary_file.empty()) { - std::cout << "Missing input. Use text format input or " - << "--input_binary_file for serialized request" << std::endl; - return 1; - } else if (!request_text.empty()) { - parser.reset(new grpc::testing::ProtoFileParser(FLAGS_proto_path, file_name, - method_name)); + if (isatty(STDIN_FILENO)) { + std::cout << "reading request message from stdin..." << std::endl; + } + std::stringstream input_stream; + input_stream << std::cin.rdbuf(); + request_text = input_stream.str(); + } + + if (!request_text.empty()) { + if (!FLAGS_protofiles.empty()) { + parser.reset(new grpc::testing::ProtoFileParser( + FLAGS_proto_path, FLAGS_protofiles, method_name)); + } else { + parser.reset(new grpc::testing::ProtoFileParser(channel, method_name)); + } method_name = parser->GetFullMethodName(); if (parser->HasError()) { return 1; } + + if (!FLAGS_input_binary_file.empty()) { + std::cout + << "warning: request given in argv, ignoring --input_binary_file" + << std::endl; + } } if (parser) { @@ -175,19 +207,6 @@ int main(int argc, char** argv) { } std::cout << "connecting to " << server_address << std::endl; - std::shared_ptr<grpc::ChannelCredentials> creds; - if (!FLAGS_enable_ssl) { - creds = grpc::InsecureChannelCredentials(); - } else { - if (FLAGS_use_auth) { - creds = grpc::GoogleDefaultCredentials(); - } else { - creds = grpc::SslCredentials(grpc::SslCredentialsOptions()); - } - } - std::shared_ptr<grpc::Channel> channel = - grpc::CreateChannel(server_address, creds); - grpc::string serialized_response_proto; std::multimap<grpc::string, grpc::string> client_metadata; std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata, @@ -219,7 +238,7 @@ int main(int argc, char** argv) { } } else { std::cout << "Rpc failed with status code " << s.error_code() - << " error message " << s.error_message() << std::endl; + << ", error message: " << s.error_message() << std::endl; } return 0; diff --git a/test/cpp/util/proto_file_parser.cc b/test/cpp/util/proto_file_parser.cc index 25aec329eb..5b0d925e1c 100644 --- a/test/cpp/util/proto_file_parser.cc +++ b/test/cpp/util/proto_file_parser.cc @@ -95,9 +95,48 @@ ProtoFileParser::ProtoFileParser(const grpc::string& proto_path, dynamic_factory_.reset( new google::protobuf::DynamicMessageFactory(importer_->pool())); + std::vector<const google::protobuf::ServiceDescriptor*> service_desc_list; + for (int i = 0; i < file_desc->service_count(); i++) { + service_desc_list.push_back(file_desc->service(i)); + } + InitProtoFileParser(method, service_desc_list); +} + +ProtoFileParser::ProtoFileParser(std::shared_ptr<grpc::Channel> channel, + const grpc::string& method) + : has_error_(false), + desc_db_(new grpc::ProtoReflectionDescriptorDatabase(channel)), + desc_pool_(new google::protobuf::DescriptorPool(desc_db_.get())) { + std::vector<std::string> service_list; + if (!desc_db_->GetServices(&service_list)) { + LogError( + "Failed to get services from the server, " + "it may not have the reflection service.\n" + "Please try to use the --protofiles option to provide a proto file."); + } + if (has_error_) { + return; + } + dynamic_factory_.reset( + new google::protobuf::DynamicMessageFactory(desc_pool_.get())); + + std::vector<const google::protobuf::ServiceDescriptor*> service_desc_list; + for (auto it = service_list.begin(); it != service_list.end(); it++) { + service_desc_list.push_back(desc_pool_->FindServiceByName(*it)); + } + InitProtoFileParser(method, service_desc_list); +} + +ProtoFileParser::~ProtoFileParser() {} + +void ProtoFileParser::InitProtoFileParser( + const grpc::string& method, + const std::vector<const google::protobuf::ServiceDescriptor*> + service_desc_list) { const google::protobuf::MethodDescriptor* method_descriptor = nullptr; - for (int i = 0; !method_descriptor && i < file_desc->service_count(); i++) { - const auto* service_desc = file_desc->service(i); + for (auto it = service_desc_list.begin(); it != service_desc_list.end(); + it++) { + const auto* service_desc = *it; for (int j = 0; j < service_desc->method_count(); j++) { const auto* method_desc = service_desc->method(j); if (MethodNameMatch(method_desc->full_name(), method)) { @@ -130,8 +169,6 @@ ProtoFileParser::ProtoFileParser(const grpc::string& proto_path, dynamic_factory_->GetPrototype(method_descriptor->output_type())->New()); } -ProtoFileParser::~ProtoFileParser() {} - grpc::string ProtoFileParser::GetSerializedProto( const grpc::string& text_format_proto, bool is_request) { grpc::string serialized; @@ -143,7 +180,7 @@ grpc::string ProtoFileParser::GetSerializedProto( LogError("Failed to parse text format to proto."); return ""; } - ok = request_prototype_->SerializeToString(&serialized); + ok = msg->SerializeToString(&serialized); if (!ok) { LogError("Failed to serialize proto."); return ""; diff --git a/test/cpp/util/proto_file_parser.h b/test/cpp/util/proto_file_parser.h index 46cdd66503..b442d77db9 100644 --- a/test/cpp/util/proto_file_parser.h +++ b/test/cpp/util/proto_file_parser.h @@ -38,8 +38,10 @@ #include <google/protobuf/compiler/importer.h> #include <google/protobuf/dynamic_message.h> +#include <grpc++/channel.h> #include "src/compiler/config.h" +#include "test/cpp/util/proto_reflection_descriptor_database.h" namespace grpc { namespace testing { @@ -53,6 +55,9 @@ class ProtoFileParser { // even just Method. It will log an error if there is ambiguity. ProtoFileParser(const grpc::string& proto_path, const grpc::string& file_name, const grpc::string& method); + + ProtoFileParser(std::shared_ptr<grpc::Channel> channel, + const grpc::string& method); ~ProtoFileParser(); grpc::string GetFullMethodName() const { return full_method_name_; } @@ -68,12 +73,18 @@ class ProtoFileParser { void LogError(const grpc::string& error_msg); private: + void InitProtoFileParser( + const grpc::string& method, + const std::vector<const google::protobuf::ServiceDescriptor*> services); + bool has_error_; grpc::string request_text_; grpc::string full_method_name_; google::protobuf::compiler::DiskSourceTree source_tree_; std::unique_ptr<ErrorPrinter> error_printer_; std::unique_ptr<google::protobuf::compiler::Importer> importer_; + std::unique_ptr<grpc::ProtoReflectionDescriptorDatabase> desc_db_; + std::unique_ptr<google::protobuf::DescriptorPool> desc_pool_; std::unique_ptr<google::protobuf::DynamicMessageFactory> dynamic_factory_; std::unique_ptr<grpc::protobuf::Message> request_prototype_; std::unique_ptr<grpc::protobuf::Message> response_prototype_; diff --git a/test/cpp/util/proto_reflection_descriptor_database.cc b/test/cpp/util/proto_reflection_descriptor_database.cc index 25b720aee0..8fd466feb0 100644 --- a/test/cpp/util/proto_reflection_descriptor_database.cc +++ b/test/cpp/util/proto_reflection_descriptor_database.cc @@ -53,10 +53,20 @@ ProtoReflectionDescriptorDatabase::ProtoReflectionDescriptorDatabase( std::shared_ptr<grpc::Channel> channel) : stub_(ServerReflection::NewStub(channel)) {} -ProtoReflectionDescriptorDatabase::~ProtoReflectionDescriptorDatabase() {} +ProtoReflectionDescriptorDatabase::~ProtoReflectionDescriptorDatabase() { + if (stream_) { + stream_->WritesDone(); + Status status = stream_->Finish(); + if (!status.ok()) { + gpr_log(GPR_ERROR, + "ServerReflectionInfo rpc failed. Error code: %d, details: %s", + (int)status.error_code(), status.error_message().c_str()); + } + } +} bool ProtoReflectionDescriptorDatabase::FindFileByName( - const string& filename, google::protobuf::FileDescriptorProto* output) { + const string& filename, protobuf::FileDescriptorProto* output) { if (cached_db_.FindFileByName(filename, output)) { return true; } @@ -101,7 +111,7 @@ bool ProtoReflectionDescriptorDatabase::FindFileByName( } bool ProtoReflectionDescriptorDatabase::FindFileContainingSymbol( - const string& symbol_name, google::protobuf::FileDescriptorProto* output) { + const string& symbol_name, protobuf::FileDescriptorProto* output) { if (cached_db_.FindFileContainingSymbol(symbol_name, output)) { return true; } @@ -148,7 +158,7 @@ bool ProtoReflectionDescriptorDatabase::FindFileContainingSymbol( bool ProtoReflectionDescriptorDatabase::FindFileContainingExtension( const string& containing_type, int field_number, - google::protobuf::FileDescriptorProto* output) { + protobuf::FileDescriptorProto* output) { if (cached_db_.FindFileContainingExtension(containing_type, field_number, output)) { return true; @@ -276,10 +286,10 @@ bool ProtoReflectionDescriptorDatabase::GetServices( return false; } -const google::protobuf::FileDescriptorProto +const protobuf::FileDescriptorProto ProtoReflectionDescriptorDatabase::ParseFileDescriptorProtoResponse( const std::string& byte_fd_proto) { - google::protobuf::FileDescriptorProto file_desc_proto; + protobuf::FileDescriptorProto file_desc_proto; file_desc_proto.ParseFromString(byte_fd_proto); return file_desc_proto; } @@ -287,7 +297,7 @@ ProtoReflectionDescriptorDatabase::ParseFileDescriptorProtoResponse( void ProtoReflectionDescriptorDatabase::AddFileFromResponse( const grpc::reflection::v1alpha::FileDescriptorResponse& response) { for (int i = 0; i < response.file_descriptor_proto_size(); ++i) { - const google::protobuf::FileDescriptorProto file_proto = + const protobuf::FileDescriptorProto file_proto = ParseFileDescriptorProtoResponse(response.file_descriptor_proto(i)); if (known_files_.find(file_proto.name()) == known_files_.end()) { known_files_.insert(file_proto.name()); diff --git a/test/cpp/util/proto_reflection_descriptor_database.h b/test/cpp/util/proto_reflection_descriptor_database.h index 99c00675bb..eb7cf4907d 100644 --- a/test/cpp/util/proto_reflection_descriptor_database.h +++ b/test/cpp/util/proto_reflection_descriptor_database.h @@ -38,19 +38,19 @@ #include <unordered_set> #include <vector> -#include <google/protobuf/descriptor.h> -#include <google/protobuf/descriptor.pb.h> -#include <google/protobuf/descriptor_database.h> +// GRPC_NO_GENERATED_CODE indicates generated pb files should not be used +#ifdef GRPC_NO_GENERATED_CODE +#include "src/proto/grpc/reflection/v1alpha/reflection.grpc.pb.h" +#else #include <grpc++/ext/reflection.grpc.pb.h> +#endif // GRPC_NO_GENERATED_CODE #include <grpc++/grpc++.h> - namespace grpc { // ProtoReflectionDescriptorDatabase takes a stub of ServerReflection and // provides the methods defined by DescriptorDatabase interfaces. It can be used // to feed a DescriptorPool instance. -class ProtoReflectionDescriptorDatabase - : public google::protobuf::DescriptorDatabase { +class ProtoReflectionDescriptorDatabase : public protobuf::DescriptorDatabase { public: explicit ProtoReflectionDescriptorDatabase( std::unique_ptr<reflection::v1alpha::ServerReflection::Stub> stub); @@ -65,14 +65,13 @@ class ProtoReflectionDescriptorDatabase // Find a file by file name. Fills in in *output and returns true if found. // Otherwise, returns false, leaving the contents of *output undefined. bool FindFileByName(const string& filename, - google::protobuf::FileDescriptorProto* output) - GRPC_OVERRIDE; + protobuf::FileDescriptorProto* output) GRPC_OVERRIDE; // Find the file that declares the given fully-qualified symbol name. // If found, fills in *output and returns true, otherwise returns false // and leaves *output undefined. bool FindFileContainingSymbol(const string& symbol_name, - google::protobuf::FileDescriptorProto* output) + protobuf::FileDescriptorProto* output) GRPC_OVERRIDE; // Find the file which defines an extension extending the given message type @@ -81,7 +80,7 @@ class ProtoReflectionDescriptorDatabase // must be a fully-qualified type name. bool FindFileContainingExtension( const string& containing_type, int field_number, - google::protobuf::FileDescriptorProto* output) GRPC_OVERRIDE; + protobuf::FileDescriptorProto* output) GRPC_OVERRIDE; // Finds the tag numbers used by all known extensions of // extendee_type, and appends them to output in an undefined @@ -102,7 +101,7 @@ class ProtoReflectionDescriptorDatabase grpc::reflection::v1alpha::ServerReflectionResponse> ClientStream; - const google::protobuf::FileDescriptorProto ParseFileDescriptorProtoResponse( + const protobuf::FileDescriptorProto ParseFileDescriptorProtoResponse( const std::string& byte_fd_proto); void AddFileFromResponse( @@ -123,7 +122,7 @@ class ProtoReflectionDescriptorDatabase std::unordered_map<string, std::vector<int>> cached_extension_numbers_; std::mutex stream_mutex_; - google::protobuf::SimpleDescriptorDatabase cached_db_; + protobuf::SimpleDescriptorDatabase cached_db_; }; } // namespace grpc |