diff options
author | Dan Born <dborn+github@google.com> | 2016-01-20 14:16:29 -0800 |
---|---|---|
committer | Dan Born <dborn+github@google.com> | 2016-01-20 14:16:29 -0800 |
commit | beeb4c2ad75533848ac0fca545a3b4785d8fce57 (patch) | |
tree | eed3a91f80967ab90ae4cbc559503015bd3a0483 /test/cpp/qps | |
parent | b13a69da41ddad7880f409c53d1f55982ee79ac5 (diff) | |
parent | 4c3c397bbea2acf023950f7470fe36d2e7322cc0 (diff) |
Merge branch 'master' into tcp_listener
Diffstat (limited to 'test/cpp/qps')
-rw-r--r-- | test/cpp/qps/client.h | 168 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 156 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 28 | ||||
-rw-r--r-- | test/cpp/qps/driver.cc | 2 | ||||
-rw-r--r-- | test/cpp/qps/driver.h | 2 | ||||
-rw-r--r-- | test/cpp/qps/generic_async_streaming_ping_pong_test.cc | 82 | ||||
-rw-r--r-- | test/cpp/qps/histogram.h | 2 | ||||
-rw-r--r-- | test/cpp/qps/perf_db.proto | 71 | ||||
-rw-r--r-- | test/cpp/qps/perf_db_client.h | 2 | ||||
-rw-r--r-- | test/cpp/qps/qps_driver.cc | 18 | ||||
-rw-r--r-- | test/cpp/qps/qps_worker.cc | 31 | ||||
-rw-r--r-- | test/cpp/qps/qps_worker.h | 4 | ||||
-rw-r--r-- | test/cpp/qps/server.h | 9 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 151 | ||||
-rw-r--r-- | test/cpp/qps/server_sync.cc | 2 | ||||
-rw-r--r-- | test/cpp/qps/worker.cc | 5 |
16 files changed, 504 insertions, 229 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index b24a90adac..97487fd0b2 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -37,12 +37,15 @@ #include <condition_variable> #include <mutex> +#include <grpc++/support/byte_buffer.h> +#include <grpc++/support/slice.h> + #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" #include "test/cpp/util/create_test_channel.h" -#include "test/proto/benchmarks/payloads.grpc.pb.h" -#include "test/proto/benchmarks/services.grpc.pb.h" +#include "src/proto/grpc/testing/payloads.grpc.pb.h" +#include "src/proto/grpc/testing/services.grpc.pb.h" namespace grpc { @@ -66,37 +69,64 @@ namespace testing { typedef std::chrono::high_resolution_clock grpc_time_source; typedef std::chrono::time_point<grpc_time_source> grpc_time; -class Client { +template <class RequestType> +class ClientRequestCreator { public: - explicit Client(const ClientConfig& config) - : channels_(config.client_channels()), - timer_(new Timer), - interarrival_timer_() { - for (int i = 0; i < config.client_channels(); i++) { - channels_[i].init(config.server_targets(i % config.server_targets_size()), - config); - } - if (config.payload_config().has_bytebuf_params()) { - GPR_ASSERT(false); // not yet implemented - } else if (config.payload_config().has_simple_params()) { - request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); - request_.set_response_size( - config.payload_config().simple_params().resp_size()); - request_.mutable_payload()->set_type( + ClientRequestCreator(RequestType* req, const PayloadConfig&) { + // this template must be specialized + // fail with an assertion rather than a compile-time + // check since these only happen at the beginning anyway + GPR_ASSERT(false); + } +}; + +template <> +class ClientRequestCreator<SimpleRequest> { + public: + ClientRequestCreator(SimpleRequest* req, + const PayloadConfig& payload_config) { + if (payload_config.has_bytebuf_params()) { + GPR_ASSERT(false); // not appropriate for this specialization + } else if (payload_config.has_simple_params()) { + req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + req->set_response_size(payload_config.simple_params().resp_size()); + req->mutable_payload()->set_type( grpc::testing::PayloadType::COMPRESSABLE); - int size = config.payload_config().simple_params().req_size(); + int size = payload_config.simple_params().req_size(); std::unique_ptr<char[]> body(new char[size]); - request_.mutable_payload()->set_body(body.get(), size); - } else if (config.payload_config().has_complex_params()) { - GPR_ASSERT(false); // not yet implemented + req->mutable_payload()->set_body(body.get(), size); + } else if (payload_config.has_complex_params()) { + GPR_ASSERT(false); // not appropriate for this specialization } else { // default should be simple proto without payloads - request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); - request_.set_response_size(0); - request_.mutable_payload()->set_type( + req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + req->set_response_size(0); + req->mutable_payload()->set_type( grpc::testing::PayloadType::COMPRESSABLE); } } +}; + +template <> +class ClientRequestCreator<ByteBuffer> { + public: + ClientRequestCreator(ByteBuffer* req, const PayloadConfig& payload_config) { + if (payload_config.has_bytebuf_params()) { + std::unique_ptr<char[]> buf( + new char[payload_config.bytebuf_params().req_size()]); + gpr_slice s = gpr_slice_from_copied_buffer( + buf.get(), payload_config.bytebuf_params().req_size()); + Slice slice(s, Slice::STEAL_REF); + *req = ByteBuffer(&slice, 1); + } else { + GPR_ASSERT(false); // not appropriate for this specialization + } + } +}; + +class Client { + public: + Client() : timer_(new Timer), interarrival_timer_() {} virtual ~Client() {} ClientStats Mark(bool reset) { @@ -134,37 +164,8 @@ class Client { } protected: - SimpleRequest request_; bool closed_loop_; - class ClientChannelInfo { - public: - ClientChannelInfo() {} - ClientChannelInfo(const ClientChannelInfo& i) { - // The copy constructor is to satisfy old compilers - // that need it for using std::vector . It is only ever - // used for empty entries - GPR_ASSERT(!i.channel_ && !i.stub_); - } - void init(const grpc::string& target, const ClientConfig& config) { - // We have to use a 2-phase init like this with a default - // constructor followed by an initializer function to make - // old compilers happy with using this in std::vector - channel_ = CreateTestChannel( - target, config.security_params().server_host_override(), - config.has_security_params(), - !config.security_params().use_test_ca()); - stub_ = BenchmarkService::NewStub(channel_); - } - Channel* get_channel() { return channel_.get(); } - BenchmarkService::Stub* get_stub() { return stub_.get(); } - - private: - std::shared_ptr<Channel> channel_; - std::unique_ptr<BenchmarkService::Stub> stub_; - }; - std::vector<ClientChannelInfo> channels_; - void StartThreads(size_t num_threads) { for (size_t i = 0; i < num_threads; i++) { threads_.emplace_back(new Thread(this, i)); @@ -295,8 +296,6 @@ class Client { } } - BenchmarkService::Stub* stub_; - ClientConfig config_; std::mutex mu_; std::condition_variable cv_; bool done_; @@ -314,11 +313,66 @@ class Client { std::vector<grpc_time> next_time_; }; +template <class StubType, class RequestType> +class ClientImpl : public Client { + public: + ClientImpl(const ClientConfig& config, + std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> + create_stub) + : channels_(config.client_channels()), create_stub_(create_stub) { + for (int i = 0; i < config.client_channels(); i++) { + channels_[i].init(config.server_targets(i % config.server_targets_size()), + config, create_stub_); + } + + ClientRequestCreator<RequestType> create_req(&request_, + config.payload_config()); + } + virtual ~ClientImpl() {} + + protected: + RequestType request_; + + class ClientChannelInfo { + public: + ClientChannelInfo() {} + ClientChannelInfo(const ClientChannelInfo& i) { + // The copy constructor is to satisfy old compilers + // that need it for using std::vector . It is only ever + // used for empty entries + GPR_ASSERT(!i.channel_ && !i.stub_); + } + void init(const grpc::string& target, const ClientConfig& config, + std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> + create_stub) { + // We have to use a 2-phase init like this with a default + // constructor followed by an initializer function to make + // old compilers happy with using this in std::vector + channel_ = CreateTestChannel( + target, config.security_params().server_host_override(), + config.has_security_params(), + !config.security_params().use_test_ca()); + stub_ = create_stub(channel_); + } + Channel* get_channel() { return channel_.get(); } + StubType* get_stub() { return stub_.get(); } + + private: + std::shared_ptr<Channel> channel_; + std::unique_ptr<StubType> stub_; + }; + std::vector<ClientChannelInfo> channels_; + std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)> + create_stub_; +}; + std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args); std::unique_ptr<Client> CreateSynchronousStreamingClient( const ClientConfig& args); std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args); std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args); +std::unique_ptr<Client> CreateGenericAsyncStreamingClient( + const ClientConfig& args); } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 9594179822..f270cd0987 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -37,21 +37,22 @@ #include <list> #include <memory> #include <mutex> +#include <sstream> #include <string> #include <thread> #include <vector> -#include <sstream> +#include <gflags/gflags.h> +#include <grpc++/client_context.h> +#include <grpc++/generic/generic_stub.h> #include <grpc/grpc.h> #include <grpc/support/histogram.h> #include <grpc/support/log.h> -#include <gflags/gflags.h> -#include <grpc++/client_context.h> -#include "test/cpp/qps/timer.h" #include "test/cpp/qps/client.h" +#include "test/cpp/qps/timer.h" #include "test/cpp/util/create_test_channel.h" -#include "test/proto/benchmarks/services.grpc.pb.h" +#include "src/proto/grpc/testing/services.grpc.pb.h" namespace grpc { namespace testing { @@ -147,13 +148,22 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { typedef std::forward_list<ClientRpcContext*> context_list; -class AsyncClient : public Client { +template <class StubType, class RequestType> +class AsyncClient : public ClientImpl<StubType, RequestType> { + // Specify which protected members we are using since there is no + // member name resolution until the template types are fully resolved public: - explicit AsyncClient( - const ClientConfig& config, - std::function<ClientRpcContext*(int, BenchmarkService::Stub*, - const SimpleRequest&)> setup_ctx) - : Client(config), + using Client::SetupLoadTest; + using Client::NextIssueTime; + using Client::closed_loop_; + using ClientImpl<StubType, RequestType>::channels_; + using ClientImpl<StubType, RequestType>::request_; + AsyncClient(const ClientConfig& config, + std::function<ClientRpcContext*(int, StubType*, + const RequestType&)> setup_ctx, + std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> + create_stub) + : ClientImpl<StubType, RequestType>(config, create_stub), channel_lock_(new std::mutex[config.client_channels()]), contexts_(config.client_channels()), max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), @@ -343,10 +353,16 @@ class AsyncClient : public Client { int pref_channel_inc_; }; -class AsyncUnaryClient GRPC_FINAL : public AsyncClient { +static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator( + std::shared_ptr<Channel> ch) { + return BenchmarkService::NewStub(ch); +} + +class AsyncUnaryClient GRPC_FINAL + : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { public: explicit AsyncUnaryClient(const ClientConfig& config) - : AsyncClient(config, SetupCtx) { + : AsyncClient(config, SetupCtx, BenchmarkStubCreator) { StartThreads(config.async_client_threads()); } ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } @@ -437,10 +453,11 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { stream_; }; -class AsyncStreamingClient GRPC_FINAL : public AsyncClient { +class AsyncStreamingClient GRPC_FINAL + : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { public: explicit AsyncStreamingClient(const ClientConfig& config) - : AsyncClient(config, SetupCtx) { + : AsyncClient(config, SetupCtx, BenchmarkStubCreator) { // async streaming currently only supports closed loop GPR_ASSERT(closed_loop_); @@ -467,12 +484,119 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient { } }; +class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { + public: + ClientRpcContextGenericStreamingImpl( + int channel_id, grpc::GenericStub* stub, const ByteBuffer& req, + std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( + grpc::GenericStub*, grpc::ClientContext*, + const grpc::string& method_name, CompletionQueue*, void*)> start_req, + std::function<void(grpc::Status, ByteBuffer*)> on_done) + : ClientRpcContext(channel_id), + context_(), + stub_(stub), + req_(req), + response_(), + next_state_(&ClientRpcContextGenericStreamingImpl::ReqSent), + callback_(on_done), + start_req_(start_req), + start_(Timer::Now()) {} + ~ClientRpcContextGenericStreamingImpl() GRPC_OVERRIDE {} + bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { + return (this->*next_state_)(ok, hist); + } + ClientRpcContext* StartNewClone() GRPC_OVERRIDE { + return new ClientRpcContextGenericStreamingImpl(channel_id_, stub_, req_, + start_req_, callback_); + } + void Start(CompletionQueue* cq) GRPC_OVERRIDE { + const grpc::string kMethodName( + "/grpc.testing.BenchmarkService/StreamingCall"); + stream_ = start_req_(stub_, &context_, kMethodName, cq, + ClientRpcContext::tag(this)); + } + + private: + bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); } + bool StartWrite(bool ok) { + if (!ok) { + return (false); + } + start_ = Timer::Now(); + next_state_ = &ClientRpcContextGenericStreamingImpl::WriteDone; + stream_->Write(req_, ClientRpcContext::tag(this)); + return true; + } + bool WriteDone(bool ok, Histogram*) { + if (!ok) { + return (false); + } + next_state_ = &ClientRpcContextGenericStreamingImpl::ReadDone; + stream_->Read(&response_, ClientRpcContext::tag(this)); + return true; + } + bool ReadDone(bool ok, Histogram* hist) { + hist->Add((Timer::Now() - start_) * 1e9); + return StartWrite(ok); + } + grpc::ClientContext context_; + grpc::GenericStub* stub_; + ByteBuffer req_; + ByteBuffer response_; + bool (ClientRpcContextGenericStreamingImpl::*next_state_)(bool, Histogram*); + std::function<void(grpc::Status, ByteBuffer*)> callback_; + std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( + grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, + CompletionQueue*, void*)> start_req_; + grpc::Status status_; + double start_; + std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_; +}; + +static std::unique_ptr<grpc::GenericStub> GenericStubCreator( + std::shared_ptr<Channel> ch) { + return std::unique_ptr<grpc::GenericStub>(new grpc::GenericStub(ch)); +} + +class GenericAsyncStreamingClient GRPC_FINAL + : public AsyncClient<grpc::GenericStub, ByteBuffer> { + public: + explicit GenericAsyncStreamingClient(const ClientConfig& config) + : AsyncClient(config, SetupCtx, GenericStubCreator) { + // async streaming currently only supports closed loop + GPR_ASSERT(closed_loop_); + + StartThreads(config.async_client_threads()); + } + + ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } + + private: + static void CheckDone(grpc::Status s, ByteBuffer* response) {} + static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> StartReq( + grpc::GenericStub* stub, grpc::ClientContext* ctx, + const grpc::string& method_name, CompletionQueue* cq, void* tag) { + auto stream = stub->Call(ctx, method_name, cq, tag); + return stream; + }; + static ClientRpcContext* SetupCtx(int channel_id, grpc::GenericStub* stub, + const ByteBuffer& req) { + return new ClientRpcContextGenericStreamingImpl( + channel_id, stub, req, GenericAsyncStreamingClient::StartReq, + GenericAsyncStreamingClient::CheckDone); + } +}; + std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args) { return std::unique_ptr<Client>(new AsyncUnaryClient(args)); } std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args) { return std::unique_ptr<Client>(new AsyncStreamingClient(args)); } +std::unique_ptr<Client> CreateGenericAsyncStreamingClient( + const ClientConfig& args) { + return std::unique_ptr<Client>(new GenericAsyncStreamingClient(args)); +} } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 10d680860a..92fbf240ce 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,38 +35,44 @@ #include <chrono> #include <memory> #include <mutex> +#include <sstream> #include <string> #include <thread> #include <vector> -#include <sstream> #include <gflags/gflags.h> +#include <grpc++/client_context.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/histogram.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/time.h> -#include <grpc++/client_context.h> -#include <grpc++/server.h> -#include <grpc++/server_builder.h> #include <gtest/gtest.h> -#include "test/cpp/util/create_test_channel.h" +#include "src/core/profiling/timers.h" +#include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/cpp/qps/client.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" -#include "test/proto/benchmarks/services.grpc.pb.h" - -#include "src/core/profiling/timers.h" namespace grpc { namespace testing { -class SynchronousClient : public Client { +static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator( + std::shared_ptr<Channel> ch) { + return BenchmarkService::NewStub(ch); +} + +class SynchronousClient + : public ClientImpl<BenchmarkService::Stub, SimpleRequest> { public: - SynchronousClient(const ClientConfig& config) : Client(config) { + SynchronousClient(const ClientConfig& config) + : ClientImpl<BenchmarkService::Stub, SimpleRequest>( + config, BenchmarkStubCreator) { num_threads_ = config.outstanding_rpcs_per_channel() * config.client_channels(); responses_.resize(num_threads_); diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 20dc65046b..acb265b308 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -48,7 +48,7 @@ #include "test/cpp/qps/driver.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/qps_worker.h" -#include "test/proto/benchmarks/services.grpc.pb.h" +#include "src/proto/grpc/testing/services.grpc.pb.h" using std::list; using std::thread; diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index 50bf17ceab..2a7cf805e5 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -37,7 +37,7 @@ #include <memory> #include "test/cpp/qps/histogram.h" -#include "test/proto/benchmarks/control.grpc.pb.h" +#include "src/proto/grpc/testing/control.grpc.pb.h" namespace grpc { namespace testing { diff --git a/test/cpp/qps/generic_async_streaming_ping_pong_test.cc b/test/cpp/qps/generic_async_streaming_ping_pong_test.cc new file mode 100644 index 0000000000..2b2e1c820f --- /dev/null +++ b/test/cpp/qps/generic_async_streaming_ping_pong_test.cc @@ -0,0 +1,82 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <set> + +#include <grpc/support/log.h> + +#include "test/cpp/qps/driver.h" +#include "test/cpp/qps/report.h" +#include "test/cpp/util/benchmark_config.h" + +namespace grpc { +namespace testing { + +static const int WARMUP = 5; +static const int BENCHMARK = 10; + +static void RunGenericAsyncStreamingPingPong() { + gpr_log(GPR_INFO, "Running Generic Async Streaming Ping Pong"); + + ClientConfig client_config; + client_config.set_client_type(ASYNC_CLIENT); + client_config.set_outstanding_rpcs_per_channel(1); + client_config.set_client_channels(1); + client_config.set_async_client_threads(1); + client_config.set_rpc_type(STREAMING); + client_config.mutable_load_params()->mutable_closed_loop(); + auto bbuf = client_config.mutable_payload_config()->mutable_bytebuf_params(); + bbuf->set_resp_size(0); + bbuf->set_req_size(0); + + ServerConfig server_config; + server_config.set_server_type(ASYNC_SERVER); + server_config.set_host("localhost"); + server_config.set_async_server_threads(1); + + const auto result = + RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); + + GetReporter()->ReportQPS(*result); + GetReporter()->ReportLatency(*result); +} + +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc::testing::InitBenchmark(&argc, &argv, true); + + grpc::testing::RunGenericAsyncStreamingPingPong(); + return 0; +} diff --git a/test/cpp/qps/histogram.h b/test/cpp/qps/histogram.h index 4161eac826..acb415f0a1 100644 --- a/test/cpp/qps/histogram.h +++ b/test/cpp/qps/histogram.h @@ -35,7 +35,7 @@ #define TEST_QPS_HISTOGRAM_H #include <grpc/support/histogram.h> -#include "test/proto/benchmarks/stats.grpc.pb.h" +#include "src/proto/grpc/testing/stats.grpc.pb.h" namespace grpc { namespace testing { diff --git a/test/cpp/qps/perf_db.proto b/test/cpp/qps/perf_db.proto deleted file mode 100644 index 8a691ddded..0000000000 --- a/test/cpp/qps/perf_db.proto +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright 2015, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -syntax = "proto3"; - -import "test/proto/benchmarks/control.proto"; - -package grpc.testing; - -service PerfDbTransfer { - // Sends client info - rpc RecordSingleClientData(SingleUserRecordRequest) - returns (SingleUserRecordReply) { - } -} - -// Metrics to be stored -message Metrics { - double qps = 1; - double qps_per_core = 2; - double perc_lat_50 = 3; - double perc_lat_90 = 4; - double perc_lat_95 = 5; - double perc_lat_99 = 6; - double perc_lat_99_point_9 = 7; - double server_system_time = 8; - double server_user_time = 9; - double client_system_time = 10; - double client_user_time = 11; -} - -// Request for storing a single user's data -message SingleUserRecordRequest { - string hashed_id = 1; - string test_name = 2; - string sys_info = 3; - string tag = 4; - Metrics metrics = 5; - ClientConfig client_config = 6; - ServerConfig server_config = 7; -} - -// Reply to request for storing single user's data -message SingleUserRecordReply { -} diff --git a/test/cpp/qps/perf_db_client.h b/test/cpp/qps/perf_db_client.h index 72ebe79c3c..ece020aa9b 100644 --- a/test/cpp/qps/perf_db_client.h +++ b/test/cpp/qps/perf_db_client.h @@ -42,7 +42,7 @@ #include <grpc++/client_context.h> #include <grpc++/create_channel.h> #include <grpc++/security/credentials.h> -#include "test/cpp/qps/perf_db.grpc.pb.h" +#include "src/proto/grpc/testing/perf_db.grpc.pb.h" namespace grpc { namespace testing { diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index c7096391e6..9816a09592 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -62,6 +62,8 @@ DEFINE_int32(client_channels, 1, "Number of client channels"); DEFINE_int32(simple_req_size, -1, "Simple proto request payload size"); DEFINE_int32(simple_resp_size, -1, "Simple proto response payload size"); +DEFINE_int32(bbuf_req_size, -1, "Byte-buffer request payload size"); +DEFINE_int32(bbuf_resp_size, -1, "Byte-buffer response payload size"); DEFINE_string(client_type, "SYNC_CLIENT", "Client type"); DEFINE_int32(async_client_threads, 1, "Async client threads"); @@ -109,6 +111,13 @@ static void QpsDriver() { if (FLAGS_simple_req_size >= 0) { params->set_req_size(FLAGS_simple_req_size); } + } else if (FLAGS_bbuf_resp_size >= 0) { + auto params = + client_config.mutable_payload_config()->mutable_bytebuf_params(); + params->set_resp_size(FLAGS_bbuf_resp_size); + if (FLAGS_bbuf_req_size >= 0) { + params->set_req_size(FLAGS_bbuf_req_size); + } } else { // set a reasonable default: proto but no payload client_config.mutable_payload_config()->mutable_simple_params(); @@ -156,6 +165,13 @@ static void QpsDriver() { server_config.mutable_security_params()->CopyFrom(security); } + // Make sure that if we are performing a generic (bytebuf) test + // that we are also using async streaming + GPR_ASSERT(!client_config.payload_config().has_bytebuf_params() || + (client_config.client_type() == ASYNC_CLIENT && + client_config.rpc_type() == STREAMING && + server_config.server_type() == ASYNC_SERVER)); + const auto result = RunScenario( client_config, FLAGS_num_clients, server_config, FLAGS_num_servers, FLAGS_warmup_seconds, FLAGS_benchmark_seconds, FLAGS_local_workers); diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index dc59eab7ef..c0276d05b3 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -36,26 +36,26 @@ #include <cassert> #include <memory> #include <mutex> +#include <sstream> #include <string> #include <thread> #include <vector> -#include <sstream> +#include <grpc++/client_context.h> +#include <grpc++/security/server_credentials.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/histogram.h> -#include <grpc/support/log.h> #include <grpc/support/host_port.h> -#include <grpc++/client_context.h> -#include <grpc++/server.h> -#include <grpc++/server_builder.h> -#include <grpc++/security/server_credentials.h> +#include <grpc/support/log.h> +#include "src/proto/grpc/testing/services.pb.h" #include "test/core/util/grpc_profiler.h" #include "test/cpp/qps/client.h" #include "test/cpp/qps/server.h" #include "test/cpp/util/create_test_channel.h" -#include "test/proto/benchmarks/services.pb.h" namespace grpc { namespace testing { @@ -69,7 +69,9 @@ static std::unique_ptr<Client> CreateClient(const ClientConfig& config) { case ClientType::ASYNC_CLIENT: return (config.rpc_type() == RpcType::UNARY) ? CreateAsyncUnaryClient(config) - : CreateAsyncStreamingClient(config); + : (config.payload_config().has_bytebuf_params() + ? CreateGenericAsyncStreamingClient(config) + : CreateAsyncStreamingClient(config)); default: abort(); } @@ -95,7 +97,8 @@ static std::unique_ptr<Server> CreateServer(const ServerConfig& config) { class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { public: - explicit WorkerServiceImpl() : acquired_(false) {} + explicit WorkerServiceImpl(int server_port) + : acquired_(false), server_port_(server_port) {} Status RunClient(ServerContext* ctx, ServerReaderWriter<ClientStatus, ClientArgs>* stream) @@ -194,6 +197,9 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { if (!args.has_setup()) { return Status(StatusCode::INVALID_ARGUMENT, ""); } + if (server_port_ != 0) { + args.mutable_setup()->set_port(server_port_); + } auto server = CreateServer(args.setup()); if (!server) { return Status(StatusCode::INVALID_ARGUMENT, ""); @@ -217,10 +223,11 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { std::mutex mu_; bool acquired_; + int server_port_; }; -QpsWorker::QpsWorker(int driver_port) { - impl_.reset(new WorkerServiceImpl()); +QpsWorker::QpsWorker(int driver_port, int server_port) { + impl_.reset(new WorkerServiceImpl(server_port)); char* server_address = NULL; gpr_join_host_port(&server_address, "::", driver_port); diff --git a/test/cpp/qps/qps_worker.h b/test/cpp/qps/qps_worker.h index 0db88ad3d1..27de69fa65 100644 --- a/test/cpp/qps/qps_worker.h +++ b/test/cpp/qps/qps_worker.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -46,7 +46,7 @@ class WorkerServiceImpl; class QpsWorker { public: - explicit QpsWorker(int driver_port); + explicit QpsWorker(int driver_port, int server_port = 0); ~QpsWorker(); private: diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index 6e81edc8ff..32a3e85026 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -40,8 +40,8 @@ #include "test/core/end2end/data/ssl_test_data.h" #include "test/core/util/port.h" #include "test/cpp/qps/timer.h" -#include "test/proto/messages.grpc.pb.h" -#include "test/proto/benchmarks/control.grpc.pb.h" +#include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/control.grpc.pb.h" namespace grpc { namespace testing { @@ -75,12 +75,11 @@ class Server { } static bool SetPayload(PayloadType type, int size, Payload* payload) { - PayloadType response_type = type; // TODO(yangg): Support UNCOMPRESSABLE payload. if (type != PayloadType::COMPRESSABLE) { return false; } - payload->set_type(response_type); + payload->set_type(type); std::unique_ptr<char[]> body(new char[size]()); payload->set_body(body.get(), size); return true; diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index c151918ce4..d530dac86b 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -38,26 +38,42 @@ #include <thread> #include <gflags/gflags.h> +#include <grpc++/generic/async_generic_service.h> +#include <grpc++/security/server_credentials.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/server_context.h> +#include <grpc++/support/config.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> -#include <grpc++/support/config.h> -#include <grpc++/server.h> -#include <grpc++/server_builder.h> -#include <grpc++/server_context.h> -#include <grpc++/security/server_credentials.h> #include <gtest/gtest.h> #include "test/cpp/qps/server.h" -#include "test/proto/benchmarks/services.grpc.pb.h" +#include "src/proto/grpc/testing/services.grpc.pb.h" namespace grpc { namespace testing { +template <class RequestType, class ResponseType, class ServiceType, + class ServerContextType> class AsyncQpsServerTest : public Server { public: - explicit AsyncQpsServerTest(const ServerConfig &config) : Server(config) { + AsyncQpsServerTest( + const ServerConfig &config, + std::function<void(ServerBuilder *, ServiceType *)> register_service, + std::function<void(ServiceType *, ServerContextType *, RequestType *, + ServerAsyncResponseWriter<ResponseType> *, + CompletionQueue *, ServerCompletionQueue *, void *)> + request_unary_function, + std::function<void(ServiceType *, ServerContextType *, + ServerAsyncReaderWriter<ResponseType, RequestType> *, + CompletionQueue *, ServerCompletionQueue *, void *)> + request_streaming_function, + std::function<grpc::Status(const PayloadConfig &, const RequestType *, + ResponseType *)> process_rpc) + : Server(config) { char *server_address = NULL; gpr_join_host_port(&server_address, config.host().c_str(), port()); @@ -67,7 +83,8 @@ class AsyncQpsServerTest : public Server { Server::CreateServerCredentials(config)); gpr_free(server_address); - builder.RegisterAsyncService(&async_service_); + register_service(&builder, &async_service_); + for (int i = 0; i < config.async_server_threads(); i++) { srv_cqs_.emplace_back(builder.AddCompletionQueue()); } @@ -75,22 +92,29 @@ class AsyncQpsServerTest : public Server { server_ = builder.BuildAndStart(); using namespace std::placeholders; + + auto process_rpc_bound = + std::bind(process_rpc, config.payload_config(), _1, _2); + for (int i = 0; i < 10000 / config.async_server_threads(); i++) { for (int j = 0; j < config.async_server_threads(); j++) { - auto request_unary = std::bind( - &BenchmarkService::AsyncService::RequestUnaryCall, &async_service_, - _1, _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4); - auto request_streaming = std::bind( - &BenchmarkService::AsyncService::RequestStreamingCall, - &async_service_, _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3); - contexts_.push_front( - new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( - request_unary, ProcessRPC)); - contexts_.push_front( - new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( - request_streaming, ProcessRPC)); + if (request_unary_function) { + auto request_unary = + std::bind(request_unary_function, &async_service_, _1, _2, _3, + srv_cqs_[j].get(), srv_cqs_[j].get(), _4); + contexts_.push_front( + new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound)); + } + if (request_streaming_function) { + auto request_streaming = + std::bind(request_streaming_function, &async_service_, _1, _2, + srv_cqs_[j].get(), srv_cqs_[j].get(), _3); + contexts_.push_front(new ServerRpcContextStreamingImpl( + request_streaming, process_rpc_bound)); + } } } + for (int i = 0; i < config.async_server_threads(); i++) { shutdown_state_.emplace_back(new PerThreadShutdownState()); } @@ -155,16 +179,15 @@ class AsyncQpsServerTest : public Server { return reinterpret_cast<ServerRpcContext *>(tag); } - template <class RequestType, class ResponseType> class ServerRpcContextUnaryImpl GRPC_FINAL : public ServerRpcContext { public: ServerRpcContextUnaryImpl( - std::function<void(ServerContext *, RequestType *, + std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncResponseWriter<ResponseType> *, void *)> request_method, std::function<grpc::Status(const RequestType *, ResponseType *)> invoke_method) - : srv_ctx_(new ServerContext), + : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextUnaryImpl::invoker), request_method_(request_method), invoke_method_(invoke_method), @@ -177,7 +200,7 @@ class AsyncQpsServerTest : public Server { return (this->*next_state_)(ok); } void Reset() GRPC_OVERRIDE { - srv_ctx_.reset(new ServerContext); + srv_ctx_.reset(new ServerContextType); req_ = RequestType(); response_writer_ = grpc::ServerAsyncResponseWriter<ResponseType>(srv_ctx_.get()); @@ -205,10 +228,10 @@ class AsyncQpsServerTest : public Server { response_writer_.Finish(response, status, AsyncQpsServerTest::tag(this)); return true; } - std::unique_ptr<ServerContext> srv_ctx_; + std::unique_ptr<ServerContextType> srv_ctx_; RequestType req_; bool (ServerRpcContextUnaryImpl::*next_state_)(bool); - std::function<void(ServerContext *, RequestType *, + std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncResponseWriter<ResponseType> *, void *)> request_method_; std::function<grpc::Status(const RequestType *, ResponseType *)> @@ -216,16 +239,16 @@ class AsyncQpsServerTest : public Server { grpc::ServerAsyncResponseWriter<ResponseType> response_writer_; }; - template <class RequestType, class ResponseType> class ServerRpcContextStreamingImpl GRPC_FINAL : public ServerRpcContext { public: ServerRpcContextStreamingImpl( - std::function<void(ServerContext *, grpc::ServerAsyncReaderWriter< - ResponseType, RequestType> *, - void *)> request_method, + std::function<void( + ServerContextType *, + grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)> + request_method, std::function<grpc::Status(const RequestType *, ResponseType *)> invoke_method) - : srv_ctx_(new ServerContext), + : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextStreamingImpl::request_done), request_method_(request_method), invoke_method_(invoke_method), @@ -237,7 +260,7 @@ class AsyncQpsServerTest : public Server { return (this->*next_state_)(ok); } void Reset() GRPC_OVERRIDE { - srv_ctx_.reset(new ServerContext); + srv_ctx_.reset(new ServerContextType); req_ = RequestType(); stream_ = grpc::ServerAsyncReaderWriter<ResponseType, RequestType>( srv_ctx_.get()); @@ -286,11 +309,11 @@ class AsyncQpsServerTest : public Server { } bool finish_done(bool ok) { return false; /* reset the context */ } - std::unique_ptr<ServerContext> srv_ctx_; + std::unique_ptr<ServerContextType> srv_ctx_; RequestType req_; bool (ServerRpcContextStreamingImpl::*next_state_)(bool); std::function<void( - ServerContext *, + ServerContextType *, grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)> request_method_; std::function<grpc::Status(const RequestType *, ResponseType *)> @@ -298,20 +321,10 @@ class AsyncQpsServerTest : public Server { grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_; }; - static Status ProcessRPC(const SimpleRequest *request, - SimpleResponse *response) { - if (request->response_size() > 0) { - if (!SetPayload(request->response_type(), request->response_size(), - response->mutable_payload())) { - return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); - } - } - return Status::OK; - } std::vector<std::thread> threads_; std::unique_ptr<grpc::Server> server_; std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_; - BenchmarkService::AsyncService async_service_; + ServiceType async_service_; std::forward_list<ServerRpcContext *> contexts_; class PerThreadShutdownState { @@ -335,8 +348,52 @@ class AsyncQpsServerTest : public Server { std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_; }; +static void RegisterBenchmarkService(ServerBuilder *builder, + BenchmarkService::AsyncService *service) { + builder->RegisterAsyncService(service); +} +static void RegisterGenericService(ServerBuilder *builder, + grpc::AsyncGenericService *service) { + builder->RegisterAsyncGenericService(service); +} + +static Status ProcessSimpleRPC(const PayloadConfig &, + const SimpleRequest *request, + SimpleResponse *response) { + if (request->response_size() > 0) { + if (!Server::SetPayload(request->response_type(), request->response_size(), + response->mutable_payload())) { + return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); + } + } + return Status::OK; +} + +static Status ProcessGenericRPC(const PayloadConfig &payload_config, + const ByteBuffer *request, + ByteBuffer *response) { + int resp_size = payload_config.bytebuf_params().resp_size(); + std::unique_ptr<char> buf(new char[resp_size]); + gpr_slice s = gpr_slice_from_copied_buffer(buf.get(), resp_size); + Slice slice(s, Slice::STEAL_REF); + *response = ByteBuffer(&slice, 1); + return Status::OK; +} + std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config) { - return std::unique_ptr<Server>(new AsyncQpsServerTest(config)); + return std::unique_ptr<Server>(new AsyncQpsServerTest< + SimpleRequest, SimpleResponse, BenchmarkService::AsyncService, + grpc::ServerContext>( + config, RegisterBenchmarkService, + &BenchmarkService::AsyncService::RequestUnaryCall, + &BenchmarkService::AsyncService::RequestStreamingCall, ProcessSimpleRPC)); +} +std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig &config) { + return std::unique_ptr<Server>( + new AsyncQpsServerTest<ByteBuffer, ByteBuffer, grpc::AsyncGenericService, + grpc::GenericServerContext>( + config, RegisterGenericService, nullptr, + &grpc::AsyncGenericService::RequestCall, ProcessGenericRPC)); } } // namespace testing diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc index 3a15bec888..97a1ff5255 100644 --- a/test/cpp/qps/server_sync.cc +++ b/test/cpp/qps/server_sync.cc @@ -45,7 +45,7 @@ #include "test/cpp/qps/server.h" #include "test/cpp/qps/timer.h" -#include "test/proto/benchmarks/services.grpc.pb.h" +#include "src/proto/grpc/testing/services.grpc.pb.h" namespace grpc { namespace testing { diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc index 430ffb7cdc..a1e73e9abe 100644 --- a/test/cpp/qps/worker.cc +++ b/test/cpp/qps/worker.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -44,6 +44,7 @@ #include "test/cpp/util/test_config.h" DEFINE_int32(driver_port, 0, "Port for communication with driver"); +DEFINE_int32(server_port, 0, "Port for operation as a server"); static bool got_sigint = false; @@ -53,7 +54,7 @@ namespace grpc { namespace testing { static void RunServer() { - QpsWorker worker(FLAGS_driver_port); + QpsWorker worker(FLAGS_driver_port, FLAGS_server_port); while (!got_sigint) { gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), |