diff options
author | Craig Tiller <ctiller@google.com> | 2017-09-18 08:57:19 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-09-18 08:57:19 -0700 |
commit | adbcec4f57f7e25859326a6daf8ad7e08f2805ca (patch) | |
tree | c029f503127946f0564134d343c8af9f88ac8da5 /test/cpp | |
parent | 7f10c299c7eb5d6688c20ae43c63b0548ca8acd1 (diff) | |
parent | a5f46e29bcee3068f5ec6691b7e06cf944430881 (diff) |
Merge github.com:grpc/grpc into pollset_kick_stats
Diffstat (limited to 'test/cpp')
18 files changed, 1329 insertions, 168 deletions
diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index b43c27f3f7..3d664e8825 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -65,6 +65,9 @@ class ServiceA final { std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>> AsyncMethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>>(AsyncMethodA1Raw(context, request, cq)); } + std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>> PrepareAsyncMethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>>(PrepareAsyncMethodA1Raw(context, request, cq)); + } // MethodA1 trailing comment 1 // MethodA2 detached leading comment 1 // @@ -76,6 +79,9 @@ class ServiceA final { std::unique_ptr< ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>> AsyncMethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) { return std::unique_ptr< ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>>(AsyncMethodA2Raw(context, response, cq, tag)); } + std::unique_ptr< ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>> PrepareAsyncMethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>>(PrepareAsyncMethodA2Raw(context, response, cq)); + } // MethodA2 trailing comment 1 // Method A3 leading comment 1 std::unique_ptr< ::grpc::ClientReaderInterface< ::grpc::testing::Response>> MethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request) { @@ -84,6 +90,9 @@ class ServiceA final { std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>> AsyncMethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) { return std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>>(AsyncMethodA3Raw(context, request, cq, tag)); } + std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>> PrepareAsyncMethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>>(PrepareAsyncMethodA3Raw(context, request, cq)); + } // Method A3 trailing comment 1 // Method A4 leading comment 1 std::unique_ptr< ::grpc::ClientReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>> MethodA4(::grpc::ClientContext* context) { @@ -92,15 +101,22 @@ class ServiceA final { std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>> AsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) { return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>>(AsyncMethodA4Raw(context, cq, tag)); } + std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>> PrepareAsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>>(PrepareAsyncMethodA4Raw(context, cq)); + } // Method A4 trailing comment 1 private: virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* AsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; + virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* PrepareAsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; virtual ::grpc::ClientWriterInterface< ::grpc::testing::Request>* MethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response) = 0; virtual ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>* AsyncMethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) = 0; + virtual ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>* PrepareAsyncMethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq) = 0; virtual ::grpc::ClientReaderInterface< ::grpc::testing::Response>* MethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request) = 0; virtual ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>* AsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) = 0; + virtual ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>* PrepareAsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; virtual ::grpc::ClientReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4Raw(::grpc::ClientContext* context) = 0; virtual ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>* AsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) = 0; + virtual ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>* PrepareAsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) = 0; }; class Stub final : public StubInterface { public: @@ -109,34 +125,50 @@ class ServiceA final { std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> AsyncMethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(AsyncMethodA1Raw(context, request, cq)); } + std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> PrepareAsyncMethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(PrepareAsyncMethodA1Raw(context, request, cq)); + } std::unique_ptr< ::grpc::ClientWriter< ::grpc::testing::Request>> MethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response) { return std::unique_ptr< ::grpc::ClientWriter< ::grpc::testing::Request>>(MethodA2Raw(context, response)); } std::unique_ptr< ::grpc::ClientAsyncWriter< ::grpc::testing::Request>> AsyncMethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) { return std::unique_ptr< ::grpc::ClientAsyncWriter< ::grpc::testing::Request>>(AsyncMethodA2Raw(context, response, cq, tag)); } + std::unique_ptr< ::grpc::ClientAsyncWriter< ::grpc::testing::Request>> PrepareAsyncMethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncWriter< ::grpc::testing::Request>>(PrepareAsyncMethodA2Raw(context, response, cq)); + } std::unique_ptr< ::grpc::ClientReader< ::grpc::testing::Response>> MethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request) { return std::unique_ptr< ::grpc::ClientReader< ::grpc::testing::Response>>(MethodA3Raw(context, request)); } std::unique_ptr< ::grpc::ClientAsyncReader< ::grpc::testing::Response>> AsyncMethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) { return std::unique_ptr< ::grpc::ClientAsyncReader< ::grpc::testing::Response>>(AsyncMethodA3Raw(context, request, cq, tag)); } + std::unique_ptr< ::grpc::ClientAsyncReader< ::grpc::testing::Response>> PrepareAsyncMethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncReader< ::grpc::testing::Response>>(PrepareAsyncMethodA3Raw(context, request, cq)); + } std::unique_ptr< ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>> MethodA4(::grpc::ClientContext* context) { return std::unique_ptr< ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>>(MethodA4Raw(context)); } std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>> AsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) { return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>>(AsyncMethodA4Raw(context, cq, tag)); } + std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>> PrepareAsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>>(PrepareAsyncMethodA4Raw(context, cq)); + } private: std::shared_ptr< ::grpc::ChannelInterface> channel_; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; + ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* PrepareAsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; ::grpc::ClientWriter< ::grpc::testing::Request>* MethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response) override; ::grpc::ClientAsyncWriter< ::grpc::testing::Request>* AsyncMethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) override; + ::grpc::ClientAsyncWriter< ::grpc::testing::Request>* PrepareAsyncMethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq) override; ::grpc::ClientReader< ::grpc::testing::Response>* MethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request) override; ::grpc::ClientAsyncReader< ::grpc::testing::Response>* AsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) override; + ::grpc::ClientAsyncReader< ::grpc::testing::Response>* PrepareAsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4Raw(::grpc::ClientContext* context) override; ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* AsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) override; + ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* PrepareAsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) override; const ::grpc::RpcMethod rpcmethod_MethodA1_; const ::grpc::RpcMethod rpcmethod_MethodA2_; const ::grpc::RpcMethod rpcmethod_MethodA3_; @@ -372,9 +404,13 @@ class ServiceB final { std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>> AsyncMethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>>(AsyncMethodB1Raw(context, request, cq)); } + std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>> PrepareAsyncMethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>>(PrepareAsyncMethodB1Raw(context, request, cq)); + } // MethodB1 trailing comment 1 private: virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; + virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* PrepareAsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; }; class Stub final : public StubInterface { public: @@ -383,10 +419,14 @@ class ServiceB final { std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> AsyncMethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(AsyncMethodB1Raw(context, request, cq)); } + std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> PrepareAsyncMethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(PrepareAsyncMethodB1Raw(context, request, cq)); + } private: std::shared_ptr< ::grpc::ChannelInterface> channel_; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; + ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* PrepareAsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; const ::grpc::RpcMethod rpcmethod_MethodB1_; }; static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions()); diff --git a/test/cpp/codegen/compiler_test_mock_golden b/test/cpp/codegen/compiler_test_mock_golden index 8e4b4d5911..f97c2dd83a 100644 --- a/test/cpp/codegen/compiler_test_mock_golden +++ b/test/cpp/codegen/compiler_test_mock_golden @@ -15,18 +15,23 @@ class MockServiceAStub : public ServiceA::StubInterface { public: MOCK_METHOD3(MethodA1, ::grpc::Status(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::testing::Response* response)); MOCK_METHOD3(AsyncMethodA1Raw, ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq)); + MOCK_METHOD3(PrepareAsyncMethodA1Raw, ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq)); MOCK_METHOD2(MethodA2Raw, ::grpc::ClientWriterInterface< ::grpc::testing::Request>*(::grpc::ClientContext* context, ::grpc::testing::Response* response)); MOCK_METHOD4(AsyncMethodA2Raw, ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>*(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag)); + MOCK_METHOD3(PrepareAsyncMethodA2Raw, ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>*(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq)); MOCK_METHOD2(MethodA3Raw, ::grpc::ClientReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request)); MOCK_METHOD4(AsyncMethodA3Raw, ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag)); + MOCK_METHOD3(PrepareAsyncMethodA3Raw, ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq)); MOCK_METHOD1(MethodA4Raw, ::grpc::ClientReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>*(::grpc::ClientContext* context)); MOCK_METHOD3(AsyncMethodA4Raw, ::grpc::ClientAsyncReaderWriterInterface<::grpc::testing::Request, ::grpc::testing::Response>*(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag)); + MOCK_METHOD2(PrepareAsyncMethodA4Raw, ::grpc::ClientAsyncReaderWriterInterface<::grpc::testing::Request, ::grpc::testing::Response>*(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq)); }; class MockServiceBStub : public ServiceB::StubInterface { public: MOCK_METHOD3(MethodB1, ::grpc::Status(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::testing::Response* response)); MOCK_METHOD3(AsyncMethodB1Raw, ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq)); + MOCK_METHOD3(PrepareAsyncMethodB1Raw, ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq)); }; } // namespace grpc diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 17a094f7a2..570a3d1067 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -368,9 +368,8 @@ class GrpclbEnd2endTest : public ::testing::Test { grpc_fake_resolver_response_generator_unref(response_generator_); } - void ResetStub(int fallback_timeout = 0) { + void ResetStub() { ChannelArguments args; - args.SetGrpclbFallbackTimeout(fallback_timeout); args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator_); std::ostringstream uri; @@ -442,10 +441,10 @@ class GrpclbEnd2endTest : public ::testing::Test { grpc_exec_ctx_finish(&exec_ctx); } - const std::vector<int> GetBackendPorts(const size_t start_index = 0) const { + const std::vector<int> GetBackendPorts() const { std::vector<int> backend_ports; - for (size_t i = start_index; i < backend_servers_.size(); ++i) { - backend_ports.push_back(backend_servers_[i].port_); + for (const auto& bs : backend_servers_) { + backend_ports.push_back(bs.port_); } return backend_ports; } @@ -616,71 +615,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } -TEST_F(SingleBalancerTest, Fallback) { - const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor(); - const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); - const size_t kNumBackendInResolution = backends_.size() / 2; - - ResetStub(kFallbackTimeoutMs); - std::vector<AddressData> addresses; - addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""}); - for (size_t i = 0; i < kNumBackendInResolution; ++i) { - addresses.emplace_back(AddressData{backend_servers_[i].port_, false, ""}); - } - SetNextResolution(addresses); - - // Send non-empty serverlist only after kServerlistDelayMs - ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends( - GetBackendPorts(kNumBackendInResolution /* start_index */), {}), - kServerlistDelayMs); - - // The first request. The client will block while it's still trying to - // contact the balancer. - gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); - CheckRpcSendOk(kNumBackendInResolution); - gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); - - // Fallback is used: each backend returned by the resolver should have - // gotten one request. - for (size_t i = 0; i < kNumBackendInResolution; ++i) { - EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); - } - for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) { - EXPECT_EQ(0U, backend_servers_[i].service_->request_count()); - } - - // Wait until update has been processed, as signaled by the backend returned - // by the balancer receiving a request. - do { - CheckRpcSendOk(1); - } while ( - backend_servers_[kNumBackendInResolution].service_->request_count() == 0); - for (size_t i = 0; i < backends_.size(); ++i) { - backend_servers_[i].service_->ResetCounters(); - } - - // Send out the second request. - gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); - CheckRpcSendOk(backends_.size() - kNumBackendInResolution); - gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); - - // Serverlist is used: each backend returned by the balancer should - // have gotten one request. - for (size_t i = 0; i < kNumBackendInResolution; ++i) { - EXPECT_EQ(0U, backend_servers_[i].service_->request_count()); - } - for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) { - EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); - } - - balancers_[0]->NotifyDoneWithServerlists(); - // The balancer got a single request. - EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); - // and sent a single response. - EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); -} - TEST_F(SingleBalancerTest, BackendsRestart) { const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD index 985a335f1b..0b69e9ba9a 100644 --- a/test/cpp/microbenchmarks/BUILD +++ b/test/cpp/microbenchmarks/BUILD @@ -73,12 +73,29 @@ grpc_cc_binary( deps = [":helpers"], ) +grpc_cc_library( + name = "fullstack_streaming_ping_pong_h", + testonly = 1, + hdrs = [ + "fullstack_streaming_ping_pong.h", + ], + deps = [":helpers"], +) + grpc_cc_binary( name = "bm_fullstack_streaming_ping_pong", testonly = 1, srcs = [ "bm_fullstack_streaming_ping_pong.cc", - "fullstack_streaming_ping_pong.h", + ], + deps = [":fullstack_streaming_ping_pong_h"], +) + +grpc_cc_library( + name = "fullstack_streaming_pump_h", + testonly = 1, + hdrs = [ + "fullstack_streaming_pump.h", ], deps = [":helpers"], ) @@ -88,9 +105,8 @@ grpc_cc_binary( testonly = 1, srcs = [ "bm_fullstack_streaming_pump.cc", - "fullstack_streaming_pump.h", ], - deps = [":helpers"], + deps = [":fullstack_streaming_pump_h"], ) grpc_cc_binary( @@ -103,14 +119,22 @@ grpc_cc_binary( ], ) +grpc_cc_library( + name = "fullstack_unary_ping_pong_h", + testonly = 1, + hdrs = [ + "fullstack_unary_ping_pong.h", + ], + deps = [":helpers"], +) + grpc_cc_binary( name = "bm_fullstack_unary_ping_pong", testonly = 1, srcs = [ "bm_fullstack_unary_ping_pong.cc", - "fullstack_unary_ping_pong.h", ], - deps = [":helpers"], + deps = [":fullstack_unary_ping_pong_h"], ) grpc_cc_binary( diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index 59fb29dd60..2656566a50 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -22,7 +22,6 @@ #include <gflags/gflags.h> #include <fstream> #include "src/core/lib/profiling/timers.h" -#include "src/cpp/client/create_channel_internal.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/cpp/microbenchmarks/fullstack_context_mutators.h" #include "test/cpp/microbenchmarks/fullstack_fixtures.h" diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h index 5477b860b4..ecd28c3f8a 100644 --- a/test/cpp/microbenchmarks/fullstack_fixtures.h +++ b/test/cpp/microbenchmarks/fullstack_fixtures.h @@ -41,6 +41,7 @@ extern "C" { #include "test/core/util/port.h" } +#include "src/cpp/client/create_channel_internal.h" #include "test/cpp/microbenchmarks/helpers.h" namespace grpc { diff --git a/test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h b/test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h index ff1f966753..6df044f344 100644 --- a/test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h +++ b/test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h @@ -24,7 +24,6 @@ #include <benchmark/benchmark.h> #include <sstream> #include "src/core/lib/profiling/timers.h" -#include "src/cpp/client/create_channel_internal.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/cpp/microbenchmarks/fullstack_context_mutators.h" #include "test/cpp/microbenchmarks/fullstack_fixtures.h" diff --git a/test/cpp/microbenchmarks/fullstack_streaming_pump.h b/test/cpp/microbenchmarks/fullstack_streaming_pump.h index f9db212b02..9e826091ec 100644 --- a/test/cpp/microbenchmarks/fullstack_streaming_pump.h +++ b/test/cpp/microbenchmarks/fullstack_streaming_pump.h @@ -24,7 +24,6 @@ #include <benchmark/benchmark.h> #include <sstream> #include "src/core/lib/profiling/timers.h" -#include "src/cpp/client/create_channel_internal.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/cpp/microbenchmarks/fullstack_context_mutators.h" #include "test/cpp/microbenchmarks/fullstack_fixtures.h" diff --git a/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h b/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h index 76d278b2a0..d448938295 100644 --- a/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h +++ b/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h @@ -24,7 +24,6 @@ #include <benchmark/benchmark.h> #include <sstream> #include "src/core/lib/profiling/timers.h" -#include "src/cpp/client/create_channel_internal.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/cpp/microbenchmarks/fullstack_context_mutators.h" #include "test/cpp/microbenchmarks/fullstack_fixtures.h" diff --git a/test/cpp/naming/BUILD b/test/cpp/naming/BUILD new file mode 100644 index 0000000000..24c3d1a443 --- /dev/null +++ b/test/cpp/naming/BUILD @@ -0,0 +1,49 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +package( + default_visibility = ["//visibility:public"], + features = [ + "-layering_check", + "-parse_headers", + ], +) + +licenses(["notice"]) # Apache v2 + +load("//bazel:grpc_build_system.bzl", "grpc_sh_binary", "grpc_py_binary") + +load(":generate_resolver_component_tests.bzl", "generate_resolver_component_tests") + +# Meant to be invoked only through the top-level shell script driver. +grpc_sh_binary( + name = "resolver_component_tests_runner", + srcs = [ + "resolver_component_tests_runner.sh", + ], +) + +grpc_py_binary( + name = "test_dns_server", + srcs = ["test_dns_server.py"], + data = [ + "resolver_test_record_groups.yaml", + ], + deps = [ + "twisted", + "yaml", + ] +) + +generate_resolver_component_tests() diff --git a/test/cpp/naming/gen_build_yaml.py b/test/cpp/naming/gen_build_yaml.py new file mode 100755 index 0000000000..3a51fef7a0 --- /dev/null +++ b/test/cpp/naming/gen_build_yaml.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python2.7 +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Generates the appropriate build.json data for all the naming tests.""" + + +import yaml +import collections +import hashlib +import json + +_LOCAL_DNS_SERVER_ADDRESS = '127.0.0.1:15353' + +def _append_zone_name(name, zone_name): + return '%s.%s' % (name, zone_name) + +def _build_expected_addrs_cmd_arg(expected_addrs): + out = [] + for addr in expected_addrs: + out.append('%s,%s' % (addr['address'], str(addr['is_balancer']))) + return ';'.join(out) + +def main(): + resolver_component_data = '' + with open('test/cpp/naming/resolver_test_record_groups.yaml') as f: + resolver_component_data = yaml.load(f) + + json = { + 'resolver_component_test_cases': [ + { + 'target_name': _append_zone_name(test_case['record_to_resolve'], + resolver_component_data['resolver_component_tests_common_zone_name']), + 'expected_addrs': _build_expected_addrs_cmd_arg(test_case['expected_addrs']), + 'expected_chosen_service_config': (test_case['expected_chosen_service_config'] or ''), + 'expected_lb_policy': (test_case['expected_lb_policy'] or ''), + } for test_case in resolver_component_data['resolver_component_tests'] + ], + 'targets': [ + { + 'name': 'resolver_component_test' + unsecure_build_config_suffix, + 'build': 'test', + 'language': 'c++', + 'gtest': False, + 'run': False, + 'src': ['test/cpp/naming/resolver_component_test.cc'], + 'platforms': ['linux', 'posix', 'mac'], + 'deps': [ + 'grpc++_test_util' + unsecure_build_config_suffix, + 'grpc_test_util' + unsecure_build_config_suffix, + 'gpr_test_util', + 'grpc++' + unsecure_build_config_suffix, + 'grpc' + unsecure_build_config_suffix, + 'gpr', + 'grpc++_test_config', + ], + } for unsecure_build_config_suffix in ['_unsecure', ''] + ] + [ + { + 'name': 'resolver_component_tests_runner_invoker' + unsecure_build_config_suffix, + 'build': 'test', + 'language': 'c++', + 'gtest': False, + 'run': True, + 'src': ['test/cpp/naming/resolver_component_tests_runner_invoker.cc'], + 'platforms': ['linux', 'posix', 'mac'], + 'deps': [ + 'grpc++_test_util', + 'grpc_test_util', + 'gpr_test_util', + 'grpc++', + 'grpc', + 'gpr', + 'grpc++_test_config', + ], + 'args': [ + '--test_bin_name=resolver_component_test%s' % unsecure_build_config_suffix, + '--running_under_bazel=false', + ], + } for unsecure_build_config_suffix in ['_unsecure', ''] + ] + } + + print(yaml.dump(json)) + +if __name__ == '__main__': + main() diff --git a/test/cpp/naming/generate_resolver_component_tests.bzl b/test/cpp/naming/generate_resolver_component_tests.bzl new file mode 100755 index 0000000000..118d9452d9 --- /dev/null +++ b/test/cpp/naming/generate_resolver_component_tests.bzl @@ -0,0 +1,64 @@ +#!/usr/bin/env python2.7 +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +load("//bazel:grpc_build_system.bzl", "grpc_sh_binary", "grpc_cc_test", "grpc_cc_binary") + +def generate_resolver_component_tests(): + for unsecure_build_config_suffix in ['_unsecure', '']: + # meant to be invoked only through the top-level shell script driver + grpc_cc_binary( + name = "resolver_component_test%s" % unsecure_build_config_suffix, + testonly = 1, + srcs = [ + "resolver_component_test.cc", + ], + external_deps = [ + "gmock", + ], + deps = [ + "//test/cpp/util:test_util%s" % unsecure_build_config_suffix, + "//test/core/util:grpc_test_util%s" % unsecure_build_config_suffix, + "//test/core/util:gpr_test_util", + "//:grpc++%s" % unsecure_build_config_suffix, + "//:grpc%s" % unsecure_build_config_suffix, + "//:gpr", + "//test/cpp/util:test_config", + ], + ) + grpc_cc_test( + name = "resolver_component_tests_runner_invoker%s" % unsecure_build_config_suffix, + srcs = [ + "resolver_component_tests_runner_invoker.cc", + ], + deps = [ + "//test/cpp/util:test_util", + "//test/core/util:grpc_test_util", + "//test/core/util:gpr_test_util", + "//:grpc++", + "//:grpc", + "//:gpr", + "//test/cpp/util:test_config", + ], + data = [ + ":resolver_component_tests_runner", + ":resolver_component_test%s" % unsecure_build_config_suffix, + ":test_dns_server", + "resolver_test_record_groups.yaml", # include the transitive dependency so that the dns sever py binary can locate this + ], + args = [ + "--test_bin_name=resolver_component_test%s" % unsecure_build_config_suffix, + "--running_under_bazel=true", + ] + ) diff --git a/test/cpp/naming/resolver_component_test.cc b/test/cpp/naming/resolver_component_test.cc new file mode 100644 index 0000000000..0857fb6a32 --- /dev/null +++ b/test/cpp/naming/resolver_component_test.cc @@ -0,0 +1,323 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> +#include <string.h> + +#include <gflags/gflags.h> +#include <gmock/gmock.h> +#include <vector> + +#include "test/cpp/util/subprocess.h" +#include "test/cpp/util/test_config.h" + +extern "C" { +#include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/ext/filters/client_channel/resolver.h" +#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" +#include "src/core/ext/filters/client_channel/resolver_registry.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" +#include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/iomgr.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/support/env.h" +#include "src/core/lib/support/string.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" +} + +using std::vector; +using grpc::SubProcess; +using testing::UnorderedElementsAreArray; + +// Hack copied from "test/cpp/end2end/server_crash_test_client.cc"! +// In some distros, gflags is in the namespace google, and in some others, +// in gflags. This hack is enabling us to find both. +namespace google {} +namespace gflags {} +using namespace google; +using namespace gflags; + +DEFINE_string(target_name, "", "Target name to resolve."); +DEFINE_string(expected_addrs, "", + "Comma-separated list of expected " + "'<ip0:port0>,<is_balancer0>;<ip1:port1>,<is_balancer1>;...' " + "addresses of " + "backend and/or balancers. 'is_balancer' should be bool, i.e. " + "true or false."); +DEFINE_string(expected_chosen_service_config, "", + "Expected service config json string that gets chosen (no " + "whitespace). Empty for none."); +DEFINE_string( + local_dns_server_address, "", + "Optional. This address is placed as the uri authority if present."); +DEFINE_string(expected_lb_policy, "", + "Expected lb policy name that appears in resolver result channel " + "arg. Empty for none."); + +namespace { + +class GrpcLBAddress final { + public: + GrpcLBAddress(std::string address, bool is_balancer) + : is_balancer(is_balancer), address(address) {} + + bool operator==(const GrpcLBAddress &other) const { + return this->is_balancer == other.is_balancer && + this->address == other.address; + } + + bool operator!=(const GrpcLBAddress &other) const { + return !(*this == other); + } + + bool is_balancer; + std::string address; +}; + +vector<GrpcLBAddress> ParseExpectedAddrs(std::string expected_addrs) { + std::vector<GrpcLBAddress> out; + while (expected_addrs.size() != 0) { + // get the next <ip>,<port> (v4 or v6) + size_t next_comma = expected_addrs.find(","); + if (next_comma == std::string::npos) { + gpr_log( + GPR_ERROR, + "Missing ','. Expected_addrs arg should be a semi-colon-separated " + "list of " + "<ip-port>,<bool> pairs. Left-to-be-parsed arg is |%s|", + expected_addrs.c_str()); + abort(); + } + std::string next_addr = expected_addrs.substr(0, next_comma); + expected_addrs = expected_addrs.substr(next_comma + 1, std::string::npos); + // get the next is_balancer 'bool' associated with this address + size_t next_semicolon = expected_addrs.find(";"); + bool is_balancer = + gpr_is_true(expected_addrs.substr(0, next_semicolon).c_str()); + out.emplace_back(GrpcLBAddress(next_addr, is_balancer)); + if (next_semicolon == std::string::npos) { + break; + } + expected_addrs = + expected_addrs.substr(next_semicolon + 1, std::string::npos); + } + if (out.size() == 0) { + gpr_log(GPR_ERROR, + "expected_addrs arg should be a comma-separated list of " + "<ip-port>,<bool> pairs"); + abort(); + } + return out; +} + +gpr_timespec TestDeadline(void) { + return grpc_timeout_seconds_to_deadline(100); +} + +struct ArgsStruct { + gpr_event ev; + gpr_atm done_atm; + gpr_mu *mu; + grpc_pollset *pollset; + grpc_pollset_set *pollset_set; + grpc_combiner *lock; + grpc_channel_args *channel_args; + vector<GrpcLBAddress> expected_addrs; + std::string expected_service_config_string; + std::string expected_lb_policy; +}; + +void ArgsInit(grpc_exec_ctx *exec_ctx, ArgsStruct *args) { + gpr_event_init(&args->ev); + args->pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size()); + grpc_pollset_init(args->pollset, &args->mu); + args->pollset_set = grpc_pollset_set_create(); + grpc_pollset_set_add_pollset(exec_ctx, args->pollset_set, args->pollset); + args->lock = grpc_combiner_create(); + gpr_atm_rel_store(&args->done_atm, 0); + args->channel_args = NULL; +} + +void DoNothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} + +void ArgsFinish(grpc_exec_ctx *exec_ctx, ArgsStruct *args) { + GPR_ASSERT(gpr_event_wait(&args->ev, TestDeadline())); + grpc_pollset_set_del_pollset(exec_ctx, args->pollset_set, args->pollset); + grpc_pollset_set_destroy(exec_ctx, args->pollset_set); + grpc_closure DoNothing_cb; + GRPC_CLOSURE_INIT(&DoNothing_cb, DoNothing, NULL, grpc_schedule_on_exec_ctx); + grpc_pollset_shutdown(exec_ctx, args->pollset, &DoNothing_cb); + // exec_ctx needs to be flushed before calling grpc_pollset_destroy() + grpc_channel_args_destroy(exec_ctx, args->channel_args); + grpc_exec_ctx_flush(exec_ctx); + grpc_pollset_destroy(exec_ctx, args->pollset); + gpr_free(args->pollset); + GRPC_COMBINER_UNREF(exec_ctx, args->lock, NULL); +} + +gpr_timespec NSecondDeadline(int seconds) { + return gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(seconds, GPR_TIMESPAN)); +} + +void PollPollsetUntilRequestDone(ArgsStruct *args) { + gpr_timespec deadline = NSecondDeadline(10); + while (true) { + bool done = gpr_atm_acq_load(&args->done_atm) != 0; + if (done) { + break; + } + gpr_timespec time_left = + gpr_time_sub(deadline, gpr_now(GPR_CLOCK_REALTIME)); + gpr_log(GPR_DEBUG, "done=%d, time_left=%" PRId64 ".%09d", done, + time_left.tv_sec, time_left.tv_nsec); + GPR_ASSERT(gpr_time_cmp(time_left, gpr_time_0(GPR_TIMESPAN)) >= 0); + grpc_pollset_worker *worker = NULL; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_mu_lock(args->mu); + GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, args->pollset, &worker, + gpr_now(GPR_CLOCK_REALTIME), NSecondDeadline(1))); + gpr_mu_unlock(args->mu); + grpc_exec_ctx_finish(&exec_ctx); + } + gpr_event_set(&args->ev, (void *)1); +} + +void CheckServiceConfigResultLocked(grpc_channel_args *channel_args, + ArgsStruct *args) { + const grpc_arg *service_config_arg = + grpc_channel_args_find(channel_args, GRPC_ARG_SERVICE_CONFIG); + if (args->expected_service_config_string != "") { + GPR_ASSERT(service_config_arg != NULL); + GPR_ASSERT(service_config_arg->type == GRPC_ARG_STRING); + EXPECT_EQ(service_config_arg->value.string, + args->expected_service_config_string); + } else { + GPR_ASSERT(service_config_arg == NULL); + } +} + +void CheckLBPolicyResultLocked(grpc_channel_args *channel_args, + ArgsStruct *args) { + const grpc_arg *lb_policy_arg = + grpc_channel_args_find(channel_args, GRPC_ARG_LB_POLICY_NAME); + if (args->expected_lb_policy != "") { + GPR_ASSERT(lb_policy_arg != NULL); + GPR_ASSERT(lb_policy_arg->type == GRPC_ARG_STRING); + EXPECT_EQ(lb_policy_arg->value.string, args->expected_lb_policy); + } else { + GPR_ASSERT(lb_policy_arg == NULL); + } +} + +void CheckResolverResultLocked(grpc_exec_ctx *exec_ctx, void *argsp, + grpc_error *err) { + ArgsStruct *args = (ArgsStruct *)argsp; + grpc_channel_args *channel_args = args->channel_args; + const grpc_arg *channel_arg = + grpc_channel_args_find(channel_args, GRPC_ARG_LB_ADDRESSES); + GPR_ASSERT(channel_arg != NULL); + GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER); + grpc_lb_addresses *addresses = + (grpc_lb_addresses *)channel_arg->value.pointer.p; + gpr_log(GPR_INFO, "num addrs found: %" PRIdPTR ". expected %" PRIdPTR, + addresses->num_addresses, args->expected_addrs.size()); + GPR_ASSERT(addresses->num_addresses == args->expected_addrs.size()); + std::vector<GrpcLBAddress> found_lb_addrs; + for (size_t i = 0; i < addresses->num_addresses; i++) { + grpc_lb_address addr = addresses->addresses[i]; + char *str; + grpc_sockaddr_to_string(&str, &addr.address, 1 /* normalize */); + gpr_log(GPR_INFO, "%s", str); + found_lb_addrs.emplace_back( + GrpcLBAddress(std::string(str), addr.is_balancer)); + gpr_free(str); + } + if (args->expected_addrs.size() != found_lb_addrs.size()) { + gpr_log(GPR_DEBUG, "found lb addrs size is: %" PRIdPTR + ". expected addrs size is %" PRIdPTR, + found_lb_addrs.size(), args->expected_addrs.size()); + abort(); + } + EXPECT_THAT(args->expected_addrs, UnorderedElementsAreArray(found_lb_addrs)); + CheckServiceConfigResultLocked(channel_args, args); + CheckLBPolicyResultLocked(channel_args, args); + gpr_atm_rel_store(&args->done_atm, 1); + gpr_mu_lock(args->mu); + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, NULL)); + gpr_mu_unlock(args->mu); +} + +TEST(ResolverComponentTest, TestResolvesRelevantRecords) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ArgsStruct args; + ArgsInit(&exec_ctx, &args); + args.expected_addrs = ParseExpectedAddrs(FLAGS_expected_addrs); + args.expected_service_config_string = FLAGS_expected_chosen_service_config; + args.expected_lb_policy = FLAGS_expected_lb_policy; + // maybe build the address with an authority + char *whole_uri = NULL; + GPR_ASSERT(asprintf(&whole_uri, "dns://%s/%s", + FLAGS_local_dns_server_address.c_str(), + FLAGS_target_name.c_str())); + // create resolver and resolve + grpc_resolver *resolver = grpc_resolver_create(&exec_ctx, whole_uri, NULL, + args.pollset_set, args.lock); + gpr_free(whole_uri); + grpc_closure on_resolver_result_changed; + GRPC_CLOSURE_INIT(&on_resolver_result_changed, CheckResolverResultLocked, + (void *)&args, grpc_combiner_scheduler(args.lock)); + grpc_resolver_next_locked(&exec_ctx, resolver, &args.channel_args, + &on_resolver_result_changed); + grpc_exec_ctx_flush(&exec_ctx); + PollPollsetUntilRequestDone(&args); + GRPC_RESOLVER_UNREF(&exec_ctx, resolver, NULL); + ArgsFinish(&exec_ctx, &args); + grpc_exec_ctx_finish(&exec_ctx); +} + +} // namespace + +int main(int argc, char **argv) { + grpc_init(); + grpc_test_init(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + ParseCommandLineFlags(&argc, &argv, true); + if (FLAGS_target_name == "") { + gpr_log(GPR_ERROR, "Missing target_name param."); + abort(); + } + if (FLAGS_local_dns_server_address != "") { + gpr_log(GPR_INFO, "Specifying authority in uris to: %s", + FLAGS_local_dns_server_address.c_str()); + } + auto result = RUN_ALL_TESTS(); + grpc_shutdown(); + return result; +} diff --git a/test/cpp/naming/resolver_component_tests_runner.sh b/test/cpp/naming/resolver_component_tests_runner.sh new file mode 100755 index 0000000000..83b03b67a3 --- /dev/null +++ b/test/cpp/naming/resolver_component_tests_runner.sh @@ -0,0 +1,173 @@ +#!/bin/bash +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This file is auto-generated + +set -ex + +# all command args required in this set order +FLAGS_test_bin_path=`echo "$1" | grep '\--test_bin_path=' | cut -d "=" -f 2` +FLAGS_dns_server_bin_path=`echo "$2" | grep '\--dns_server_bin_path=' | cut -d "=" -f 2` +FLAGS_records_config_path=`echo "$3" | grep '\--records_config_path=' | cut -d "=" -f 2` +FLAGS_test_dns_server_port=`echo "$4" | grep '\--test_dns_server_port=' | cut -d "=" -f 2` + +for cmd_arg in "$FLAGS_test_bin_path" "$FLAGS_dns_server_bin_path" "$FLAGS_records_config_path" "$FLAGS_test_dns_server_port"; do + if [[ "$cmd_arg" == "" ]]; then + echo "Missing a CMD arg" && exit 1 + fi +done + +if [[ "$GRPC_DNS_RESOLVER" != "" && "$GRPC_DNS_RESOLVER" != ares ]]; then + echo "This test only works under GRPC_DNS_RESOLVER=ares. Have GRPC_DNS_RESOLVER=$GRPC_DNS_RESOLVER" && exit 1 +fi +export GRPC_DNS_RESOLVER=ares + +"$FLAGS_dns_server_bin_path" --records_config_path="$FLAGS_records_config_path" --port="$FLAGS_test_dns_server_port" 2>&1 > /dev/null & +DNS_SERVER_PID=$! +echo "Local DNS server started. PID: $DNS_SERVER_PID" + +# Health check local DNS server TCP and UDP ports +for ((i=0;i<30;i++)); +do + echo "Retry health-check DNS query to local DNS server over tcp and udp" + RETRY=0 + dig A health-check-local-dns-server-is-alive.resolver-tests.grpctestingexp. @localhost -p "$FLAGS_test_dns_server_port" +tries=1 +timeout=1 | grep '123.123.123.123' || RETRY=1 + dig A health-check-local-dns-server-is-alive.resolver-tests.grpctestingexp. @localhost -p "$FLAGS_test_dns_server_port" +tries=1 +timeout=1 +tcp | grep '123.123.123.123' || RETRY=1 + if [[ "$RETRY" == 0 ]]; then + break + fi; + sleep 0.1 +done + +if [[ $RETRY == 1 ]]; then + echo "FAILED TO START LOCAL DNS SERVER" + kill -SIGTERM $DNS_SERVER_PID + wait + exit 1 +fi + +function terminate_all { + echo "Received signal. Terminating $! and $DNS_SERVER_PID" + kill -SIGTERM $! || true + kill -SIGTERM $DNS_SERVER_PID || true + wait + exit 1 +} + +trap terminate_all SIGTERM SIGINT + +EXIT_CODE=0 +# TODO: this test should check for GCE residency and skip tests using _grpclb._tcp.* SRV records once GCE residency checks are made +# in the resolver. + +$FLAGS_test_bin_path \ + --target_name='srv-ipv4-single-target.resolver-tests.grpctestingexp.' \ + --expected_addrs='1.2.3.4:1234,True' \ + --expected_chosen_service_config='' \ + --expected_lb_policy='' \ + --local_dns_server_address=127.0.0.1:$FLAGS_test_dns_server_port & +wait $! || EXIT_CODE=1 + +$FLAGS_test_bin_path \ + --target_name='srv-ipv4-multi-target.resolver-tests.grpctestingexp.' \ + --expected_addrs='1.2.3.5:1234,True;1.2.3.6:1234,True;1.2.3.7:1234,True' \ + --expected_chosen_service_config='' \ + --expected_lb_policy='' \ + --local_dns_server_address=127.0.0.1:$FLAGS_test_dns_server_port & +wait $! || EXIT_CODE=1 + +$FLAGS_test_bin_path \ + --target_name='srv-ipv6-single-target.resolver-tests.grpctestingexp.' \ + --expected_addrs='[2607:f8b0:400a:801::1001]:1234,True' \ + --expected_chosen_service_config='' \ + --expected_lb_policy='' \ + --local_dns_server_address=127.0.0.1:$FLAGS_test_dns_server_port & +wait $! || EXIT_CODE=1 + +$FLAGS_test_bin_path \ + --target_name='srv-ipv6-multi-target.resolver-tests.grpctestingexp.' \ + --expected_addrs='[2607:f8b0:400a:801::1002]:1234,True;[2607:f8b0:400a:801::1003]:1234,True;[2607:f8b0:400a:801::1004]:1234,True' \ + --expected_chosen_service_config='' \ + --expected_lb_policy='' \ + --local_dns_server_address=127.0.0.1:$FLAGS_test_dns_server_port & +wait $! || EXIT_CODE=1 + +$FLAGS_test_bin_path \ + --target_name='srv-ipv4-simple-service-config.resolver-tests.grpctestingexp.' \ + --expected_addrs='1.2.3.4:1234,True' \ + --expected_chosen_service_config='{"loadBalancingPolicy":"round_robin","methodConfig":[{"name":[{"method":"Foo","service":"SimpleService","waitForReady":true}]}]}' \ + --expected_lb_policy='round_robin' \ + --local_dns_server_address=127.0.0.1:$FLAGS_test_dns_server_port & +wait $! || EXIT_CODE=1 + +$FLAGS_test_bin_path \ + --target_name='ipv4-no-srv-simple-service-config.resolver-tests.grpctestingexp.' \ + --expected_addrs='1.2.3.4:443,False' \ + --expected_chosen_service_config='{"loadBalancingPolicy":"round_robin","methodConfig":[{"name":[{"method":"Foo","service":"NoSrvSimpleService","waitForReady":true}]}]}' \ + --expected_lb_policy='round_robin' \ + --local_dns_server_address=127.0.0.1:$FLAGS_test_dns_server_port & +wait $! || EXIT_CODE=1 + +$FLAGS_test_bin_path \ + --target_name='ipv4-no-config-for-cpp.resolver-tests.grpctestingexp.' \ + --expected_addrs='1.2.3.4:443,False' \ + --expected_chosen_service_config='' \ + --expected_lb_policy='' \ + --local_dns_server_address=127.0.0.1:$FLAGS_test_dns_server_port & +wait $! || EXIT_CODE=1 + +$FLAGS_test_bin_path \ + --target_name='ipv4-cpp-config-has-zero-percentage.resolver-tests.grpctestingexp.' \ + --expected_addrs='1.2.3.4:443,False' \ + --expected_chosen_service_config='' \ + --expected_lb_policy='' \ + --local_dns_server_address=127.0.0.1:$FLAGS_test_dns_server_port & +wait $! || EXIT_CODE=1 + +$FLAGS_test_bin_path \ + --target_name='ipv4-second-language-is-cpp.resolver-tests.grpctestingexp.' \ + --expected_addrs='1.2.3.4:443,False' \ + --expected_chosen_service_config='{"loadBalancingPolicy":"round_robin","methodConfig":[{"name":[{"method":"Foo","service":"CppService","waitForReady":true}]}]}' \ + --expected_lb_policy='round_robin' \ + --local_dns_server_address=127.0.0.1:$FLAGS_test_dns_server_port & +wait $! || EXIT_CODE=1 + +$FLAGS_test_bin_path \ + --target_name='ipv4-config-with-percentages.resolver-tests.grpctestingexp.' \ + --expected_addrs='1.2.3.4:443,False' \ + --expected_chosen_service_config='{"loadBalancingPolicy":"round_robin","methodConfig":[{"name":[{"method":"Foo","service":"AlwaysPickedService","waitForReady":true}]}]}' \ + --expected_lb_policy='round_robin' \ + --local_dns_server_address=127.0.0.1:$FLAGS_test_dns_server_port & +wait $! || EXIT_CODE=1 + +$FLAGS_test_bin_path \ + --target_name='srv-ipv4-target-has-backend-and-balancer.resolver-tests.grpctestingexp.' \ + --expected_addrs='1.2.3.4:1234,True;1.2.3.4:443,False' \ + --expected_chosen_service_config='' \ + --expected_lb_policy='' \ + --local_dns_server_address=127.0.0.1:$FLAGS_test_dns_server_port & +wait $! || EXIT_CODE=1 + +$FLAGS_test_bin_path \ + --target_name='srv-ipv6-target-has-backend-and-balancer.resolver-tests.grpctestingexp.' \ + --expected_addrs='[2607:f8b0:400a:801::1002]:1234,True;[2607:f8b0:400a:801::1002]:443,False' \ + --expected_chosen_service_config='' \ + --expected_lb_policy='' \ + --local_dns_server_address=127.0.0.1:$FLAGS_test_dns_server_port & +wait $! || EXIT_CODE=1 + +kill -SIGTERM $DNS_SERVER_PID || true +wait +exit $EXIT_CODE diff --git a/test/cpp/naming/resolver_component_tests_runner_invoker.cc b/test/cpp/naming/resolver_component_tests_runner_invoker.cc new file mode 100644 index 0000000000..b14391284d --- /dev/null +++ b/test/cpp/naming/resolver_component_tests_runner_invoker.cc @@ -0,0 +1,189 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <signal.h> +#include <string.h> +#include <unistd.h> + +#include <gflags/gflags.h> +#include <string> +#include <thread> +#include <vector> + +#include "test/cpp/util/subprocess.h" +#include "test/cpp/util/test_config.h" + +extern "C" { +#include "src/core/lib/support/env.h" +#include "test/core/util/port.h" +} + +DEFINE_bool( + running_under_bazel, false, + "True if this test is running under bazel. " + "False indicates that this test is running under run_tests.py. " + "Child process test binaries are located differently based on this flag. "); + +DEFINE_string(test_bin_name, "", + "Name, without the preceding path, of the test binary"); + +DEFINE_string(grpc_test_directory_relative_to_test_srcdir, "/__main__", + "This flag only applies if runner_under_bazel is true. This " + "flag is ignored if runner_under_bazel is false. " + "Directory of the <repo-root>/test directory relative to bazel's " + "TEST_SRCDIR environment variable"); + +using grpc::SubProcess; + +static volatile sig_atomic_t abort_wait_for_child = 0; + +static void sighandler(int sig) { abort_wait_for_child = 1; } + +static void register_sighandler() { + struct sigaction act; + memset(&act, 0, sizeof(act)); + act.sa_handler = sighandler; + sigaction(SIGINT, &act, NULL); + sigaction(SIGTERM, &act, NULL); +} + +namespace { + +const int kTestTimeoutSeconds = 60 * 2; + +void RunSigHandlingThread(SubProcess *test_driver, gpr_mu *test_driver_mu, + gpr_cv *test_driver_cv, int *test_driver_done) { + gpr_timespec overall_deadline = + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_seconds(kTestTimeoutSeconds, GPR_TIMESPAN)); + while (true) { + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + if (gpr_time_cmp(now, overall_deadline) > 0 || abort_wait_for_child) break; + gpr_mu_lock(test_driver_mu); + if (*test_driver_done) { + gpr_mu_unlock(test_driver_mu); + return; + } + gpr_timespec wait_deadline = gpr_time_add( + gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(1, GPR_TIMESPAN)); + gpr_cv_wait(test_driver_cv, test_driver_mu, wait_deadline); + gpr_mu_unlock(test_driver_mu); + } + gpr_log(GPR_DEBUG, + "Test timeout reached or received signal. Interrupting test driver " + "child process."); + test_driver->Interrupt(); + return; +} +} + +namespace grpc { + +namespace testing { + +void InvokeResolverComponentTestsRunner(std::string test_runner_bin_path, + std::string test_bin_path, + std::string dns_server_bin_path, + std::string records_config_path) { + int test_dns_server_port = grpc_pick_unused_port_or_die(); + + SubProcess *test_driver = new SubProcess( + {test_runner_bin_path, "--test_bin_path=" + test_bin_path, + "--dns_server_bin_path=" + dns_server_bin_path, + "--records_config_path=" + records_config_path, + "--test_dns_server_port=" + std::to_string(test_dns_server_port)}); + gpr_mu test_driver_mu; + gpr_mu_init(&test_driver_mu); + gpr_cv test_driver_cv; + gpr_cv_init(&test_driver_cv); + int test_driver_done = 0; + register_sighandler(); + std::thread sig_handling_thread(RunSigHandlingThread, test_driver, + &test_driver_mu, &test_driver_cv, + &test_driver_done); + int status = test_driver->Join(); + if (WIFEXITED(status)) { + if (WEXITSTATUS(status)) { + gpr_log(GPR_INFO, + "Resolver component test test-runner exited with code %d", + WEXITSTATUS(status)); + abort(); + } + } else if (WIFSIGNALED(status)) { + gpr_log(GPR_INFO, + "Resolver component test test-runner ended from signal %d", + WTERMSIG(status)); + abort(); + } else { + gpr_log(GPR_INFO, + "Resolver component test test-runner ended with unknown status %d", + status); + abort(); + } + gpr_mu_lock(&test_driver_mu); + test_driver_done = 1; + gpr_cv_signal(&test_driver_cv); + gpr_mu_unlock(&test_driver_mu); + sig_handling_thread.join(); + delete test_driver; + gpr_mu_destroy(&test_driver_mu); + gpr_cv_destroy(&test_driver_cv); +} + +} // namespace testing + +} // namespace grpc + +int main(int argc, char **argv) { + grpc::testing::InitTest(&argc, &argv, true); + grpc_init(); + GPR_ASSERT(FLAGS_test_bin_name != ""); + std::string my_bin = argv[0]; + if (FLAGS_running_under_bazel) { + GPR_ASSERT(FLAGS_grpc_test_directory_relative_to_test_srcdir != ""); + // Use bazel's TEST_SRCDIR environment variable to locate the "test data" + // binaries. + std::string const bin_dir = + gpr_getenv("TEST_SRCDIR") + + FLAGS_grpc_test_directory_relative_to_test_srcdir + + std::string("/test/cpp/naming"); + // Invoke bazel's executeable links to the .sh and .py scripts (don't use + // the .sh and .py suffixes) to make + // sure that we're using bazel's test environment. + grpc::testing::InvokeResolverComponentTestsRunner( + bin_dir + "/resolver_component_tests_runner", + bin_dir + "/" + FLAGS_test_bin_name, bin_dir + "/test_dns_server", + bin_dir + "/resolver_test_record_groups.yaml"); + } else { + // Get the current binary's directory relative to repo root to invoke the + // correct build config (asan/tsan/dbg, etc.). + std::string const bin_dir = my_bin.substr(0, my_bin.rfind('/')); + // Invoke the .sh and .py scripts directly where they are in source code. + grpc::testing::InvokeResolverComponentTestsRunner( + "test/cpp/naming/resolver_component_tests_runner.sh", + bin_dir + "/" + FLAGS_test_bin_name, + "test/cpp/naming/test_dns_server.py", + "test/cpp/naming/resolver_test_record_groups.yaml"); + } + grpc_shutdown(); + return 0; +} diff --git a/test/cpp/naming/resolver_test_record_groups.yaml b/test/cpp/naming/resolver_test_record_groups.yaml new file mode 100644 index 0000000000..67c611d831 --- /dev/null +++ b/test/cpp/naming/resolver_test_record_groups.yaml @@ -0,0 +1,155 @@ +resolver_component_tests_common_zone_name: resolver-tests.grpctestingexp. +resolver_component_tests: +- expected_addrs: + - {address: '1.2.3.4:1234', is_balancer: true} + expected_chosen_service_config: null + expected_lb_policy: null + record_to_resolve: srv-ipv4-single-target + records: + _grpclb._tcp.srv-ipv4-single-target: + - {TTL: '2100', data: 0 0 1234 ipv4-single-target, type: SRV} + ipv4-single-target: + - {TTL: '2100', data: 1.2.3.4, type: A} +- expected_addrs: + - {address: '1.2.3.5:1234', is_balancer: true} + - {address: '1.2.3.6:1234', is_balancer: true} + - {address: '1.2.3.7:1234', is_balancer: true} + expected_chosen_service_config: null + expected_lb_policy: null + record_to_resolve: srv-ipv4-multi-target + records: + _grpclb._tcp.srv-ipv4-multi-target: + - {TTL: '2100', data: 0 0 1234 ipv4-multi-target, type: SRV} + ipv4-multi-target: + - {TTL: '2100', data: 1.2.3.5, type: A} + - {TTL: '2100', data: 1.2.3.6, type: A} + - {TTL: '2100', data: 1.2.3.7, type: A} +- expected_addrs: + - {address: '[2607:f8b0:400a:801::1001]:1234', is_balancer: true} + expected_chosen_service_config: null + expected_lb_policy: null + record_to_resolve: srv-ipv6-single-target + records: + _grpclb._tcp.srv-ipv6-single-target: + - {TTL: '2100', data: 0 0 1234 ipv6-single-target, type: SRV} + ipv6-single-target: + - {TTL: '2100', data: '2607:f8b0:400a:801::1001', type: AAAA} +- expected_addrs: + - {address: '[2607:f8b0:400a:801::1002]:1234', is_balancer: true} + - {address: '[2607:f8b0:400a:801::1003]:1234', is_balancer: true} + - {address: '[2607:f8b0:400a:801::1004]:1234', is_balancer: true} + expected_chosen_service_config: null + expected_lb_policy: null + record_to_resolve: srv-ipv6-multi-target + records: + _grpclb._tcp.srv-ipv6-multi-target: + - {TTL: '2100', data: 0 0 1234 ipv6-multi-target, type: SRV} + ipv6-multi-target: + - {TTL: '2100', data: '2607:f8b0:400a:801::1002', type: AAAA} + - {TTL: '2100', data: '2607:f8b0:400a:801::1003', type: AAAA} + - {TTL: '2100', data: '2607:f8b0:400a:801::1004', type: AAAA} +- expected_addrs: + - {address: '1.2.3.4:1234', is_balancer: true} + expected_chosen_service_config: '{"loadBalancingPolicy":"round_robin","methodConfig":[{"name":[{"method":"Foo","service":"SimpleService","waitForReady":true}]}]}' + expected_lb_policy: round_robin + record_to_resolve: srv-ipv4-simple-service-config + records: + _grpclb._tcp.srv-ipv4-simple-service-config: + - {TTL: '2100', data: 0 0 1234 ipv4-simple-service-config, type: SRV} + ipv4-simple-service-config: + - {TTL: '2100', data: 1.2.3.4, type: A} + srv-ipv4-simple-service-config: + - {TTL: '2100', data: 'grpc_config=[{"serviceConfig":{"loadBalancingPolicy":"round_robin","methodConfig":[{"name":[{"method":"Foo","service":"SimpleService","waitForReady":true}]}]}}]', + type: TXT} +- expected_addrs: + - {address: '1.2.3.4:443', is_balancer: false} + expected_chosen_service_config: '{"loadBalancingPolicy":"round_robin","methodConfig":[{"name":[{"method":"Foo","service":"NoSrvSimpleService","waitForReady":true}]}]}' + expected_lb_policy: round_robin + record_to_resolve: ipv4-no-srv-simple-service-config + records: + ipv4-no-srv-simple-service-config: + - {TTL: '2100', data: 1.2.3.4, type: A} + - {TTL: '2100', data: 'grpc_config=[{"serviceConfig":{"loadBalancingPolicy":"round_robin","methodConfig":[{"name":[{"method":"Foo","service":"NoSrvSimpleService","waitForReady":true}]}]}}]', + type: TXT} +- expected_addrs: + - {address: '1.2.3.4:443', is_balancer: false} + expected_chosen_service_config: null + expected_lb_policy: null + record_to_resolve: ipv4-no-config-for-cpp + records: + ipv4-no-config-for-cpp: + - {TTL: '2100', data: 1.2.3.4, type: A} + - {TTL: '2100', data: 'grpc_config=[{"clientLanguage":["python"],"serviceConfig":{"loadBalancingPolicy":"round_robin","methodConfig":[{"name":[{"method":"Foo","service":"PythonService","waitForReady":true}]}]}}]', + type: TXT} +- expected_addrs: + - {address: '1.2.3.4:443', is_balancer: false} + expected_chosen_service_config: null + expected_lb_policy: null + record_to_resolve: ipv4-cpp-config-has-zero-percentage + records: + ipv4-cpp-config-has-zero-percentage: + - {TTL: '2100', data: 1.2.3.4, type: A} + - {TTL: '2100', data: 'grpc_config=[{"percentage":0,"serviceConfig":{"loadBalancingPolicy":"round_robin","methodConfig":[{"name":[{"method":"Foo","service":"CppService","waitForReady":true}]}]}}]', + type: TXT} +- expected_addrs: + - {address: '1.2.3.4:443', is_balancer: false} + expected_chosen_service_config: '{"loadBalancingPolicy":"round_robin","methodConfig":[{"name":[{"method":"Foo","service":"CppService","waitForReady":true}]}]}' + expected_lb_policy: round_robin + record_to_resolve: ipv4-second-language-is-cpp + records: + ipv4-second-language-is-cpp: + - {TTL: '2100', data: 1.2.3.4, type: A} + - {TTL: '2100', data: 'grpc_config=[{"clientLanguage":["go"],"serviceConfig":{"loadBalancingPolicy":"round_robin","methodConfig":[{"name":[{"method":"Foo","service":"GoService","waitForReady":true}]}]}},{"clientLanguage":["c++"],"serviceConfig":{"loadBalancingPolicy":"round_robin","methodConfig":[{"name":[{"method":"Foo","service":"CppService","waitForReady":true}]}]}}]', + type: TXT} +- expected_addrs: + - {address: '1.2.3.4:443', is_balancer: false} + expected_chosen_service_config: '{"loadBalancingPolicy":"round_robin","methodConfig":[{"name":[{"method":"Foo","service":"AlwaysPickedService","waitForReady":true}]}]}' + expected_lb_policy: round_robin + record_to_resolve: ipv4-config-with-percentages + records: + ipv4-config-with-percentages: + - {TTL: '2100', data: 1.2.3.4, type: A} + - {TTL: '2100', data: 'grpc_config=[{"percentage":0,"serviceConfig":{"loadBalancingPolicy":"round_robin","methodConfig":[{"name":[{"method":"Foo","service":"NeverPickedService","waitForReady":true}]}]}},{"percentage":100,"serviceConfig":{"loadBalancingPolicy":"round_robin","methodConfig":[{"name":[{"method":"Foo","service":"AlwaysPickedService","waitForReady":true}]}]}}]', + type: TXT} +- expected_addrs: + - {address: '1.2.3.4:1234', is_balancer: true} + - {address: '1.2.3.4:443', is_balancer: false} + expected_chosen_service_config: null + expected_lb_policy: null + record_to_resolve: srv-ipv4-target-has-backend-and-balancer + records: + _grpclb._tcp.srv-ipv4-target-has-backend-and-balancer: + - {TTL: '2100', data: 0 0 1234 balancer-for-ipv4-has-backend-and-balancer, type: SRV} + balancer-for-ipv4-has-backend-and-balancer: + - {TTL: '2100', data: 1.2.3.4, type: A} + srv-ipv4-target-has-backend-and-balancer: + - {TTL: '2100', data: 1.2.3.4, type: A} +- expected_addrs: + - {address: '[2607:f8b0:400a:801::1002]:1234', is_balancer: true} + - {address: '[2607:f8b0:400a:801::1002]:443', is_balancer: false} + expected_chosen_service_config: null + expected_lb_policy: null + record_to_resolve: srv-ipv6-target-has-backend-and-balancer + records: + _grpclb._tcp.srv-ipv6-target-has-backend-and-balancer: + - {TTL: '2100', data: 0 0 1234 balancer-for-ipv6-has-backend-and-balancer, type: SRV} + balancer-for-ipv6-has-backend-and-balancer: + - {TTL: '2100', data: '2607:f8b0:400a:801::1002', type: AAAA} + srv-ipv6-target-has-backend-and-balancer: + - {TTL: '2100', data: '2607:f8b0:400a:801::1002', type: AAAA} + +resolver_component_tests_TODO: +- 'TODO: enable this large-txt-record test once working. (it is much longer than 512 + bytes, likely to cause use of TCP even if max payload for UDP is changed somehow, + e.g. via notes in RFC 2671)' +- expected_addrs: + - {address: '1.2.3.4:443', is_balancer: false} + expected_chosen_service_config: '{"loadBalancingPolicy":"round_robin","methodConfig":[{"name":[{"method":"Foo","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooTwo","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooThree","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooFour","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooFive","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooSix","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooSeven","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooEight","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooNine","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooTen","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooEleven","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooTwelve","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooTwelve","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooTwelve","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooTwelve","service":"SimpleService","waitForReady":true}]}]}' + expected_lb_policy: null + record_to_resolve: srv-ipv6-target-has-backend-and-balancer + record_to_resolve: ipv4-config-causing-fallback-to-tcp + records: + ipv4-config-causing-fallback-to-tcp: + - {TTL: '2100', data: 1.2.3.4, type: A} + - {TTL: '2100', data: 'grpc_config=[{"serviceConfig":{"loadBalancingPolicy":"round_robin","methodConfig":[{"name":[{"method":"Foo","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooTwo","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooThree","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooFour","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooFive","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooSix","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooSeven","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooEight","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooNine","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooTen","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooEleven","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooTwelve","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooTwelve","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooTwelve","service":"SimpleService","waitForReady":true}]},{"name":[{"method":"FooTwelve","service":"SimpleService","waitForReady":true}]}]}}]', + type: TXT} diff --git a/test/cpp/naming/test_dns_server.py b/test/cpp/naming/test_dns_server.py new file mode 100755 index 0000000000..9d4b89cffb --- /dev/null +++ b/test/cpp/naming/test_dns_server.py @@ -0,0 +1,134 @@ +#!/usr/bin/env python2.7 +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Starts a local DNS server for use in tests""" + +import argparse +import sys +import yaml +import signal +import os + +import twisted +import twisted.internet +import twisted.internet.reactor +import twisted.internet.threads +import twisted.internet.defer +import twisted.internet.protocol +import twisted.names +import twisted.names.client +import twisted.names.dns +import twisted.names.server +from twisted.names import client, server, common, authority, dns +import argparse + +_SERVER_HEALTH_CHECK_RECORD_NAME = 'health-check-local-dns-server-is-alive.resolver-tests.grpctestingexp' # missing end '.' for twisted syntax +_SERVER_HEALTH_CHECK_RECORD_DATA = '123.123.123.123' + +class NoFileAuthority(authority.FileAuthority): + def __init__(self, soa, records): + # skip FileAuthority + common.ResolverBase.__init__(self) + self.soa = soa + self.records = records + +def start_local_dns_server(args): + all_records = {} + def _push_record(name, r): + print('pushing record: |%s|' % name) + if all_records.get(name) is not None: + all_records[name].append(r) + return + all_records[name] = [r] + + def _maybe_split_up_txt_data(name, txt_data, r_ttl): + start = 0 + txt_data_list = [] + while len(txt_data[start:]) > 0: + next_read = len(txt_data[start:]) + if next_read > 255: + next_read = 255 + txt_data_list.append(txt_data[start:start+next_read]) + start += next_read + _push_record(name, dns.Record_TXT(*txt_data_list, ttl=r_ttl)) + + with open(args.records_config_path) as config: + test_records_config = yaml.load(config) + common_zone_name = test_records_config['resolver_component_tests_common_zone_name'] + for group in test_records_config['resolver_component_tests']: + for name in group['records'].keys(): + for record in group['records'][name]: + r_type = record['type'] + r_data = record['data'] + r_ttl = int(record['TTL']) + record_full_name = '%s.%s' % (name, common_zone_name) + assert record_full_name[-1] == '.' + record_full_name = record_full_name[:-1] + if r_type == 'A': + _push_record(record_full_name, dns.Record_A(r_data, ttl=r_ttl)) + if r_type == 'AAAA': + _push_record(record_full_name, dns.Record_AAAA(r_data, ttl=r_ttl)) + if r_type == 'SRV': + p, w, port, target = r_data.split(' ') + p = int(p) + w = int(w) + port = int(port) + target_full_name = '%s.%s' % (target, common_zone_name) + r_data = '%s %s %s %s' % (p, w, port, target_full_name) + _push_record(record_full_name, dns.Record_SRV(p, w, port, target_full_name, ttl=r_ttl)) + if r_type == 'TXT': + _maybe_split_up_txt_data(record_full_name, r_data, r_ttl) + # Server health check record + _push_record(_SERVER_HEALTH_CHECK_RECORD_NAME, dns.Record_A(_SERVER_HEALTH_CHECK_RECORD_DATA, ttl=0)) + soa_record = dns.Record_SOA(mname = common_zone_name) + test_domain_com = NoFileAuthority( + soa = (common_zone_name, soa_record), + records = all_records, + ) + server = twisted.names.server.DNSServerFactory( + authorities=[test_domain_com], verbose=2) + server.noisy = 2 + twisted.internet.reactor.listenTCP(args.port, server) + dns_proto = twisted.names.dns.DNSDatagramProtocol(server) + dns_proto.noisy = 2 + twisted.internet.reactor.listenUDP(args.port, dns_proto) + print('starting local dns server on 127.0.0.1:%s' % args.port) + print('starting twisted.internet.reactor') + twisted.internet.reactor.suggestThreadPoolSize(1) + twisted.internet.reactor.run() + +def _quit_on_signal(signum, _frame): + print('Received SIGNAL %d. Quitting with exit code 0' % signum) + twisted.internet.reactor.stop() + sys.stdout.flush() + sys.exit(0) + +def main(): + argp = argparse.ArgumentParser(description='Local DNS Server for resolver tests') + argp.add_argument('-p', '--port', default=None, type=int, + help='Port for DNS server to listen on for TCP and UDP.') + argp.add_argument('-r', '--records_config_path', default=None, type=str, + help=('Directory of resolver_test_record_groups.yaml file. ' + 'Defauls to path needed when the test is invoked as part of run_tests.py.')) + args = argp.parse_args() + signal.signal(signal.SIGALRM, _quit_on_signal) + signal.signal(signal.SIGTERM, _quit_on_signal) + signal.signal(signal.SIGINT, _quit_on_signal) + # Prevent zombies. Tests that use this server are short-lived. + signal.alarm(2 * 60) + start_local_dns_server(args) + +if __name__ == '__main__': + main() diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 912c871482..1a4438047d 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -56,11 +56,6 @@ class ClientRpcContext { } virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0; - void lock() { mu_.lock(); } - void unlock() { mu_.unlock(); } - - private: - std::mutex mu_; }; template <class RequestType, class ResponseType> @@ -73,7 +68,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, CompletionQueue*)> - start_req, + prepare_req, std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> on_done) : context_(), stub_(stub), @@ -83,7 +78,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { next_state_(State::READY), callback_(on_done), next_issue_(next_issue), - start_req_(start_req) {} + prepare_req_(prepare_req) {} ~ClientRpcContextUnaryImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { StartInternal(cq); @@ -92,7 +87,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { switch (next_state_) { case State::READY: start_ = UsageTimer::Now(); - response_reader_ = start_req_(stub_, &context_, req_, cq_); + response_reader_ = prepare_req_(stub_, &context_, req_, cq_); + response_reader_->StartCall(); next_state_ = State::RESP_DONE; response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); @@ -111,8 +107,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { } void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_, - start_req_, callback_); - std::lock_guard<ClientRpcContext> lclone(*clone); + prepare_req_, callback_); clone->StartInternal(cq); } @@ -130,7 +125,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, CompletionQueue*)> - start_req_; + prepare_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> @@ -252,29 +247,13 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { // this thread isn't supposed to shut down std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); if (shutdown_state_[thread_idx]->shutdown) { - // We want to delete the context. However, it is possible that - // another thread that just initiated an action on this - // context still has its lock even though the action on the - // context has completed. To delay for that, just grab the - // lock for serialization. Take a new scope. - { std::lock_guard<ClientRpcContext> lctx(*ctx); } delete ctx; return true; } - bool del = false; - - // Create a new scope for a lock_guard'ed region - { - std::lock_guard<ClientRpcContext> lctx(*ctx); - if (!ctx->RunNextState(ok, entry)) { - // The RPC and callback are done, so clone the ctx - // and kickstart the new one - ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); - // set the old version to delete - del = true; - } - } - if (del) { + if (!ctx->RunNextState(ok, entry)) { + // The RPC and callback are done, so clone the ctx + // and kickstart the new one + ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); delete ctx; } return true; @@ -311,15 +290,15 @@ class AsyncUnaryClient final entry->set_status(s.error_code()); } static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>> - StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx, - const SimpleRequest& request, CompletionQueue* cq) { - return stub->AsyncUnaryCall(ctx, request, cq); + PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx, + const SimpleRequest& request, CompletionQueue* cq) { + return stub->PrepareAsyncUnaryCall(ctx, request, cq); }; static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, std::function<gpr_timespec()> next_issue, const SimpleRequest& req) { return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( - stub, req, next_issue, AsyncUnaryClient::StartReq, + stub, req, next_issue, AsyncUnaryClient::PrepareReq, AsyncUnaryClient::CheckDone); } }; @@ -332,9 +311,8 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { std::function<gpr_timespec()> next_issue, std::function<std::unique_ptr< grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( - BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, - void*)> - start_req, + BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)> + prepare_req, std::function<void(grpc::Status, ResponseType*)> on_done) : context_(), stub_(stub), @@ -344,7 +322,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { next_state_(State::INVALID), callback_(on_done), next_issue_(next_issue), - start_req_(start_req) {} + prepare_req_(prepare_req) {} ~ClientRpcContextStreamingPingPongImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { StartInternal(cq, config.messages_per_stream()); @@ -407,8 +385,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { } void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextStreamingPingPongImpl( - stub_, req_, next_issue_, start_req_, callback_); - std::lock_guard<ClientRpcContext> lclone(*clone); + stub_, req_, next_issue_, prepare_req_, callback_); clone->StartInternal(cq, messages_per_stream_); } @@ -432,10 +409,10 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { State next_state_; std::function<void(grpc::Status, ResponseType*)> callback_; std::function<gpr_timespec()> next_issue_; - std::function<std::unique_ptr< - grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( - BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)> - start_req_; + std::function< + std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( + BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)> + prepare_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>> @@ -449,8 +426,9 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { cq_ = cq; messages_per_stream_ = messages_per_stream; messages_issued_ = 0; + stream_ = prepare_req_(stub_, &context_, cq); next_state_ = State::STREAM_IDLE; - stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); + stream_->StartCall(ClientRpcContext::tag(this)); } }; @@ -469,9 +447,9 @@ class AsyncStreamingPingPongClient final static void CheckDone(grpc::Status s, SimpleResponse* response) {} static std::unique_ptr< grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>> - StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx, - CompletionQueue* cq, void* tag) { - auto stream = stub->AsyncStreamingCall(ctx, cq, tag); + PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx, + CompletionQueue* cq) { + auto stream = stub->PrepareAsyncStreamingCall(ctx, cq); return stream; }; static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, @@ -479,7 +457,7 @@ class AsyncStreamingPingPongClient final const SimpleRequest& req) { return new ClientRpcContextStreamingPingPongImpl<SimpleRequest, SimpleResponse>( - stub, req, next_issue, AsyncStreamingPingPongClient::StartReq, + stub, req, next_issue, AsyncStreamingPingPongClient::PrepareReq, AsyncStreamingPingPongClient::CheckDone); } }; @@ -492,8 +470,8 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext { std::function<gpr_timespec()> next_issue, std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>( BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*, - CompletionQueue*, void*)> - start_req, + CompletionQueue*)> + prepare_req, std::function<void(grpc::Status, ResponseType*)> on_done) : context_(), stub_(stub), @@ -503,7 +481,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext { next_state_(State::INVALID), callback_(on_done), next_issue_(next_issue), - start_req_(start_req) {} + prepare_req_(prepare_req) {} ~ClientRpcContextStreamingFromClientImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { StartInternal(cq); @@ -546,8 +524,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext { } void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextStreamingFromClientImpl( - stub_, req_, next_issue_, start_req_, callback_); - std::lock_guard<ClientRpcContext> lclone(*clone); + stub_, req_, next_issue_, prepare_req_, callback_); clone->StartInternal(cq); } @@ -570,17 +547,17 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext { std::function<gpr_timespec()> next_issue_; std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>( BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*, - CompletionQueue*, void*)> - start_req_; + CompletionQueue*)> + prepare_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncWriter<RequestType>> stream_; void StartInternal(CompletionQueue* cq) { cq_ = cq; - stream_ = start_req_(stub_, &context_, &response_, cq, - ClientRpcContext::tag(this)); + stream_ = prepare_req_(stub_, &context_, &response_, cq); next_state_ = State::STREAM_IDLE; + stream_->StartCall(ClientRpcContext::tag(this)); } }; @@ -597,10 +574,10 @@ class AsyncStreamingFromClientClient final private: static void CheckDone(grpc::Status s, SimpleResponse* response) {} - static std::unique_ptr<grpc::ClientAsyncWriter<SimpleRequest>> StartReq( + static std::unique_ptr<grpc::ClientAsyncWriter<SimpleRequest>> PrepareReq( BenchmarkService::Stub* stub, grpc::ClientContext* ctx, - SimpleResponse* resp, CompletionQueue* cq, void* tag) { - auto stream = stub->AsyncStreamingFromClient(ctx, resp, cq, tag); + SimpleResponse* resp, CompletionQueue* cq) { + auto stream = stub->PrepareAsyncStreamingFromClient(ctx, resp, cq); return stream; }; static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, @@ -608,7 +585,7 @@ class AsyncStreamingFromClientClient final const SimpleRequest& req) { return new ClientRpcContextStreamingFromClientImpl<SimpleRequest, SimpleResponse>( - stub, req, next_issue, AsyncStreamingFromClientClient::StartReq, + stub, req, next_issue, AsyncStreamingFromClientClient::PrepareReq, AsyncStreamingFromClientClient::CheckDone); } }; @@ -621,8 +598,8 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext { std::function<gpr_timespec()> next_issue, std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*, void*)> - start_req, + CompletionQueue*)> + prepare_req, std::function<void(grpc::Status, ResponseType*)> on_done) : context_(), stub_(stub), @@ -632,7 +609,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext { next_state_(State::INVALID), callback_(on_done), next_issue_(next_issue), - start_req_(start_req) {} + prepare_req_(prepare_req) {} ~ClientRpcContextStreamingFromServerImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { StartInternal(cq); @@ -664,8 +641,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext { } void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextStreamingFromServerImpl( - stub_, req_, next_issue_, start_req_, callback_); - std::lock_guard<ClientRpcContext> lclone(*clone); + stub_, req_, next_issue_, prepare_req_, callback_); clone->StartInternal(cq); } @@ -682,8 +658,8 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext { std::function<gpr_timespec()> next_issue_; std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*, void*)> - start_req_; + CompletionQueue*)> + prepare_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> stream_; @@ -691,9 +667,9 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext { void StartInternal(CompletionQueue* cq) { // TODO(vjpai): Add support to rate-pace this cq_ = cq; + stream_ = prepare_req_(stub_, &context_, req_, cq); next_state_ = State::STREAM_IDLE; - stream_ = - start_req_(stub_, &context_, req_, cq, ClientRpcContext::tag(this)); + stream_->StartCall(ClientRpcContext::tag(this)); } }; @@ -710,10 +686,10 @@ class AsyncStreamingFromServerClient final private: static void CheckDone(grpc::Status s, SimpleResponse* response) {} - static std::unique_ptr<grpc::ClientAsyncReader<SimpleResponse>> StartReq( + static std::unique_ptr<grpc::ClientAsyncReader<SimpleResponse>> PrepareReq( BenchmarkService::Stub* stub, grpc::ClientContext* ctx, - const SimpleRequest& req, CompletionQueue* cq, void* tag) { - auto stream = stub->AsyncStreamingFromServer(ctx, req, cq, tag); + const SimpleRequest& req, CompletionQueue* cq) { + auto stream = stub->PrepareAsyncStreamingFromServer(ctx, req, cq); return stream; }; static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, @@ -721,7 +697,7 @@ class AsyncStreamingFromServerClient final const SimpleRequest& req) { return new ClientRpcContextStreamingFromServerImpl<SimpleRequest, SimpleResponse>( - stub, req, next_issue, AsyncStreamingFromServerClient::StartReq, + stub, req, next_issue, AsyncStreamingFromServerClient::PrepareReq, AsyncStreamingFromServerClient::CheckDone); } }; @@ -733,8 +709,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { std::function<gpr_timespec()> next_issue, std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( grpc::GenericStub*, grpc::ClientContext*, - const grpc::string& method_name, CompletionQueue*, void*)> - start_req, + const grpc::string& method_name, CompletionQueue*)> + prepare_req, std::function<void(grpc::Status, ByteBuffer*)> on_done) : context_(), stub_(stub), @@ -744,7 +720,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { next_state_(State::INVALID), callback_(on_done), next_issue_(next_issue), - start_req_(start_req) {} + prepare_req_(prepare_req) {} ~ClientRpcContextGenericStreamingImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { StartInternal(cq, config.messages_per_stream()); @@ -807,8 +783,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { } void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextGenericStreamingImpl( - stub_, req_, next_issue_, start_req_, callback_); - std::lock_guard<ClientRpcContext> lclone(*clone); + stub_, req_, next_issue_, prepare_req_, callback_); clone->StartInternal(cq, messages_per_stream_); } @@ -834,8 +809,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { std::function<gpr_timespec()> next_issue_; std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, - CompletionQueue*, void*)> - start_req_; + CompletionQueue*)> + prepare_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_; @@ -850,9 +825,9 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { "/grpc.testing.BenchmarkService/StreamingCall"); messages_per_stream_ = messages_per_stream; messages_issued_ = 0; + stream_ = prepare_req_(stub_, &context_, kMethodName, cq); next_state_ = State::STREAM_IDLE; - stream_ = start_req_(stub_, &context_, kMethodName, cq, - ClientRpcContext::tag(this)); + stream_->StartCall(ClientRpcContext::tag(this)); } }; @@ -874,17 +849,17 @@ class GenericAsyncStreamingClient final private: static void CheckDone(grpc::Status s, ByteBuffer* response) {} - static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> StartReq( + static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> PrepareReq( grpc::GenericStub* stub, grpc::ClientContext* ctx, - const grpc::string& method_name, CompletionQueue* cq, void* tag) { - auto stream = stub->Call(ctx, method_name, cq, tag); + const grpc::string& method_name, CompletionQueue* cq) { + auto stream = stub->PrepareCall(ctx, method_name, cq); return stream; }; static ClientRpcContext* SetupCtx(grpc::GenericStub* stub, std::function<gpr_timespec()> next_issue, const ByteBuffer& req) { return new ClientRpcContextGenericStreamingImpl( - stub, req, next_issue, GenericAsyncStreamingClient::StartReq, + stub, req, next_issue, GenericAsyncStreamingClient::PrepareReq, GenericAsyncStreamingClient::CheckDone); } }; |