From 8856875900b6f9ae00cd5ffa311a9ed33c40c7e1 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 4 Mar 2015 10:50:43 -0800 Subject: Async client progress --- test/cpp/qps/client_async.cc | 149 +++++++++++++++++++++---------------------- 1 file changed, 73 insertions(+), 76 deletions(-) (limited to 'test/cpp/qps/client_async.cc') diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 9ea9cfe8b9..30b85afc29 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -49,50 +49,11 @@ #include "test/core/util/grpc_profiler.h" #include "test/cpp/util/create_test_channel.h" #include "test/cpp/qps/qpstest.pb.h" +#include "test/cpp/qps/timer.h" +#include "test/cpp/qps/client.h" -DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); -DEFINE_int32(server_port, 0, "Server port."); -DEFINE_string(server_host, "127.0.0.1", "Server host."); -DEFINE_int32(client_threads, 4, "Number of client threads."); - -// We have a configurable number of channels for sending RPCs. -// RPCs are sent round-robin on the available channels by the -// various threads. Interesting cases are 1 global channel or -// 1 per-thread channel, but we can support any number. -// The channels are assigned round-robin on an RPC by RPC basis -// rather than just at initialization time in order to also measure the -// impact of cache thrashing caused by channel changes. This is an issue -// if you are not in one of the above "interesting cases" -DEFINE_int32(client_channels, 4, "Number of client channels."); - -DEFINE_int32(num_rpcs, 1000, "Number of RPCs per thread."); -DEFINE_int32(payload_size, 1, "Payload size in bytes"); - -// Alternatively, specify parameters for test as a workload so that multiple -// tests are initiated back-to-back. This is convenient for keeping a borg -// allocation consistent. This is a space-separated list of -// [threads channels num_rpcs payload_size ]* -DEFINE_string(workload, "", "Workload parameters"); - -using grpc::ChannelInterface; -using grpc::CreateTestChannel; -using grpc::testing::ServerStats; -using grpc::testing::SimpleRequest; -using grpc::testing::SimpleResponse; -using grpc::testing::StatsRequest; -using grpc::testing::TestService; - -// In some distros, gflags is in the namespace google, and in some others, -// in gflags. This hack is enabling us to find both. -namespace google {} -namespace gflags {} -using namespace google; -using namespace gflags; - -static double now() { - gpr_timespec tv = gpr_now(); - return 1e9 * tv.tv_sec + tv.tv_nsec; -} +namespace grpc { +namespace testing { class ClientRpcContext { public: @@ -103,8 +64,9 @@ class ClientRpcContext { static ClientRpcContext *detag(void *t) { return reinterpret_cast(t); } - virtual void report_stats(gpr_histogram *hist) = 0; + virtual void report_stats(Histogram *hist) = 0; }; + template class ClientRpcContextUnaryImpl : public ClientRpcContext { public: @@ -113,22 +75,22 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { const RequestType &req, std::function< std::unique_ptr>( - TestService::Stub *, grpc::ClientContext *, const RequestType &, - void *)> start_req, + TestService::Stub *, grpc::ClientContext *, const RequestType &, + void *)> start_req, std::function on_done) : context_(), - stub_(stub), + stub_(stub), req_(req), response_(), next_state_(&ClientRpcContextUnaryImpl::ReqSent), callback_(on_done), - start_(now()), + start_(Timer::Now()), response_reader_( - start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {} + start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {} ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {} bool RunNextState() GRPC_OVERRIDE { return (this->*next_state_)(); } - void report_stats(gpr_histogram *hist) GRPC_OVERRIDE { - gpr_histogram_add(hist, now() - start_); + void report_stats(Histogram *hist) GRPC_OVERRIDE { + hist->Add((Timer::Now() - start_) * 1e9); } private: @@ -157,6 +119,64 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { response_reader_; }; +class AsyncClient GRPC_FINAL : public Client { + public: + explicit AsyncClient(const ClientConfig& config) : Client(config) { + for (int i = 0; i < config.async_client_threads(); i++) { + cli_cqs_.emplace_back(new CompletionQueue); + } + + auto payload_size = config.payload_size(); + auto check_done = [payload_size](grpc::Status s, SimpleResponse *response) { + GPR_ASSERT(s.IsOk() && (response->payload().type() == + grpc::testing::PayloadType::COMPRESSABLE) && + (response->payload().body().length() == + static_cast(payload_size))); + }; + + int t = 0; + for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { + for (auto& channel : channels_) { + auto *cq = cli_cqs_[t].get(); + t = (t + 1) % cli_cqs_.size(); + auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx, const SimpleRequest& request, void *tag) { + return stub->AsyncUnaryCall(ctx, request, cq, tag); + }; + + TestService::Stub* stub = channel.get_stub(); + const SimpleRequest& request = request_; + new ClientRpcContextUnaryImpl(stub, request, start_req, check_done); + } + } + + StartThreads(config.async_client_threads()); + } + + void ThreadFunc(Histogram *histogram, size_t thread_idx) { + void *got_tag; + bool ok; + cli_cqs_[thread_idx]->Next(&got_tag, &ok); + + ClientRpcContext *ctx = ClientRpcContext::detag(got_tag); + if (ctx->RunNextState() == false) { + // call the callback and then delete it + ctx->report_stats(histogram); + ctx->RunNextState(); + delete ctx; + } + } + + std::vector> cli_cqs_; +}; + +std::unique_ptr CreateAsyncClient(const ClientConfig& args) { + return std::unique_ptr(new AsyncClient(args)); +} + +} // namespace testing +} // namespace grpc + +#if 0 static void RunTest(const int client_threads, const int client_channels, const int num_rpcs, const int payload_size) { gpr_log(GPR_INFO, @@ -173,23 +193,7 @@ static void RunTest(const int client_threads, const int client_channels, std::ostringstream oss; oss << FLAGS_server_host << ":" << FLAGS_server_port; - class ClientChannelInfo { - public: - explicit ClientChannelInfo(const grpc::string &server) - : channel_(CreateTestChannel(server, FLAGS_enable_ssl)), - stub_(TestService::NewStub(channel_)) {} - ChannelInterface *get_channel() { return channel_.get(); } - TestService::Stub *get_stub() { return stub_.get(); } - - private: - std::shared_ptr channel_; - std::unique_ptr stub_; - }; - - std::vector channels; - for (int i = 0; i < client_channels; i++) { - channels.push_back(ClientChannelInfo(oss.str())); - } + std::vector threads; // Will add threads when ready to execute std::vector< ::gpr_histogram *> thread_stats(client_threads); @@ -204,12 +208,6 @@ static void RunTest(const int client_threads, const int client_channels, grpc_profiler_start("qps_client_async.prof"); - auto CheckDone = [=](grpc::Status s, SimpleResponse *response) { - GPR_ASSERT(s.IsOk() && (response->payload().type() == - grpc::testing::PayloadType::COMPRESSABLE) && - (response->payload().body().length() == - static_cast(payload_size))); - }; for (int i = 0; i < client_threads; i++) { gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); @@ -225,8 +223,6 @@ static void RunTest(const int client_threads, const int client_channels, request.set_response_size(payload_size); grpc::CompletionQueue cli_cq; - auto start_req = std::bind(&TestService::Stub::AsyncUnaryCall, _1, - _2, _3, &cli_cq, _4); int rpcs_sent = 0; while (rpcs_sent < num_rpcs) { @@ -339,3 +335,4 @@ int main(int argc, char **argv) { grpc_shutdown(); return 0; } +#endif -- cgit v1.2.3 From d1e18faefcf61a184d32e5af17ceff2c2036543f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 4 Mar 2015 10:51:03 -0800 Subject: Async client progress --- test/cpp/qps/client_async.cc | 161 ------------------------------------------- 1 file changed, 161 deletions(-) (limited to 'test/cpp/qps/client_async.cc') diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 30b85afc29..4e428da1c6 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -175,164 +175,3 @@ std::unique_ptr CreateAsyncClient(const ClientConfig& args) { } // namespace testing } // namespace grpc - -#if 0 -static void RunTest(const int client_threads, const int client_channels, - const int num_rpcs, const int payload_size) { - gpr_log(GPR_INFO, - "QPS test with parameters\n" - "enable_ssl = %d\n" - "client_channels = %d\n" - "client_threads = %d\n" - "num_rpcs = %d\n" - "payload_size = %d\n" - "server_host:server_port = %s:%d\n\n", - FLAGS_enable_ssl, client_channels, client_threads, num_rpcs, - payload_size, FLAGS_server_host.c_str(), FLAGS_server_port); - - std::ostringstream oss; - oss << FLAGS_server_host << ":" << FLAGS_server_port; - - - - std::vector threads; // Will add threads when ready to execute - std::vector< ::gpr_histogram *> thread_stats(client_threads); - - TestService::Stub *stub_stats = channels[0].get_stub(); - grpc::ClientContext context_stats_begin; - StatsRequest stats_request; - ServerStats server_stats_begin; - stats_request.set_test_num(0); - grpc::Status status_beg = stub_stats->CollectServerStats( - &context_stats_begin, stats_request, &server_stats_begin); - - grpc_profiler_start("qps_client_async.prof"); - - - for (int i = 0; i < client_threads; i++) { - gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); - GPR_ASSERT(hist != NULL); - thread_stats[i] = hist; - - threads.push_back(std::thread( - [hist, client_threads, client_channels, num_rpcs, payload_size, - &channels, &CheckDone](int channel_num) { - using namespace std::placeholders; - SimpleRequest request; - request.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); - request.set_response_size(payload_size); - - grpc::CompletionQueue cli_cq; - - int rpcs_sent = 0; - while (rpcs_sent < num_rpcs) { - rpcs_sent++; - TestService::Stub *stub = channels[channel_num].get_stub(); - new ClientRpcContextUnaryImpl(stub, - request, start_req, CheckDone); - void *got_tag; - bool ok; - - // Need to call 2 next for every 1 RPC (1 for req done, 1 for resp - // done) - cli_cq.Next(&got_tag, &ok); - if (!ok) break; - ClientRpcContext *ctx = ClientRpcContext::detag(got_tag); - if (ctx->RunNextState() == false) { - // call the callback and then delete it - ctx->report_stats(hist); - ctx->RunNextState(); - delete ctx; - } - cli_cq.Next(&got_tag, &ok); - if (!ok) break; - ctx = ClientRpcContext::detag(got_tag); - if (ctx->RunNextState() == false) { - // call the callback and then delete it - ctx->report_stats(hist); - ctx->RunNextState(); - delete ctx; - } - // Now do runtime round-robin assignment of the next - // channel number - channel_num += client_threads; - channel_num %= client_channels; - } - }, - i % client_channels)); - } - - gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); - GPR_ASSERT(hist != NULL); - for (auto &t : threads) { - t.join(); - } - - grpc_profiler_stop(); - - for (int i = 0; i < client_threads; i++) { - gpr_histogram *h = thread_stats[i]; - gpr_log(GPR_INFO, "latency at thread %d (50/90/95/99/99.9): %f/%f/%f/%f/%f", - i, gpr_histogram_percentile(h, 50), gpr_histogram_percentile(h, 90), - gpr_histogram_percentile(h, 95), gpr_histogram_percentile(h, 99), - gpr_histogram_percentile(h, 99.9)); - gpr_histogram_merge(hist, h); - gpr_histogram_destroy(h); - } - - gpr_log( - GPR_INFO, - "latency across %d threads with %d channels and %d payload " - "(50/90/95/99/99.9): %f / %f / %f / %f / %f", - client_threads, client_channels, payload_size, - gpr_histogram_percentile(hist, 50), gpr_histogram_percentile(hist, 90), - gpr_histogram_percentile(hist, 95), gpr_histogram_percentile(hist, 99), - gpr_histogram_percentile(hist, 99.9)); - gpr_histogram_destroy(hist); - - grpc::ClientContext context_stats_end; - ServerStats server_stats_end; - grpc::Status status_end = stub_stats->CollectServerStats( - &context_stats_end, stats_request, &server_stats_end); - - double elapsed = server_stats_end.time_now() - server_stats_begin.time_now(); - int total_rpcs = client_threads * num_rpcs; - double utime = server_stats_end.time_user() - server_stats_begin.time_user(); - double stime = - server_stats_end.time_system() - server_stats_begin.time_system(); - gpr_log(GPR_INFO, - "Elapsed time: %.3f\n" - "RPC Count: %d\n" - "QPS: %.3f\n" - "System time: %.3f\n" - "User time: %.3f\n" - "Resource usage: %.1f%%\n", - elapsed, total_rpcs, total_rpcs / elapsed, stime, utime, - (stime + utime) / elapsed * 100.0); -} - -int main(int argc, char **argv) { - grpc_init(); - ParseCommandLineFlags(&argc, &argv, true); - - GPR_ASSERT(FLAGS_server_port); - - if (FLAGS_workload.length() == 0) { - RunTest(FLAGS_client_threads, FLAGS_client_channels, FLAGS_num_rpcs, - FLAGS_payload_size); - } else { - std::istringstream workload(FLAGS_workload); - int client_threads, client_channels, num_rpcs, payload_size; - workload >> client_threads; - while (!workload.eof()) { - workload >> client_channels >> num_rpcs >> payload_size; - RunTest(client_threads, client_channels, num_rpcs, payload_size); - workload >> client_threads; - } - gpr_log(GPR_INFO, "Done with specified workload."); - } - - grpc_shutdown(); - return 0; -} -#endif -- cgit v1.2.3 From ef6383904280e5606cbf6b5f71a534b1da17e956 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 4 Mar 2015 12:23:12 -0800 Subject: Async client works --- test/cpp/qps/client_async.cc | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) (limited to 'test/cpp/qps/client_async.cc') diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 4e428da1c6..cd80f1f9a9 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -60,6 +60,7 @@ class ClientRpcContext { ClientRpcContext() {} virtual ~ClientRpcContext() {} virtual bool RunNextState() = 0; // do next state, return false if steps done + virtual void StartNewClone() = 0; static void *tag(ClientRpcContext *c) { return reinterpret_cast(c); } static ClientRpcContext *detag(void *t) { return reinterpret_cast(t); @@ -83,7 +84,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { req_(req), response_(), next_state_(&ClientRpcContextUnaryImpl::ReqSent), - callback_(on_done), + callback_(on_done), start_req_(start_req), start_(Timer::Now()), response_reader_( start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {} @@ -93,6 +94,10 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { hist->Add((Timer::Now() - start_) * 1e9); } + void StartNewClone() { + new ClientRpcContextUnaryImpl(stub_, req_, start_req_, callback_); + } + private: bool ReqSent() { next_state_ = &ClientRpcContextUnaryImpl::RespDone; @@ -113,6 +118,10 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { ResponseType response_; bool (ClientRpcContextUnaryImpl::*next_state_)(); std::function callback_; + std::function< + std::unique_ptr>( + TestService::Stub *, grpc::ClientContext *, const RequestType &, + void *)> start_req_; grpc::Status status_; double start_; std::unique_ptr> @@ -152,6 +161,19 @@ class AsyncClient GRPC_FINAL : public Client { StartThreads(config.async_client_threads()); } + ~AsyncClient() GRPC_OVERRIDE { + EndThreads(); + + for (auto& cq : cli_cqs_) { + cq->Shutdown(); + void *got_tag; + bool ok; + while (cq->Next(&got_tag, &ok)) { + delete ClientRpcContext::detag(got_tag); + } + } + } + void ThreadFunc(Histogram *histogram, size_t thread_idx) { void *got_tag; bool ok; @@ -162,6 +184,7 @@ class AsyncClient GRPC_FINAL : public Client { // call the callback and then delete it ctx->report_stats(histogram); ctx->RunNextState(); + ctx->StartNewClone(); delete ctx; } } -- cgit v1.2.3 From a182bf12b0ef71e6fdcb93a818c4476a9d142e37 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 4 Mar 2015 13:54:39 -0800 Subject: clang-format --- test/cpp/qps/client.h | 13 +++++-------- test/cpp/qps/client_async.cc | 41 +++++++++++++++++++++-------------------- test/cpp/qps/client_sync.cc | 10 +++++----- test/cpp/qps/driver.cc | 12 ++++++++---- test/cpp/qps/driver.h | 10 +++++----- test/cpp/qps/histogram.h | 6 +++--- test/cpp/qps/qps_driver.cc | 39 ++++++++++++++++++++++++++++++--------- test/cpp/qps/server.h | 24 ++++++++++++------------ test/cpp/qps/server_async.cc | 20 +++++++++++--------- test/cpp/qps/server_sync.cc | 8 ++++---- test/cpp/qps/stats.h | 6 +++--- 11 files changed, 107 insertions(+), 82 deletions(-) (limited to 'test/cpp/qps/client_async.cc') diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index c2fdbb576f..221fb30fc5 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -84,8 +84,7 @@ class Client { class ClientChannelInfo { public: - explicit ClientChannelInfo(const grpc::string& target, - const ClientConfig& config) + ClientChannelInfo(const grpc::string& target, const ClientConfig& config) : channel_(CreateTestChannel(target, config.enable_ssl())), stub_(TestService::NewStub(channel_)) {} ChannelInterface* get_channel() { return channel_.get(); } @@ -98,14 +97,12 @@ class Client { std::vector channels_; void StartThreads(size_t num_threads) { - for (size_t i = 0; i < num_threads; i++) { - threads_.emplace_back(new Thread(this, i)); - } + for (size_t i = 0; i < num_threads; i++) { + threads_.emplace_back(new Thread(this, i)); + } } - void EndThreads() { - threads_.clear(); - } + void EndThreads() { threads_.clear(); } virtual void ThreadFunc(Histogram* histogram, size_t thread_idx) = 0; diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index cd80f1f9a9..5eb9ff6521 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -72,22 +72,22 @@ template class ClientRpcContextUnaryImpl : public ClientRpcContext { public: ClientRpcContextUnaryImpl( - TestService::Stub *stub, - const RequestType &req, + TestService::Stub *stub, const RequestType &req, std::function< std::unique_ptr>( - TestService::Stub *, grpc::ClientContext *, const RequestType &, - void *)> start_req, + TestService::Stub *, grpc::ClientContext *, const RequestType &, + void *)> start_req, std::function on_done) : context_(), stub_(stub), req_(req), response_(), next_state_(&ClientRpcContextUnaryImpl::ReqSent), - callback_(on_done), start_req_(start_req), + callback_(on_done), + start_req_(start_req), start_(Timer::Now()), response_reader_( - start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {} + start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {} ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {} bool RunNextState() GRPC_OVERRIDE { return (this->*next_state_)(); } void report_stats(Histogram *hist) GRPC_OVERRIDE { @@ -118,10 +118,9 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { ResponseType response_; bool (ClientRpcContextUnaryImpl::*next_state_)(); std::function callback_; - std::function< - std::unique_ptr>( - TestService::Stub *, grpc::ClientContext *, const RequestType &, - void *)> start_req_; + std::function>( + TestService::Stub *, grpc::ClientContext *, const RequestType &, void *)> + start_req_; grpc::Status status_; double start_; std::unique_ptr> @@ -130,7 +129,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { class AsyncClient GRPC_FINAL : public Client { public: - explicit AsyncClient(const ClientConfig& config) : Client(config) { + explicit AsyncClient(const ClientConfig &config) : Client(config) { for (int i = 0; i < config.async_client_threads(); i++) { cli_cqs_.emplace_back(new CompletionQueue); } @@ -145,16 +144,18 @@ class AsyncClient GRPC_FINAL : public Client { int t = 0; for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { - for (auto& channel : channels_) { + for (auto &channel : channels_) { auto *cq = cli_cqs_[t].get(); t = (t + 1) % cli_cqs_.size(); - auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx, const SimpleRequest& request, void *tag) { + auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx, + const SimpleRequest &request, void *tag) { return stub->AsyncUnaryCall(ctx, request, cq, tag); }; - TestService::Stub* stub = channel.get_stub(); - const SimpleRequest& request = request_; - new ClientRpcContextUnaryImpl(stub, request, start_req, check_done); + TestService::Stub *stub = channel.get_stub(); + const SimpleRequest &request = request_; + new ClientRpcContextUnaryImpl( + stub, request, start_req, check_done); } } @@ -164,7 +165,7 @@ class AsyncClient GRPC_FINAL : public Client { ~AsyncClient() GRPC_OVERRIDE { EndThreads(); - for (auto& cq : cli_cqs_) { + for (auto &cq : cli_cqs_) { cq->Shutdown(); void *got_tag; bool ok; @@ -192,9 +193,9 @@ class AsyncClient GRPC_FINAL : public Client { std::vector> cli_cqs_; }; -std::unique_ptr CreateAsyncClient(const ClientConfig& args) { +std::unique_ptr CreateAsyncClient(const ClientConfig &args) { return std::unique_ptr(new AsyncClient(args)); } -} // namespace testing -} // namespace grpc +} // namespace testing +} // namespace grpc diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 0be01e137f..7bb7231c6f 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -64,20 +64,20 @@ namespace testing { class SynchronousClient GRPC_FINAL : public Client { public: SynchronousClient(const ClientConfig& config) : Client(config) { - size_t num_threads = config.outstanding_rpcs_per_channel() * config.client_channels(); + size_t num_threads = + config.outstanding_rpcs_per_channel() * config.client_channels(); responses_.resize(num_threads); StartThreads(num_threads); } - ~SynchronousClient() { - EndThreads(); - } + ~SynchronousClient() { EndThreads(); } void ThreadFunc(Histogram* histogram, size_t thread_idx) { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); double start = Timer::Now(); grpc::ClientContext context; - grpc::Status s = stub->UnaryCall(&context, request_, &responses_[thread_idx]); + grpc::Status s = + stub->UnaryCall(&context, request_, &responses_[thread_idx]); histogram->Add((Timer::Now() - start) * 1e9); } diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index de1ccce3d7..6d5df799a2 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -71,8 +71,10 @@ static vector get_hosts(const string& name) { } } -ScenarioResult RunScenario(const ClientConfig& initial_client_config, size_t num_clients, - const ServerConfig& server_config, size_t num_servers) { +ScenarioResult RunScenario(const ClientConfig& initial_client_config, + size_t num_clients, + const ServerConfig& server_config, + size_t num_servers) { // ClientContext allocator (all are destroyed at scope exit) list contexts; auto alloc_context = [&contexts]() { @@ -183,13 +185,15 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config, size_t num for (auto& server : servers) { GPR_ASSERT(server.stream->Read(&server_status)); const auto& stats = server_status.stats(); - result.server_resources.push_back(ResourceUsage{stats.time_elapsed(), stats.time_user(), stats.time_system()}); + result.server_resources.push_back(ResourceUsage{ + stats.time_elapsed(), stats.time_user(), stats.time_system()}); } for (auto& client : clients) { GPR_ASSERT(client.stream->Read(&client_status)); const auto& stats = client_status.stats(); result.latencies.MergeProto(stats.latencies()); - result.client_resources.push_back(ResourceUsage{stats.time_elapsed(), stats.time_user(), stats.time_system()}); + result.client_resources.push_back(ResourceUsage{ + stats.time_elapsed(), stats.time_user(), stats.time_system()}); } for (auto& client : clients) { diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index b9d2b33f65..d87e80dc55 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -52,10 +52,10 @@ struct ScenarioResult { }; ScenarioResult RunScenario(const grpc::testing::ClientConfig& client_config, - size_t num_clients, - const grpc::testing::ServerConfig& server_config, - size_t num_servers); -} // namespace testing -} // namespace grpc + size_t num_clients, + const grpc::testing::ServerConfig& server_config, + size_t num_servers); +} // namespace testing +} // namespace grpc #endif diff --git a/test/cpp/qps/histogram.h b/test/cpp/qps/histogram.h index e37be80df2..7ba00e94c3 100644 --- a/test/cpp/qps/histogram.h +++ b/test/cpp/qps/histogram.h @@ -43,10 +43,10 @@ namespace testing { class Histogram { public: Histogram() : impl_(gpr_histogram_create(0.01, 60e9)) {} - ~Histogram() { if (impl_) gpr_histogram_destroy(impl_); } - Histogram(Histogram&& other) : impl_(other.impl_) { - other.impl_ = nullptr; + ~Histogram() { + if (impl_) gpr_histogram_destroy(impl_); } + Histogram(Histogram&& other) : impl_(other.impl_) { other.impl_ = nullptr; } void Merge(Histogram* h) { gpr_histogram_merge(impl_, h->impl_); } void Add(double value) { gpr_histogram_add(impl_, value); } diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index 7d73bb40d2..bf51e7408e 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -93,18 +93,39 @@ int main(int argc, char **argv) { server_config.set_enable_ssl(FLAGS_enable_ssl); auto result = RunScenario(client_config, FLAGS_num_clients, server_config, - FLAGS_num_servers); + FLAGS_num_servers); - gpr_log(GPR_INFO, "QPS: %.1f", result.latencies.Count() / average(result.client_resources, [](ResourceUsage u) { return u.wall_time; })); + gpr_log(GPR_INFO, "QPS: %.1f", + result.latencies.Count() / + average(result.client_resources, + [](ResourceUsage u) { return u.wall_time; })); gpr_log(GPR_INFO, "Latencies (50/95/99/99.9%%-ile): %.1f/%.1f/%.1f/%.1f us", - result.latencies.Percentile(50) / 1000, result.latencies.Percentile(95) / 1000, - result.latencies.Percentile(99) / 1000, result.latencies.Percentile(99.9) / 1000); - - gpr_log(GPR_INFO, "Server system time: %.2f%%", 100.0 * sum(result.server_resources, [](ResourceUsage u) { return u.system_time; }) / sum(result.server_resources, [](ResourceUsage u) { return u.wall_time; })); - gpr_log(GPR_INFO, "Server user time: %.2f%%", 100.0 * sum(result.server_resources, [](ResourceUsage u) { return u.user_time; }) / sum(result.server_resources, [](ResourceUsage u) { return u.wall_time; })); - gpr_log(GPR_INFO, "Client system time: %.2f%%", 100.0 * sum(result.client_resources, [](ResourceUsage u) { return u.system_time; }) / sum(result.client_resources, [](ResourceUsage u) { return u.wall_time; })); - gpr_log(GPR_INFO, "Client user time: %.2f%%", 100.0 * sum(result.client_resources, [](ResourceUsage u) { return u.user_time; }) / sum(result.client_resources, [](ResourceUsage u) { return u.wall_time; })); + result.latencies.Percentile(50) / 1000, + result.latencies.Percentile(95) / 1000, + result.latencies.Percentile(99) / 1000, + result.latencies.Percentile(99.9) / 1000); + + gpr_log(GPR_INFO, "Server system time: %.2f%%", + 100.0 * sum(result.server_resources, + [](ResourceUsage u) { return u.system_time; }) / + sum(result.server_resources, + [](ResourceUsage u) { return u.wall_time; })); + gpr_log(GPR_INFO, "Server user time: %.2f%%", + 100.0 * sum(result.server_resources, + [](ResourceUsage u) { return u.user_time; }) / + sum(result.server_resources, + [](ResourceUsage u) { return u.wall_time; })); + gpr_log(GPR_INFO, "Client system time: %.2f%%", + 100.0 * sum(result.client_resources, + [](ResourceUsage u) { return u.system_time; }) / + sum(result.client_resources, + [](ResourceUsage u) { return u.wall_time; })); + gpr_log(GPR_INFO, "Client user time: %.2f%%", + 100.0 * sum(result.client_resources, + [](ResourceUsage u) { return u.user_time; }) / + sum(result.client_resources, + [](ResourceUsage u) { return u.wall_time; })); grpc_shutdown(); return 0; diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index ca22d7ca1c..ef71cb94d0 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -42,7 +42,7 @@ namespace testing { class Server { public: - Server():timer_(new Timer) {} + Server() : timer_(new Timer) {} virtual ~Server() {} ServerStats Mark() { @@ -58,17 +58,17 @@ class Server { return stats; } - static bool SetPayload(PayloadType type, int size, Payload* payload) { - PayloadType response_type = type; - // TODO(yangg): Support UNCOMPRESSABLE payload. - if (type != PayloadType::COMPRESSABLE) { - return false; - } - payload->set_type(response_type); - std::unique_ptr body(new char[size]()); - payload->set_body(body.get(), size); - return true; - } + static bool SetPayload(PayloadType type, int size, Payload* payload) { + PayloadType response_type = type; + // TODO(yangg): Support UNCOMPRESSABLE payload. + if (type != PayloadType::COMPRESSABLE) { + return false; + } + payload->set_type(response_type); + std::unique_ptr body(new char[size]()); + payload->set_body(body.get(), size); + return true; + } private: std::unique_ptr timer_; diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 741a85802a..64aca957e4 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -57,11 +57,12 @@ #include namespace grpc { - namespace testing { +namespace testing { class AsyncQpsServerTest : public Server { public: - AsyncQpsServerTest(const ServerConfig& config, int port) : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) { + AsyncQpsServerTest(const ServerConfig &config, int port) + : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) { char *server_address = NULL; gpr_join_host_port(&server_address, "::", port); @@ -103,7 +104,7 @@ class AsyncQpsServerTest : public Server { ~AsyncQpsServerTest() { server_->Shutdown(); srv_cq_.Shutdown(); - for (auto& thr: threads_) { + for (auto &thr : threads_) { thr.join(); } while (!contexts_.empty()) { @@ -117,8 +118,8 @@ class AsyncQpsServerTest : public Server { public: ServerRpcContext() {} virtual ~ServerRpcContext(){}; - virtual bool RunNextState() = 0;// do next state, return false if all done - virtual void Reset() = 0; // start this back at a clean state + virtual bool RunNextState() = 0; // do next state, return false if all done + virtual void Reset() = 0; // start this back at a clean state }; static void *tag(ServerRpcContext *func) { return reinterpret_cast(func); @@ -201,9 +202,10 @@ class AsyncQpsServerTest : public Server { std::forward_list contexts_; }; -std::unique_ptr CreateAsyncServer(const ServerConfig& config, int port) { - return std::unique_ptr(new AsyncQpsServerTest(config, port)); +std::unique_ptr CreateAsyncServer(const ServerConfig &config, + int port) { + return std::unique_ptr(new AsyncQpsServerTest(config, port)); } - }// namespace testing -}// namespace grpc +} // namespace testing +} // namespace grpc diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc index e598fb51ae..88a201fe79 100644 --- a/test/cpp/qps/server_sync.cc +++ b/test/cpp/qps/server_sync.cc @@ -62,8 +62,9 @@ class TestServiceImpl GRPC_FINAL : public TestService::Service { Status UnaryCall(ServerContext* context, const SimpleRequest* request, SimpleResponse* response) override { if (request->has_response_size() && request->response_size() > 0) { - if (!Server::SetPayload(request->response_type(), request->response_size(), - response->mutable_payload())) { + if (!Server::SetPayload(request->response_type(), + request->response_size(), + response->mutable_payload())) { return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); } } @@ -74,8 +75,7 @@ class TestServiceImpl GRPC_FINAL : public TestService::Service { class SynchronousServer GRPC_FINAL : public grpc::testing::Server { public: SynchronousServer(const ServerConfig& config, int port) - : thread_pool_(config.threads()), - impl_(MakeImpl(port)) {} + : thread_pool_(config.threads()), impl_(MakeImpl(port)) {} private: std::unique_ptr MakeImpl(int port) { diff --git a/test/cpp/qps/stats.h b/test/cpp/qps/stats.h index f7a4f8c05d..ca59390ad7 100644 --- a/test/cpp/qps/stats.h +++ b/test/cpp/qps/stats.h @@ -44,7 +44,7 @@ template double sum(const T& container, F functor) { double r = 0; for (auto v : container) { - r += functor(v); + r += functor(v); } return r; } @@ -54,7 +54,7 @@ double average(const T& container, F functor) { return sum(container, functor) / container.size(); } -} // namespace testing -} // namespace grpc +} // namespace testing +} // namespace grpc #endif -- cgit v1.2.3