aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2015-03-04 22:54:07 -0800
committerGravatar Vijay Pai <vpai@google.com>2015-03-04 22:54:07 -0800
commit0bc6805c6de9c2786da0f379d00f8b787a88e92d (patch)
tree798bc5d81463076f2c0e216160edecec529d9b18 /test
parente7a523e910d9fa346887c7620c7ad3e57021748c (diff)
parent43ef582e42b2e5d3b5052523d9dee1fb2ee0ae33 (diff)
Merge pull request #837 from ctiller/qps_driver
QPS driver
Diffstat (limited to 'test')
-rw-r--r--test/core/util/grpc_profiler.c2
-rw-r--r--test/cpp/qps/client.cc252
-rw-r--r--test/cpp/qps/client.h173
-rw-r--r--test/cpp/qps/client_async.cc306
-rw-r--r--test/cpp/qps/client_sync.cc93
-rw-r--r--test/cpp/qps/driver.cc210
-rw-r--r--test/cpp/qps/driver.h61
-rw-r--r--test/cpp/qps/histogram.h85
-rw-r--r--test/cpp/qps/qps_driver.cc132
-rw-r--r--test/cpp/qps/qpstest.proto96
-rw-r--r--test/cpp/qps/server.cc171
-rw-r--r--test/cpp/qps/server.h84
-rw-r--r--test/cpp/qps/server_async.cc146
-rw-r--r--test/cpp/qps/server_sync.cc107
-rwxr-xr-xtest/cpp/qps/single_run_localhost.sh28
-rw-r--r--test/cpp/qps/stats.h60
-rw-r--r--test/cpp/qps/timer.cc71
-rw-r--r--test/cpp/qps/timer.h57
-rw-r--r--test/cpp/qps/worker.cc235
19 files changed, 1576 insertions, 793 deletions
diff --git a/test/core/util/grpc_profiler.c b/test/core/util/grpc_profiler.c
index 35b9361c70..d5b6cfeef1 100644
--- a/test/core/util/grpc_profiler.c
+++ b/test/core/util/grpc_profiler.c
@@ -44,7 +44,7 @@ void grpc_profiler_stop() { ProfilerStop(); }
void grpc_profiler_start(const char *filename) {
gpr_log(GPR_DEBUG,
- "You do not have google-perftools installed, profiling is disabled");
+ "You do not have google-perftools installed, profiling is disabled [for %s]", filename);
gpr_log(GPR_DEBUG,
"To install on ubuntu: sudo apt-get install google-perftools "
"libgoogle-perftools-dev");
diff --git a/test/cpp/qps/client.cc b/test/cpp/qps/client.cc
deleted file mode 100644
index 11c39eb4f5..0000000000
--- a/test/cpp/qps/client.cc
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <cassert>
-#include <memory>
-#include <string>
-#include <thread>
-#include <vector>
-#include <sstream>
-
-#include <grpc/grpc.h>
-#include <grpc/support/histogram.h>
-#include <grpc/support/log.h>
-#include <gflags/gflags.h>
-#include <grpc++/client_context.h>
-#include <grpc++/status.h>
-#include "test/core/util/grpc_profiler.h"
-#include "test/cpp/util/create_test_channel.h"
-#include "test/cpp/qps/qpstest.pb.h"
-
-DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls.");
-DEFINE_int32(server_port, 0, "Server port.");
-DEFINE_string(server_host, "127.0.0.1", "Server host.");
-DEFINE_int32(client_threads, 4, "Number of client threads.");
-
-// We have a configurable number of channels for sending RPCs.
-// RPCs are sent round-robin on the available channels by the
-// various threads. Interesting cases are 1 global channel or
-// 1 per-thread channel, but we can support any number.
-// The channels are assigned round-robin on an RPC by RPC basis
-// rather than just at initialization time in order to also measure the
-// impact of cache thrashing caused by channel changes. This is an issue
-// if you are not in one of the above "interesting cases"
-DEFINE_int32(client_channels, 4, "Number of client channels.");
-
-DEFINE_int32(num_rpcs, 1000, "Number of RPCs per thread.");
-DEFINE_int32(payload_size, 1, "Payload size in bytes");
-
-// Alternatively, specify parameters for test as a workload so that multiple
-// tests are initiated back-to-back. This is convenient for keeping a borg
-// allocation consistent. This is a space-separated list of
-// [threads channels num_rpcs payload_size ]*
-DEFINE_string(workload, "", "Workload parameters");
-
-using grpc::ChannelInterface;
-using grpc::CreateTestChannel;
-using grpc::testing::ServerStats;
-using grpc::testing::SimpleRequest;
-using grpc::testing::SimpleResponse;
-using grpc::testing::StatsRequest;
-using grpc::testing::TestService;
-
-// In some distros, gflags is in the namespace google, and in some others,
-// in gflags. This hack is enabling us to find both.
-namespace google { }
-namespace gflags { }
-using namespace google;
-using namespace gflags;
-
-static double now() {
- gpr_timespec tv = gpr_now();
- return 1e9 * tv.tv_sec + tv.tv_nsec;
-}
-
-void RunTest(const int client_threads, const int client_channels,
- const int num_rpcs, const int payload_size) {
- gpr_log(GPR_INFO,
- "QPS test with parameters\n"
- "enable_ssl = %d\n"
- "client_channels = %d\n"
- "client_threads = %d\n"
- "num_rpcs = %d\n"
- "payload_size = %d\n"
- "server_host:server_port = %s:%d\n\n",
- FLAGS_enable_ssl, client_channels, client_threads, num_rpcs,
- payload_size, FLAGS_server_host.c_str(), FLAGS_server_port);
-
- std::ostringstream oss;
- oss << FLAGS_server_host << ":" << FLAGS_server_port;
-
- class ClientChannelInfo {
- public:
- explicit ClientChannelInfo(const grpc::string &server)
- : channel_(CreateTestChannel(server, FLAGS_enable_ssl)),
- stub_(TestService::NewStub(channel_)) {}
- ChannelInterface *get_channel() { return channel_.get(); }
- TestService::Stub *get_stub() { return stub_.get(); }
-
- private:
- std::shared_ptr<ChannelInterface> channel_;
- std::unique_ptr<TestService::Stub> stub_;
- };
-
- std::vector<ClientChannelInfo> channels;
- for (int i = 0; i < client_channels; i++) {
- channels.push_back(ClientChannelInfo(oss.str()));
- }
-
- std::vector<std::thread> threads; // Will add threads when ready to execute
- std::vector< ::gpr_histogram *> thread_stats(client_threads);
-
- TestService::Stub *stub_stats = channels[0].get_stub();
- grpc::ClientContext context_stats_begin;
- StatsRequest stats_request;
- ServerStats server_stats_begin;
- stats_request.set_test_num(0);
- grpc::Status status_beg = stub_stats->CollectServerStats(
- &context_stats_begin, stats_request, &server_stats_begin);
-
- grpc_profiler_start("qps_client.prof");
-
- for (int i = 0; i < client_threads; i++) {
- gpr_histogram *hist = gpr_histogram_create(0.01, 60e9);
- GPR_ASSERT(hist != NULL);
- thread_stats[i] = hist;
-
- threads.push_back(
- std::thread([hist, client_threads, client_channels, num_rpcs,
- payload_size, &channels](int channel_num) {
- SimpleRequest request;
- SimpleResponse response;
- request.set_response_type(
- grpc::testing::PayloadType::COMPRESSABLE);
- request.set_response_size(payload_size);
-
- for (int j = 0; j < num_rpcs; j++) {
- TestService::Stub *stub =
- channels[channel_num].get_stub();
- double start = now();
- grpc::ClientContext context;
- grpc::Status s =
- stub->UnaryCall(&context, request, &response);
- gpr_histogram_add(hist, now() - start);
-
- GPR_ASSERT((s.code() == grpc::StatusCode::OK) &&
- (response.payload().type() ==
- grpc::testing::PayloadType::COMPRESSABLE) &&
- (response.payload().body().length() ==
- static_cast<size_t>(payload_size)));
-
- // Now do runtime round-robin assignment of the next
- // channel number
- channel_num += client_threads;
- channel_num %= client_channels;
- }
- },
- i % client_channels));
- }
-
- gpr_histogram *hist = gpr_histogram_create(0.01, 60e9);
- GPR_ASSERT(hist != NULL);
- for (auto &t : threads) {
- t.join();
- }
-
- grpc_profiler_stop();
-
- for (int i = 0; i < client_threads; i++) {
- gpr_histogram *h = thread_stats[i];
- gpr_log(GPR_INFO, "latency at thread %d (50/90/95/99/99.9): %f/%f/%f/%f/%f",
- i, gpr_histogram_percentile(h, 50), gpr_histogram_percentile(h, 90),
- gpr_histogram_percentile(h, 95), gpr_histogram_percentile(h, 99),
- gpr_histogram_percentile(h, 99.9));
- gpr_histogram_merge(hist, h);
- gpr_histogram_destroy(h);
- }
-
- gpr_log(
- GPR_INFO,
- "latency across %d threads with %d channels and %d payload "
- "(50/90/95/99/99.9): %f / %f / %f / %f / %f",
- client_threads, client_channels, payload_size,
- gpr_histogram_percentile(hist, 50), gpr_histogram_percentile(hist, 90),
- gpr_histogram_percentile(hist, 95), gpr_histogram_percentile(hist, 99),
- gpr_histogram_percentile(hist, 99.9));
- gpr_histogram_destroy(hist);
-
- grpc::ClientContext context_stats_end;
- ServerStats server_stats_end;
- grpc::Status status_end = stub_stats->CollectServerStats(
- &context_stats_end, stats_request, &server_stats_end);
-
- double elapsed = server_stats_end.time_now() - server_stats_begin.time_now();
- int total_rpcs = client_threads * num_rpcs;
- double utime = server_stats_end.time_user() - server_stats_begin.time_user();
- double stime =
- server_stats_end.time_system() - server_stats_begin.time_system();
- gpr_log(GPR_INFO,
- "Elapsed time: %.3f\n"
- "RPC Count: %d\n"
- "QPS: %.3f\n"
- "System time: %.3f\n"
- "User time: %.3f\n"
- "Resource usage: %.1f%%\n",
- elapsed, total_rpcs, total_rpcs / elapsed, stime, utime,
- (stime + utime) / elapsed * 100.0);
-}
-
-int main(int argc, char **argv) {
- grpc_init();
- ParseCommandLineFlags(&argc, &argv, true);
-
- GPR_ASSERT(FLAGS_server_port);
-
- if (FLAGS_workload.length() == 0) {
- RunTest(FLAGS_client_threads, FLAGS_client_channels, FLAGS_num_rpcs,
- FLAGS_payload_size);
- } else {
- std::istringstream workload(FLAGS_workload);
- int client_threads, client_channels, num_rpcs, payload_size;
- workload >> client_threads;
- while (!workload.eof()) {
- workload >> client_channels >> num_rpcs >> payload_size;
- RunTest(client_threads, client_channels, num_rpcs, payload_size);
- workload >> client_threads;
- }
- gpr_log(GPR_INFO, "Done with specified workload.");
- }
-
- grpc_shutdown();
- return 0;
-}
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
new file mode 100644
index 0000000000..221fb30fc5
--- /dev/null
+++ b/test/cpp/qps/client.h
@@ -0,0 +1,173 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef TEST_QPS_CLIENT_H
+#define TEST_QPS_CLIENT_H
+
+#include "test/cpp/qps/histogram.h"
+#include "test/cpp/qps/timer.h"
+#include "test/cpp/qps/qpstest.pb.h"
+
+#include <condition_variable>
+#include <mutex>
+
+namespace grpc {
+namespace testing {
+
+class Client {
+ public:
+ explicit Client(const ClientConfig& config) : timer_(new Timer) {
+ for (int i = 0; i < config.client_channels(); i++) {
+ channels_.push_back(ClientChannelInfo(
+ config.server_targets(i % config.server_targets_size()), config));
+ }
+ request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
+ request_.set_response_size(config.payload_size());
+ }
+ virtual ~Client() {}
+
+ ClientStats Mark() {
+ Histogram latencies;
+ std::vector<Histogram> to_merge(threads_.size());
+ for (size_t i = 0; i < threads_.size(); i++) {
+ threads_[i]->BeginSwap(&to_merge[i]);
+ }
+ std::unique_ptr<Timer> timer(new Timer);
+ timer_.swap(timer);
+ for (size_t i = 0; i < threads_.size(); i++) {
+ threads_[i]->EndSwap();
+ latencies.Merge(&to_merge[i]);
+ }
+
+ auto timer_result = timer->Mark();
+
+ ClientStats stats;
+ latencies.FillProto(stats.mutable_latencies());
+ stats.set_time_elapsed(timer_result.wall);
+ stats.set_time_system(timer_result.system);
+ stats.set_time_user(timer_result.user);
+ return stats;
+ }
+
+ protected:
+ SimpleRequest request_;
+
+ class ClientChannelInfo {
+ public:
+ ClientChannelInfo(const grpc::string& target, const ClientConfig& config)
+ : channel_(CreateTestChannel(target, config.enable_ssl())),
+ stub_(TestService::NewStub(channel_)) {}
+ ChannelInterface* get_channel() { return channel_.get(); }
+ TestService::Stub* get_stub() { return stub_.get(); }
+
+ private:
+ std::shared_ptr<ChannelInterface> channel_;
+ std::unique_ptr<TestService::Stub> stub_;
+ };
+ std::vector<ClientChannelInfo> channels_;
+
+ void StartThreads(size_t num_threads) {
+ for (size_t i = 0; i < num_threads; i++) {
+ threads_.emplace_back(new Thread(this, i));
+ }
+ }
+
+ void EndThreads() { threads_.clear(); }
+
+ virtual void ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
+
+ private:
+ class Thread {
+ public:
+ Thread(Client* client, size_t idx)
+ : done_(false),
+ new_(nullptr),
+ impl_([this, idx, client]() {
+ for (;;) {
+ // run the loop body
+ client->ThreadFunc(&histogram_, idx);
+ // lock, see if we're done
+ std::lock_guard<std::mutex> g(mu_);
+ if (done_) return;
+ // also check if we're marking, and swap out the histogram if so
+ if (new_) {
+ new_->Swap(&histogram_);
+ new_ = nullptr;
+ cv_.notify_one();
+ }
+ }
+ }) {}
+
+ ~Thread() {
+ {
+ std::lock_guard<std::mutex> g(mu_);
+ done_ = true;
+ }
+ impl_.join();
+ }
+
+ void BeginSwap(Histogram* n) {
+ std::lock_guard<std::mutex> g(mu_);
+ new_ = n;
+ }
+
+ void EndSwap() {
+ std::unique_lock<std::mutex> g(mu_);
+ cv_.wait(g, [this]() { return new_ == nullptr; });
+ }
+
+ private:
+ Thread(const Thread&);
+ Thread& operator=(const Thread&);
+
+ TestService::Stub* stub_;
+ ClientConfig config_;
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool done_;
+ Histogram* new_;
+ Histogram histogram_;
+ std::thread impl_;
+ };
+
+ std::vector<std::unique_ptr<Thread>> threads_;
+ std::unique_ptr<Timer> timer_;
+};
+
+std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& args);
+std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& args);
+
+} // namespace testing
+} // namespace grpc
+
+#endif
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 9ea9cfe8b9..5eb9ff6521 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -49,86 +49,53 @@
#include "test/core/util/grpc_profiler.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/cpp/qps/qpstest.pb.h"
+#include "test/cpp/qps/timer.h"
+#include "test/cpp/qps/client.h"
-DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls.");
-DEFINE_int32(server_port, 0, "Server port.");
-DEFINE_string(server_host, "127.0.0.1", "Server host.");
-DEFINE_int32(client_threads, 4, "Number of client threads.");
-
-// We have a configurable number of channels for sending RPCs.
-// RPCs are sent round-robin on the available channels by the
-// various threads. Interesting cases are 1 global channel or
-// 1 per-thread channel, but we can support any number.
-// The channels are assigned round-robin on an RPC by RPC basis
-// rather than just at initialization time in order to also measure the
-// impact of cache thrashing caused by channel changes. This is an issue
-// if you are not in one of the above "interesting cases"
-DEFINE_int32(client_channels, 4, "Number of client channels.");
-
-DEFINE_int32(num_rpcs, 1000, "Number of RPCs per thread.");
-DEFINE_int32(payload_size, 1, "Payload size in bytes");
-
-// Alternatively, specify parameters for test as a workload so that multiple
-// tests are initiated back-to-back. This is convenient for keeping a borg
-// allocation consistent. This is a space-separated list of
-// [threads channels num_rpcs payload_size ]*
-DEFINE_string(workload, "", "Workload parameters");
-
-using grpc::ChannelInterface;
-using grpc::CreateTestChannel;
-using grpc::testing::ServerStats;
-using grpc::testing::SimpleRequest;
-using grpc::testing::SimpleResponse;
-using grpc::testing::StatsRequest;
-using grpc::testing::TestService;
-
-// In some distros, gflags is in the namespace google, and in some others,
-// in gflags. This hack is enabling us to find both.
-namespace google {}
-namespace gflags {}
-using namespace google;
-using namespace gflags;
-
-static double now() {
- gpr_timespec tv = gpr_now();
- return 1e9 * tv.tv_sec + tv.tv_nsec;
-}
+namespace grpc {
+namespace testing {
class ClientRpcContext {
public:
ClientRpcContext() {}
virtual ~ClientRpcContext() {}
virtual bool RunNextState() = 0; // do next state, return false if steps done
+ virtual void StartNewClone() = 0;
static void *tag(ClientRpcContext *c) { return reinterpret_cast<void *>(c); }
static ClientRpcContext *detag(void *t) {
return reinterpret_cast<ClientRpcContext *>(t);
}
- virtual void report_stats(gpr_histogram *hist) = 0;
+ virtual void report_stats(Histogram *hist) = 0;
};
+
template <class RequestType, class ResponseType>
class ClientRpcContextUnaryImpl : public ClientRpcContext {
public:
ClientRpcContextUnaryImpl(
- TestService::Stub *stub,
- const RequestType &req,
+ TestService::Stub *stub, const RequestType &req,
std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
- TestService::Stub *, grpc::ClientContext *, const RequestType &,
- void *)> start_req,
+ TestService::Stub *, grpc::ClientContext *, const RequestType &,
+ void *)> start_req,
std::function<void(grpc::Status, ResponseType *)> on_done)
: context_(),
- stub_(stub),
+ stub_(stub),
req_(req),
response_(),
next_state_(&ClientRpcContextUnaryImpl::ReqSent),
callback_(on_done),
- start_(now()),
+ start_req_(start_req),
+ start_(Timer::Now()),
response_reader_(
- start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {}
+ start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {}
~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
bool RunNextState() GRPC_OVERRIDE { return (this->*next_state_)(); }
- void report_stats(gpr_histogram *hist) GRPC_OVERRIDE {
- gpr_histogram_add(hist, now() - start_);
+ void report_stats(Histogram *hist) GRPC_OVERRIDE {
+ hist->Add((Timer::Now() - start_) * 1e9);
+ }
+
+ void StartNewClone() {
+ new ClientRpcContextUnaryImpl(stub_, req_, start_req_, callback_);
}
private:
@@ -151,191 +118,84 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
ResponseType response_;
bool (ClientRpcContextUnaryImpl::*next_state_)();
std::function<void(grpc::Status, ResponseType *)> callback_;
+ std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
+ TestService::Stub *, grpc::ClientContext *, const RequestType &, void *)>
+ start_req_;
grpc::Status status_;
double start_;
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
response_reader_;
};
-static void RunTest(const int client_threads, const int client_channels,
- const int num_rpcs, const int payload_size) {
- gpr_log(GPR_INFO,
- "QPS test with parameters\n"
- "enable_ssl = %d\n"
- "client_channels = %d\n"
- "client_threads = %d\n"
- "num_rpcs = %d\n"
- "payload_size = %d\n"
- "server_host:server_port = %s:%d\n\n",
- FLAGS_enable_ssl, client_channels, client_threads, num_rpcs,
- payload_size, FLAGS_server_host.c_str(), FLAGS_server_port);
-
- std::ostringstream oss;
- oss << FLAGS_server_host << ":" << FLAGS_server_port;
-
- class ClientChannelInfo {
- public:
- explicit ClientChannelInfo(const grpc::string &server)
- : channel_(CreateTestChannel(server, FLAGS_enable_ssl)),
- stub_(TestService::NewStub(channel_)) {}
- ChannelInterface *get_channel() { return channel_.get(); }
- TestService::Stub *get_stub() { return stub_.get(); }
+class AsyncClient GRPC_FINAL : public Client {
+ public:
+ explicit AsyncClient(const ClientConfig &config) : Client(config) {
+ for (int i = 0; i < config.async_client_threads(); i++) {
+ cli_cqs_.emplace_back(new CompletionQueue);
+ }
- private:
- std::shared_ptr<ChannelInterface> channel_;
- std::unique_ptr<TestService::Stub> stub_;
- };
+ auto payload_size = config.payload_size();
+ auto check_done = [payload_size](grpc::Status s, SimpleResponse *response) {
+ GPR_ASSERT(s.IsOk() && (response->payload().type() ==
+ grpc::testing::PayloadType::COMPRESSABLE) &&
+ (response->payload().body().length() ==
+ static_cast<size_t>(payload_size)));
+ };
+
+ int t = 0;
+ for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
+ for (auto &channel : channels_) {
+ auto *cq = cli_cqs_[t].get();
+ t = (t + 1) % cli_cqs_.size();
+ auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx,
+ const SimpleRequest &request, void *tag) {
+ return stub->AsyncUnaryCall(ctx, request, cq, tag);
+ };
+
+ TestService::Stub *stub = channel.get_stub();
+ const SimpleRequest &request = request_;
+ new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
+ stub, request, start_req, check_done);
+ }
+ }
- std::vector<ClientChannelInfo> channels;
- for (int i = 0; i < client_channels; i++) {
- channels.push_back(ClientChannelInfo(oss.str()));
+ StartThreads(config.async_client_threads());
}
- std::vector<std::thread> threads; // Will add threads when ready to execute
- std::vector< ::gpr_histogram *> thread_stats(client_threads);
-
- TestService::Stub *stub_stats = channels[0].get_stub();
- grpc::ClientContext context_stats_begin;
- StatsRequest stats_request;
- ServerStats server_stats_begin;
- stats_request.set_test_num(0);
- grpc::Status status_beg = stub_stats->CollectServerStats(
- &context_stats_begin, stats_request, &server_stats_begin);
-
- grpc_profiler_start("qps_client_async.prof");
-
- auto CheckDone = [=](grpc::Status s, SimpleResponse *response) {
- GPR_ASSERT(s.IsOk() && (response->payload().type() ==
- grpc::testing::PayloadType::COMPRESSABLE) &&
- (response->payload().body().length() ==
- static_cast<size_t>(payload_size)));
- };
+ ~AsyncClient() GRPC_OVERRIDE {
+ EndThreads();
- for (int i = 0; i < client_threads; i++) {
- gpr_histogram *hist = gpr_histogram_create(0.01, 60e9);
- GPR_ASSERT(hist != NULL);
- thread_stats[i] = hist;
-
- threads.push_back(std::thread(
- [hist, client_threads, client_channels, num_rpcs, payload_size,
- &channels, &CheckDone](int channel_num) {
- using namespace std::placeholders;
- SimpleRequest request;
- request.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
- request.set_response_size(payload_size);
-
- grpc::CompletionQueue cli_cq;
- auto start_req = std::bind(&TestService::Stub::AsyncUnaryCall, _1,
- _2, _3, &cli_cq, _4);
-
- int rpcs_sent = 0;
- while (rpcs_sent < num_rpcs) {
- rpcs_sent++;
- TestService::Stub *stub = channels[channel_num].get_stub();
- new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(stub,
- request, start_req, CheckDone);
- void *got_tag;
- bool ok;
-
- // Need to call 2 next for every 1 RPC (1 for req done, 1 for resp
- // done)
- cli_cq.Next(&got_tag, &ok);
- if (!ok) break;
- ClientRpcContext *ctx = ClientRpcContext::detag(got_tag);
- if (ctx->RunNextState() == false) {
- // call the callback and then delete it
- ctx->report_stats(hist);
- ctx->RunNextState();
- delete ctx;
- }
- cli_cq.Next(&got_tag, &ok);
- if (!ok) break;
- ctx = ClientRpcContext::detag(got_tag);
- if (ctx->RunNextState() == false) {
- // call the callback and then delete it
- ctx->report_stats(hist);
- ctx->RunNextState();
- delete ctx;
- }
- // Now do runtime round-robin assignment of the next
- // channel number
- channel_num += client_threads;
- channel_num %= client_channels;
- }
- },
- i % client_channels));
- }
-
- gpr_histogram *hist = gpr_histogram_create(0.01, 60e9);
- GPR_ASSERT(hist != NULL);
- for (auto &t : threads) {
- t.join();
+ for (auto &cq : cli_cqs_) {
+ cq->Shutdown();
+ void *got_tag;
+ bool ok;
+ while (cq->Next(&got_tag, &ok)) {
+ delete ClientRpcContext::detag(got_tag);
+ }
+ }
}
- grpc_profiler_stop();
-
- for (int i = 0; i < client_threads; i++) {
- gpr_histogram *h = thread_stats[i];
- gpr_log(GPR_INFO, "latency at thread %d (50/90/95/99/99.9): %f/%f/%f/%f/%f",
- i, gpr_histogram_percentile(h, 50), gpr_histogram_percentile(h, 90),
- gpr_histogram_percentile(h, 95), gpr_histogram_percentile(h, 99),
- gpr_histogram_percentile(h, 99.9));
- gpr_histogram_merge(hist, h);
- gpr_histogram_destroy(h);
+ void ThreadFunc(Histogram *histogram, size_t thread_idx) {
+ void *got_tag;
+ bool ok;
+ cli_cqs_[thread_idx]->Next(&got_tag, &ok);
+
+ ClientRpcContext *ctx = ClientRpcContext::detag(got_tag);
+ if (ctx->RunNextState() == false) {
+ // call the callback and then delete it
+ ctx->report_stats(histogram);
+ ctx->RunNextState();
+ ctx->StartNewClone();
+ delete ctx;
+ }
}
- gpr_log(
- GPR_INFO,
- "latency across %d threads with %d channels and %d payload "
- "(50/90/95/99/99.9): %f / %f / %f / %f / %f",
- client_threads, client_channels, payload_size,
- gpr_histogram_percentile(hist, 50), gpr_histogram_percentile(hist, 90),
- gpr_histogram_percentile(hist, 95), gpr_histogram_percentile(hist, 99),
- gpr_histogram_percentile(hist, 99.9));
- gpr_histogram_destroy(hist);
-
- grpc::ClientContext context_stats_end;
- ServerStats server_stats_end;
- grpc::Status status_end = stub_stats->CollectServerStats(
- &context_stats_end, stats_request, &server_stats_end);
+ std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
+};
- double elapsed = server_stats_end.time_now() - server_stats_begin.time_now();
- int total_rpcs = client_threads * num_rpcs;
- double utime = server_stats_end.time_user() - server_stats_begin.time_user();
- double stime =
- server_stats_end.time_system() - server_stats_begin.time_system();
- gpr_log(GPR_INFO,
- "Elapsed time: %.3f\n"
- "RPC Count: %d\n"
- "QPS: %.3f\n"
- "System time: %.3f\n"
- "User time: %.3f\n"
- "Resource usage: %.1f%%\n",
- elapsed, total_rpcs, total_rpcs / elapsed, stime, utime,
- (stime + utime) / elapsed * 100.0);
+std::unique_ptr<Client> CreateAsyncClient(const ClientConfig &args) {
+ return std::unique_ptr<Client>(new AsyncClient(args));
}
-int main(int argc, char **argv) {
- grpc_init();
- ParseCommandLineFlags(&argc, &argv, true);
-
- GPR_ASSERT(FLAGS_server_port);
-
- if (FLAGS_workload.length() == 0) {
- RunTest(FLAGS_client_threads, FLAGS_client_channels, FLAGS_num_rpcs,
- FLAGS_payload_size);
- } else {
- std::istringstream workload(FLAGS_workload);
- int client_threads, client_channels, num_rpcs, payload_size;
- workload >> client_threads;
- while (!workload.eof()) {
- workload >> client_channels >> num_rpcs >> payload_size;
- RunTest(client_threads, client_channels, num_rpcs, payload_size);
- workload >> client_threads;
- }
- gpr_log(GPR_INFO, "Done with specified workload.");
- }
-
- grpc_shutdown();
- return 0;
-}
+} // namespace testing
+} // namespace grpc
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
new file mode 100644
index 0000000000..7bb7231c6f
--- /dev/null
+++ b/test/cpp/qps/client_sync.cc
@@ -0,0 +1,93 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <cassert>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <vector>
+#include <sstream>
+
+#include <sys/signal.h>
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/histogram.h>
+#include <grpc/support/log.h>
+#include <grpc/support/host_port.h>
+#include <gflags/gflags.h>
+#include <grpc++/client_context.h>
+#include <grpc++/status.h>
+#include <grpc++/server.h>
+#include <grpc++/server_builder.h>
+#include "test/core/util/grpc_profiler.h"
+#include "test/cpp/util/create_test_channel.h"
+#include "test/cpp/qps/client.h"
+#include "test/cpp/qps/qpstest.pb.h"
+#include "test/cpp/qps/histogram.h"
+#include "test/cpp/qps/timer.h"
+
+namespace grpc {
+namespace testing {
+
+class SynchronousClient GRPC_FINAL : public Client {
+ public:
+ SynchronousClient(const ClientConfig& config) : Client(config) {
+ size_t num_threads =
+ config.outstanding_rpcs_per_channel() * config.client_channels();
+ responses_.resize(num_threads);
+ StartThreads(num_threads);
+ }
+
+ ~SynchronousClient() { EndThreads(); }
+
+ void ThreadFunc(Histogram* histogram, size_t thread_idx) {
+ auto* stub = channels_[thread_idx % channels_.size()].get_stub();
+ double start = Timer::Now();
+ grpc::ClientContext context;
+ grpc::Status s =
+ stub->UnaryCall(&context, request_, &responses_[thread_idx]);
+ histogram->Add((Timer::Now() - start) * 1e9);
+ }
+
+ private:
+ std::vector<SimpleResponse> responses_;
+};
+
+std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {
+ return std::unique_ptr<Client>(new SynchronousClient(config));
+}
+
+} // namespace testing
+} // namespace grpc
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
new file mode 100644
index 0000000000..6d5df799a2
--- /dev/null
+++ b/test/cpp/qps/driver.cc
@@ -0,0 +1,210 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "test/cpp/qps/driver.h"
+#include "src/core/support/env.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/host_port.h>
+#include <grpc++/channel_arguments.h>
+#include <grpc++/client_context.h>
+#include <grpc++/create_channel.h>
+#include <grpc++/stream.h>
+#include <list>
+#include <thread>
+#include <vector>
+#include "test/cpp/qps/histogram.h"
+
+using std::list;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+
+namespace grpc {
+namespace testing {
+static vector<string> get_hosts(const string& name) {
+ char* env = gpr_getenv(name.c_str());
+ if (!env) return vector<string>();
+
+ vector<string> out;
+ char* p = env;
+ for (;;) {
+ char* comma = strchr(p, ',');
+ if (comma) {
+ out.emplace_back(p, comma);
+ p = comma + 1;
+ } else {
+ out.emplace_back(p);
+ gpr_free(env);
+ return out;
+ }
+ }
+}
+
+ScenarioResult RunScenario(const ClientConfig& initial_client_config,
+ size_t num_clients,
+ const ServerConfig& server_config,
+ size_t num_servers) {
+ // ClientContext allocator (all are destroyed at scope exit)
+ list<ClientContext> contexts;
+ auto alloc_context = [&contexts]() {
+ contexts.emplace_back();
+ return &contexts.back();
+ };
+
+ // Get client, server lists
+ auto workers = get_hosts("QPS_WORKERS");
+ ClientConfig client_config = initial_client_config;
+
+ // TODO(ctiller): support running multiple configurations, and binpack
+ // client/server pairs
+ // to available workers
+ GPR_ASSERT(workers.size() >= num_clients + num_servers);
+
+ // Trim to just what we need
+ workers.resize(num_clients + num_servers);
+
+ // Start servers
+ struct ServerData {
+ unique_ptr<Worker::Stub> stub;
+ unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
+ };
+ vector<ServerData> servers;
+ for (size_t i = 0; i < num_servers; i++) {
+ ServerData sd;
+ sd.stub = std::move(Worker::NewStub(
+ CreateChannelDeprecated(workers[i], ChannelArguments())));
+ ServerArgs args;
+ *args.mutable_setup() = server_config;
+ sd.stream = std::move(sd.stub->RunServer(alloc_context()));
+ GPR_ASSERT(sd.stream->Write(args));
+ ServerStatus init_status;
+ GPR_ASSERT(sd.stream->Read(&init_status));
+ char* host;
+ char* driver_port;
+ char* cli_target;
+ gpr_split_host_port(workers[i].c_str(), &host, &driver_port);
+ gpr_join_host_port(&cli_target, host, init_status.port());
+ client_config.add_server_targets(cli_target);
+ gpr_free(host);
+ gpr_free(driver_port);
+ gpr_free(cli_target);
+
+ servers.push_back(std::move(sd));
+ }
+
+ // Start clients
+ struct ClientData {
+ unique_ptr<Worker::Stub> stub;
+ unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
+ };
+ vector<ClientData> clients;
+ for (size_t i = 0; i < num_clients; i++) {
+ ClientData cd;
+ cd.stub = std::move(Worker::NewStub(
+ CreateChannelDeprecated(workers[i + num_servers], ChannelArguments())));
+ ClientArgs args;
+ *args.mutable_setup() = client_config;
+ cd.stream = std::move(cd.stub->RunTest(alloc_context()));
+ GPR_ASSERT(cd.stream->Write(args));
+ ClientStatus init_status;
+ GPR_ASSERT(cd.stream->Read(&init_status));
+
+ clients.push_back(std::move(cd));
+ }
+
+ // Let everything warmup
+ gpr_log(GPR_INFO, "Warming up");
+ gpr_timespec start = gpr_now();
+ gpr_sleep_until(gpr_time_add(start, gpr_time_from_seconds(5)));
+
+ // Start a run
+ gpr_log(GPR_INFO, "Starting");
+ ServerArgs server_mark;
+ server_mark.mutable_mark();
+ ClientArgs client_mark;
+ client_mark.mutable_mark();
+ for (auto& server : servers) {
+ GPR_ASSERT(server.stream->Write(server_mark));
+ }
+ for (auto& client : clients) {
+ GPR_ASSERT(client.stream->Write(client_mark));
+ }
+ ServerStatus server_status;
+ ClientStatus client_status;
+ for (auto& server : servers) {
+ GPR_ASSERT(server.stream->Read(&server_status));
+ }
+ for (auto& client : clients) {
+ GPR_ASSERT(client.stream->Read(&client_status));
+ }
+
+ // Wait some time
+ gpr_log(GPR_INFO, "Running");
+ gpr_sleep_until(gpr_time_add(start, gpr_time_from_seconds(15)));
+
+ // Finish a run
+ ScenarioResult result;
+ gpr_log(GPR_INFO, "Finishing");
+ for (auto& server : servers) {
+ GPR_ASSERT(server.stream->Write(server_mark));
+ }
+ for (auto& client : clients) {
+ GPR_ASSERT(client.stream->Write(client_mark));
+ }
+ for (auto& server : servers) {
+ GPR_ASSERT(server.stream->Read(&server_status));
+ const auto& stats = server_status.stats();
+ result.server_resources.push_back(ResourceUsage{
+ stats.time_elapsed(), stats.time_user(), stats.time_system()});
+ }
+ for (auto& client : clients) {
+ GPR_ASSERT(client.stream->Read(&client_status));
+ const auto& stats = client_status.stats();
+ result.latencies.MergeProto(stats.latencies());
+ result.client_resources.push_back(ResourceUsage{
+ stats.time_elapsed(), stats.time_user(), stats.time_system()});
+ }
+
+ for (auto& client : clients) {
+ GPR_ASSERT(client.stream->WritesDone());
+ GPR_ASSERT(client.stream->Finish().IsOk());
+ }
+ for (auto& server : servers) {
+ GPR_ASSERT(server.stream->WritesDone());
+ GPR_ASSERT(server.stream->Finish().IsOk());
+ }
+ return result;
+}
+} // namespace testing
+} // namespace grpc
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
new file mode 100644
index 0000000000..d87e80dc55
--- /dev/null
+++ b/test/cpp/qps/driver.h
@@ -0,0 +1,61 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef TEST_QPS_DRIVER_H
+#define TEST_QPS_DRIVER_H
+
+#include "test/cpp/qps/histogram.h"
+#include "test/cpp/qps/qpstest.pb.h"
+
+namespace grpc {
+namespace testing {
+struct ResourceUsage {
+ double wall_time;
+ double user_time;
+ double system_time;
+};
+
+struct ScenarioResult {
+ Histogram latencies;
+ std::vector<ResourceUsage> client_resources;
+ std::vector<ResourceUsage> server_resources;
+};
+
+ScenarioResult RunScenario(const grpc::testing::ClientConfig& client_config,
+ size_t num_clients,
+ const grpc::testing::ServerConfig& server_config,
+ size_t num_servers);
+} // namespace testing
+} // namespace grpc
+
+#endif
diff --git a/test/cpp/qps/histogram.h b/test/cpp/qps/histogram.h
new file mode 100644
index 0000000000..7ba00e94c3
--- /dev/null
+++ b/test/cpp/qps/histogram.h
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef TEST_QPS_HISTOGRAM_H
+#define TEST_QPS_HISTOGRAM_H
+
+#include <grpc/support/histogram.h>
+#include "test/cpp/qps/qpstest.pb.h"
+
+namespace grpc {
+namespace testing {
+
+class Histogram {
+ public:
+ Histogram() : impl_(gpr_histogram_create(0.01, 60e9)) {}
+ ~Histogram() {
+ if (impl_) gpr_histogram_destroy(impl_);
+ }
+ Histogram(Histogram&& other) : impl_(other.impl_) { other.impl_ = nullptr; }
+
+ void Merge(Histogram* h) { gpr_histogram_merge(impl_, h->impl_); }
+ void Add(double value) { gpr_histogram_add(impl_, value); }
+ double Percentile(double pctile) {
+ return gpr_histogram_percentile(impl_, pctile);
+ }
+ double Count() { return gpr_histogram_count(impl_); }
+ void Swap(Histogram* other) { std::swap(impl_, other->impl_); }
+ void FillProto(HistogramData* p) {
+ size_t n;
+ const auto* data = gpr_histogram_get_contents(impl_, &n);
+ for (size_t i = 0; i < n; i++) {
+ p->add_bucket(data[i]);
+ }
+ p->set_min_seen(gpr_histogram_minimum(impl_));
+ p->set_max_seen(gpr_histogram_maximum(impl_));
+ p->set_sum(gpr_histogram_sum(impl_));
+ p->set_sum_of_squares(gpr_histogram_sum_of_squares(impl_));
+ p->set_count(gpr_histogram_count(impl_));
+ }
+ void MergeProto(const HistogramData& p) {
+ gpr_histogram_merge_contents(impl_, &*p.bucket().begin(), p.bucket_size(),
+ p.min_seen(), p.max_seen(), p.sum(),
+ p.sum_of_squares(), p.count());
+ }
+
+ private:
+ Histogram(const Histogram&);
+ Histogram& operator=(const Histogram&);
+
+ gpr_histogram* impl_;
+};
+}
+}
+
+#endif /* TEST_QPS_HISTOGRAM_H */
diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc
new file mode 100644
index 0000000000..bf51e7408e
--- /dev/null
+++ b/test/cpp/qps/qps_driver.cc
@@ -0,0 +1,132 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <gflags/gflags.h>
+#include <grpc/support/log.h>
+
+#include "test/cpp/qps/driver.h"
+#include "test/cpp/qps/stats.h"
+
+DEFINE_int32(num_clients, 1, "Number of client binaries");
+DEFINE_int32(num_servers, 1, "Number of server binaries");
+
+// Common config
+DEFINE_bool(enable_ssl, false, "Use SSL");
+
+// Server config
+DEFINE_int32(server_threads, 1, "Number of server threads");
+DEFINE_string(server_type, "SYNCHRONOUS_SERVER", "Server type");
+
+// Client config
+DEFINE_int32(outstanding_rpcs_per_channel, 1,
+ "Number of outstanding rpcs per channel");
+DEFINE_int32(client_channels, 1, "Number of client channels");
+DEFINE_int32(payload_size, 1, "Payload size");
+DEFINE_string(client_type, "SYNCHRONOUS_CLIENT", "Client type");
+DEFINE_int32(async_client_threads, 1, "Async client threads");
+
+using grpc::testing::ClientConfig;
+using grpc::testing::ServerConfig;
+using grpc::testing::ClientType;
+using grpc::testing::ServerType;
+using grpc::testing::ResourceUsage;
+using grpc::testing::sum;
+
+// In some distros, gflags is in the namespace google, and in some others,
+// in gflags. This hack is enabling us to find both.
+namespace google {}
+namespace gflags {}
+using namespace google;
+using namespace gflags;
+
+int main(int argc, char **argv) {
+ grpc_init();
+ ParseCommandLineFlags(&argc, &argv, true);
+
+ ClientType client_type;
+ ServerType server_type;
+ GPR_ASSERT(ClientType_Parse(FLAGS_client_type, &client_type));
+ GPR_ASSERT(ServerType_Parse(FLAGS_server_type, &server_type));
+
+ ClientConfig client_config;
+ client_config.set_client_type(client_type);
+ client_config.set_enable_ssl(FLAGS_enable_ssl);
+ client_config.set_outstanding_rpcs_per_channel(
+ FLAGS_outstanding_rpcs_per_channel);
+ client_config.set_client_channels(FLAGS_client_channels);
+ client_config.set_payload_size(FLAGS_payload_size);
+ client_config.set_async_client_threads(FLAGS_async_client_threads);
+
+ ServerConfig server_config;
+ server_config.set_server_type(server_type);
+ server_config.set_threads(FLAGS_server_threads);
+ server_config.set_enable_ssl(FLAGS_enable_ssl);
+
+ auto result = RunScenario(client_config, FLAGS_num_clients, server_config,
+ FLAGS_num_servers);
+
+ gpr_log(GPR_INFO, "QPS: %.1f",
+ result.latencies.Count() /
+ average(result.client_resources,
+ [](ResourceUsage u) { return u.wall_time; }));
+
+ gpr_log(GPR_INFO, "Latencies (50/95/99/99.9%%-ile): %.1f/%.1f/%.1f/%.1f us",
+ result.latencies.Percentile(50) / 1000,
+ result.latencies.Percentile(95) / 1000,
+ result.latencies.Percentile(99) / 1000,
+ result.latencies.Percentile(99.9) / 1000);
+
+ gpr_log(GPR_INFO, "Server system time: %.2f%%",
+ 100.0 * sum(result.server_resources,
+ [](ResourceUsage u) { return u.system_time; }) /
+ sum(result.server_resources,
+ [](ResourceUsage u) { return u.wall_time; }));
+ gpr_log(GPR_INFO, "Server user time: %.2f%%",
+ 100.0 * sum(result.server_resources,
+ [](ResourceUsage u) { return u.user_time; }) /
+ sum(result.server_resources,
+ [](ResourceUsage u) { return u.wall_time; }));
+ gpr_log(GPR_INFO, "Client system time: %.2f%%",
+ 100.0 * sum(result.client_resources,
+ [](ResourceUsage u) { return u.system_time; }) /
+ sum(result.client_resources,
+ [](ResourceUsage u) { return u.wall_time; }));
+ gpr_log(GPR_INFO, "Client user time: %.2f%%",
+ 100.0 * sum(result.client_resources,
+ [](ResourceUsage u) { return u.user_time; }) /
+ sum(result.client_resources,
+ [](ResourceUsage u) { return u.wall_time; }));
+
+ grpc_shutdown();
+ return 0;
+}
diff --git a/test/cpp/qps/qpstest.proto b/test/cpp/qps/qpstest.proto
index 68ec6149f5..6a7170bf58 100644
--- a/test/cpp/qps/qpstest.proto
+++ b/test/cpp/qps/qpstest.proto
@@ -51,17 +51,14 @@ message StatsRequest {
}
message ServerStats {
- // wall clock time for timestamp
- required double time_now = 1;
+ // wall clock time
+ required double time_elapsed = 1;
// user time used by the server process and threads
required double time_user = 2;
// server time used by the server process and all threads
required double time_system = 3;
-
- // RPC count so far
- optional int32 num_rpcs = 4;
}
message Payload {
@@ -71,31 +68,75 @@ message Payload {
optional bytes body = 2;
}
-message Latencies {
- required double l_50 = 1;
- required double l_90 = 2;
- required double l_99 = 3;
- required double l_999 = 4;
+message HistogramData {
+ repeated uint32 bucket = 1;
+ required double min_seen = 2;
+ required double max_seen = 3;
+ required double sum = 4;
+ required double sum_of_squares = 5;
+ required double count = 6;
+}
+
+enum ClientType {
+ SYNCHRONOUS_CLIENT = 1;
+ ASYNC_CLIENT = 2;
+}
+
+enum ServerType {
+ SYNCHRONOUS_SERVER = 1;
+ ASYNC_SERVER = 2;
+}
+
+message ClientConfig {
+ repeated string server_targets = 1;
+ required ClientType client_type = 2;
+ required bool enable_ssl = 3;
+ required int32 outstanding_rpcs_per_channel = 4;
+ required int32 client_channels = 5;
+ required int32 payload_size = 6;
+ // only for async client:
+ optional int32 async_client_threads = 7;
}
-message StartArgs {
- required string server_host = 1;
- required int32 server_port = 2;
- optional bool enable_ssl = 3 [default = false];
- optional int32 client_threads = 4 [default = 1];
- optional int32 client_channels = 5 [default = -1];
- optional int32 num_rpcs = 6 [default = 1];
- optional int32 payload_size = 7 [default = 1];
+// Request current stats
+message Mark {}
+
+message ClientArgs {
+ oneof argtype {
+ ClientConfig setup = 1;
+ Mark mark = 2;
+ }
}
-message StartResult {
- required Latencies latencies = 1;
- required int32 num_rpcs = 2;
+message ClientStats {
+ required HistogramData latencies = 1;
required double time_elapsed = 3;
required double time_user = 4;
required double time_system = 5;
}
+message ClientStatus {
+ optional ClientStats stats = 1;
+}
+
+message ServerConfig {
+ required ServerType server_type = 1;
+ required int32 threads = 2;
+ required bool enable_ssl = 3;
+}
+
+message ServerArgs {
+ oneof argtype {
+ ServerConfig setup = 1;
+ Mark mark = 2;
+ }
+}
+
+message ServerStatus {
+ optional ServerStats stats = 1;
+ required int32 port = 2;
+}
+
message SimpleRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, server randomly chooses one from other formats.
@@ -153,12 +194,6 @@ message StreamingOutputCallResponse {
}
service TestService {
- // Start test with specified workload
- rpc StartTest(StartArgs) returns (Latencies);
-
- // Collect stats from server, ignore request content
- rpc CollectServerStats(StatsRequest) returns (ServerStats);
-
// One request followed by one response.
// The server returns the client payload as-is.
rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
@@ -186,3 +221,10 @@ service TestService {
rpc HalfDuplexCall(stream StreamingOutputCallRequest)
returns (stream StreamingOutputCallResponse);
}
+
+service Worker {
+ // Start test with specified workload
+ rpc RunTest(stream ClientArgs) returns (stream ClientStatus);
+ // Start test with specified workload
+ rpc RunServer(stream ServerArgs) returns (stream ServerStatus);
+}
diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc
deleted file mode 100644
index be27c12b30..0000000000
--- a/test/cpp/qps/server.cc
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <sys/time.h>
-#include <sys/resource.h>
-#include <sys/signal.h>
-#include <thread>
-
-#include <unistd.h>
-
-#include <gflags/gflags.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/host_port.h>
-#include <grpc++/config.h>
-#include <grpc++/server.h>
-#include <grpc++/server_builder.h>
-#include <grpc++/server_context.h>
-#include <grpc++/status.h>
-#include "src/cpp/server/thread_pool.h"
-#include "test/core/util/grpc_profiler.h"
-#include "test/cpp/qps/qpstest.pb.h"
-
-#include <grpc/grpc.h>
-#include <grpc/support/log.h>
-
-DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls.");
-DEFINE_int32(port, 0, "Server port.");
-DEFINE_int32(server_threads, 4, "Number of server threads.");
-
-using grpc::Server;
-using grpc::ServerBuilder;
-using grpc::ServerContext;
-using grpc::ThreadPool;
-using grpc::testing::Payload;
-using grpc::testing::PayloadType;
-using grpc::testing::ServerStats;
-using grpc::testing::SimpleRequest;
-using grpc::testing::SimpleResponse;
-using grpc::testing::StatsRequest;
-using grpc::testing::TestService;
-using grpc::Status;
-
-// In some distros, gflags is in the namespace google, and in some others,
-// in gflags. This hack is enabling us to find both.
-namespace google { }
-namespace gflags { }
-using namespace google;
-using namespace gflags;
-
-static bool got_sigint = false;
-
-static void sigint_handler(int x) { got_sigint = 1; }
-
-static double time_double(struct timeval* tv) {
- return tv->tv_sec + 1e-6 * tv->tv_usec;
-}
-
-static bool SetPayload(PayloadType type, int size, Payload* payload) {
- PayloadType response_type = type;
- // TODO(yangg): Support UNCOMPRESSABLE payload.
- if (type != PayloadType::COMPRESSABLE) {
- return false;
- }
- payload->set_type(response_type);
- std::unique_ptr<char[]> body(new char[size]());
- payload->set_body(body.get(), size);
- return true;
-}
-
-namespace {
-
-class TestServiceImpl GRPC_FINAL : public TestService::Service {
- public:
- Status CollectServerStats(ServerContext* context, const StatsRequest*,
- ServerStats* response) {
- struct rusage usage;
- struct timeval tv;
- gettimeofday(&tv, NULL);
- getrusage(RUSAGE_SELF, &usage);
- response->set_time_now(time_double(&tv));
- response->set_time_user(time_double(&usage.ru_utime));
- response->set_time_system(time_double(&usage.ru_stime));
- return Status::OK;
- }
- Status UnaryCall(ServerContext* context, const SimpleRequest* request,
- SimpleResponse* response) {
- if (request->has_response_size() && request->response_size() > 0) {
- if (!SetPayload(request->response_type(), request->response_size(),
- response->mutable_payload())) {
- return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
- }
- }
- return Status::OK;
- }
-};
-
-} // namespace
-
-static void RunServer() {
- char* server_address = NULL;
- gpr_join_host_port(&server_address, "::", FLAGS_port);
-
- TestServiceImpl service;
-
- SimpleRequest request;
- SimpleResponse response;
-
- ServerBuilder builder;
- builder.AddPort(server_address);
- builder.RegisterService(&service);
-
- std::unique_ptr<ThreadPool> pool(new ThreadPool(FLAGS_server_threads));
- builder.SetThreadPool(pool.get());
-
- std::unique_ptr<Server> server(builder.BuildAndStart());
- gpr_log(GPR_INFO, "Server listening on %s\n", server_address);
-
- grpc_profiler_start("qps_server.prof");
-
- while (!got_sigint) {
- sleep(5);
- }
-
- grpc_profiler_stop();
-
- gpr_free(server_address);
-}
-
-int main(int argc, char** argv) {
- grpc_init();
- ParseCommandLineFlags(&argc, &argv, true);
-
- signal(SIGINT, sigint_handler);
-
- GPR_ASSERT(FLAGS_port != 0);
- GPR_ASSERT(!FLAGS_enable_ssl);
- RunServer();
-
- grpc_shutdown();
- return 0;
-}
diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h
new file mode 100644
index 0000000000..ef71cb94d0
--- /dev/null
+++ b/test/cpp/qps/server.h
@@ -0,0 +1,84 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef TEST_QPS_SERVER_H
+#define TEST_QPS_SERVER_H
+
+#include "test/cpp/qps/timer.h"
+#include "test/cpp/qps/qpstest.pb.h"
+
+namespace grpc {
+namespace testing {
+
+class Server {
+ public:
+ Server() : timer_(new Timer) {}
+ virtual ~Server() {}
+
+ ServerStats Mark() {
+ std::unique_ptr<Timer> timer(new Timer);
+ timer.swap(timer_);
+
+ auto timer_result = timer->Mark();
+
+ ServerStats stats;
+ stats.set_time_elapsed(timer_result.wall);
+ stats.set_time_system(timer_result.system);
+ stats.set_time_user(timer_result.user);
+ return stats;
+ }
+
+ static bool SetPayload(PayloadType type, int size, Payload* payload) {
+ PayloadType response_type = type;
+ // TODO(yangg): Support UNCOMPRESSABLE payload.
+ if (type != PayloadType::COMPRESSABLE) {
+ return false;
+ }
+ payload->set_type(response_type);
+ std::unique_ptr<char[]> body(new char[size]());
+ payload->set_body(body.get(), size);
+ return true;
+ }
+
+ private:
+ std::unique_ptr<Timer> timer_;
+};
+
+std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config,
+ int port);
+std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config, int port);
+
+} // namespace testing
+} // namespace grpc
+
+#endif
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index c006262fc3..64aca957e4 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -51,104 +51,38 @@
#include "src/cpp/server/thread_pool.h"
#include "test/core/util/grpc_profiler.h"
#include "test/cpp/qps/qpstest.pb.h"
+#include "test/cpp/qps/server.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
-DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls.");
-DEFINE_int32(port, 0, "Server port.");
-DEFINE_int32(server_threads, 4, "Number of server threads.");
+namespace grpc {
+namespace testing {
-using grpc::CompletionQueue;
-using grpc::Server;
-using grpc::ServerBuilder;
-using grpc::ServerContext;
-using grpc::ThreadPool;
-using grpc::testing::Payload;
-using grpc::testing::PayloadType;
-using grpc::testing::ServerStats;
-using grpc::testing::SimpleRequest;
-using grpc::testing::SimpleResponse;
-using grpc::testing::StatsRequest;
-using grpc::testing::TestService;
-using grpc::Status;
-
-// In some distros, gflags is in the namespace google, and in some others,
-// in gflags. This hack is enabling us to find both.
-namespace google {}
-namespace gflags {}
-using namespace google;
-using namespace gflags;
-
-static bool got_sigint = false;
-
-static void sigint_handler(int x) { got_sigint = 1; }
-
-static double time_double(struct timeval *tv) {
- return tv->tv_sec + 1e-6 * tv->tv_usec;
-}
-
-static bool SetPayload(PayloadType type, int size, Payload *payload) {
- PayloadType response_type = type;
- // TODO(yangg): Support UNCOMPRESSABLE payload.
- if (type != PayloadType::COMPRESSABLE) {
- return false;
- }
- payload->set_type(response_type);
- std::unique_ptr<char[]> body(new char[size]());
- payload->set_body(body.get(), size);
- return true;
-}
-
-namespace {
-
-class AsyncQpsServerTest {
+class AsyncQpsServerTest : public Server {
public:
- AsyncQpsServerTest() : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) {
+ AsyncQpsServerTest(const ServerConfig &config, int port)
+ : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) {
char *server_address = NULL;
- gpr_join_host_port(&server_address, "::", FLAGS_port);
+ gpr_join_host_port(&server_address, "::", port);
ServerBuilder builder;
builder.AddPort(server_address);
+ gpr_free(server_address);
builder.RegisterAsyncService(&async_service_);
server_ = builder.BuildAndStart();
- gpr_log(GPR_INFO, "Server listening on %s\n", server_address);
- gpr_free(server_address);
using namespace std::placeholders;
request_unary_ = std::bind(&TestService::AsyncService::RequestUnaryCall,
&async_service_, _1, _2, _3, &srv_cq_, _4);
- request_stats_ =
- std::bind(&TestService::AsyncService::RequestCollectServerStats,
- &async_service_, _1, _2, _3, &srv_cq_, _4);
for (int i = 0; i < 100; i++) {
contexts_.push_front(
new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
request_unary_, UnaryCall));
- contexts_.push_front(
- new ServerRpcContextUnaryImpl<StatsRequest, ServerStats>(
- request_stats_, CollectServerStats));
- }
- }
- ~AsyncQpsServerTest() {
- server_->Shutdown();
- void *ignored_tag;
- bool ignored_ok;
- srv_cq_.Shutdown();
- while (srv_cq_.Next(&ignored_tag, &ignored_ok)) {
}
- while (!contexts_.empty()) {
- delete contexts_.front();
- contexts_.pop_front();
- }
- for (auto& thr: threads_) {
- thr.join();
- }
- }
- void ServeRpcs(int num_threads) {
- for (int i = 0; i < num_threads; i++) {
+ for (int i = 0; i < config.threads(); i++) {
threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down
bool ok;
@@ -166,8 +100,16 @@ class AsyncQpsServerTest {
return;
}));
}
- while (!got_sigint) {
- std::this_thread::sleep_for(std::chrono::seconds(5));
+ }
+ ~AsyncQpsServerTest() {
+ server_->Shutdown();
+ srv_cq_.Shutdown();
+ for (auto &thr : threads_) {
+ thr.join();
+ }
+ while (!contexts_.empty()) {
+ delete contexts_.front();
+ contexts_.pop_front();
}
}
@@ -176,8 +118,8 @@ class AsyncQpsServerTest {
public:
ServerRpcContext() {}
virtual ~ServerRpcContext(){};
- virtual bool RunNextState() = 0;// do next state, return false if all done
- virtual void Reset() = 0; // start this back at a clean state
+ virtual bool RunNextState() = 0; // do next state, return false if all done
+ virtual void Reset() = 0; // start this back at a clean state
};
static void *tag(ServerRpcContext *func) {
return reinterpret_cast<void *>(func);
@@ -240,17 +182,6 @@ class AsyncQpsServerTest {
grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
};
- static Status CollectServerStats(const StatsRequest *,
- ServerStats *response) {
- struct rusage usage;
- struct timeval tv;
- gettimeofday(&tv, NULL);
- getrusage(RUSAGE_SELF, &usage);
- response->set_time_now(time_double(&tv));
- response->set_time_user(time_double(&usage.ru_utime));
- response->set_time_system(time_double(&usage.ru_stime));
- return Status::OK;
- }
static Status UnaryCall(const SimpleRequest *request,
SimpleResponse *response) {
if (request->has_response_size() && request->response_size() > 0) {
@@ -264,40 +195,17 @@ class AsyncQpsServerTest {
CompletionQueue srv_cq_;
TestService::AsyncService async_service_;
std::vector<std::thread> threads_;
- std::unique_ptr<Server> server_;
+ std::unique_ptr<grpc::Server> server_;
std::function<void(ServerContext *, SimpleRequest *,
grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)>
request_unary_;
- std::function<void(ServerContext *, StatsRequest *,
- grpc::ServerAsyncResponseWriter<ServerStats> *, void *)>
- request_stats_;
std::forward_list<ServerRpcContext *> contexts_;
};
-} // namespace
-
-static void RunServer() {
- AsyncQpsServerTest server;
-
- grpc_profiler_start("qps_server_async.prof");
-
- server.ServeRpcs(FLAGS_server_threads);
-
- grpc_profiler_stop();
+std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,
+ int port) {
+ return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
}
-int main(int argc, char **argv) {
- grpc_init();
- ParseCommandLineFlags(&argc, &argv, true);
- GPR_ASSERT(FLAGS_port != 0);
- GPR_ASSERT(!FLAGS_enable_ssl);
-
- signal(SIGINT, sigint_handler);
-
- RunServer();
-
- grpc_shutdown();
- google::protobuf::ShutdownProtobufLibrary();
-
- return 0;
-}
+} // namespace testing
+} // namespace grpc
diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc
new file mode 100644
index 0000000000..beee688608
--- /dev/null
+++ b/test/cpp/qps/server_sync.cc
@@ -0,0 +1,107 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <sys/signal.h>
+#include <thread>
+
+#include <unistd.h>
+
+#include <gflags/gflags.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc++/config.h>
+#include <grpc++/server.h>
+#include <grpc++/server_builder.h>
+#include <grpc++/server_context.h>
+#include <grpc++/status.h>
+#include <grpc++/stream.h>
+#include "src/cpp/server/thread_pool.h"
+#include "test/core/util/grpc_profiler.h"
+#include "test/cpp/qps/qpstest.pb.h"
+#include "test/cpp/qps/server.h"
+#include "test/cpp/qps/timer.h"
+
+#include <grpc/grpc.h>
+#include <grpc/support/log.h>
+
+namespace grpc {
+namespace testing {
+
+class TestServiceImpl GRPC_FINAL : public TestService::Service {
+ public:
+ Status UnaryCall(ServerContext* context, const SimpleRequest* request,
+ SimpleResponse* response) GRPC_OVERRIDE {
+ if (request->has_response_size() && request->response_size() > 0) {
+ if (!Server::SetPayload(request->response_type(),
+ request->response_size(),
+ response->mutable_payload())) {
+ return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
+ }
+ }
+ return Status::OK;
+ }
+};
+
+class SynchronousServer GRPC_FINAL : public grpc::testing::Server {
+ public:
+ SynchronousServer(const ServerConfig& config, int port)
+ : thread_pool_(config.threads()), impl_(MakeImpl(port)) {}
+
+ private:
+ std::unique_ptr<grpc::Server> MakeImpl(int port) {
+ ServerBuilder builder;
+
+ char* server_address = NULL;
+ gpr_join_host_port(&server_address, "::", port);
+ builder.AddPort(server_address);
+ gpr_free(server_address);
+
+ builder.RegisterService(&service_);
+
+ builder.SetThreadPool(&thread_pool_);
+
+ return builder.BuildAndStart();
+ }
+
+ TestServiceImpl service_;
+ ThreadPool thread_pool_;
+ std::unique_ptr<grpc::Server> impl_;
+};
+
+std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(
+ const ServerConfig& config, int port) {
+ return std::unique_ptr<Server>(new SynchronousServer(config, port));
+}
+
+} // namespace testing
+} // namespace grpc
diff --git a/test/cpp/qps/single_run_localhost.sh b/test/cpp/qps/single_run_localhost.sh
new file mode 100755
index 0000000000..2f60b4e49d
--- /dev/null
+++ b/test/cpp/qps/single_run_localhost.sh
@@ -0,0 +1,28 @@
+#!/bin/sh
+
+# performs a single qps run with one client and one server
+
+set -ex
+
+cd $(dirname $0)/../../..
+
+killall qps_worker || true
+
+config=opt
+
+NUMCPUS=`python2.7 -c 'import multiprocessing; print multiprocessing.cpu_count()'`
+
+make CONFIG=$config qps_worker qps_driver -j$NUMCPUS
+
+bins/$config/qps_worker -driver_port 10000 -server_port 10001 &
+PID1=$!
+bins/$config/qps_worker -driver_port 10010 -server_port 10011 &
+PID2=$!
+
+export QPS_WORKERS="localhost:10000,localhost:10010"
+
+bins/$config/qps_driver $*
+
+kill -2 $PID1 $PID2
+wait
+
diff --git a/test/cpp/qps/stats.h b/test/cpp/qps/stats.h
new file mode 100644
index 0000000000..ca59390ad7
--- /dev/null
+++ b/test/cpp/qps/stats.h
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef TEST_QPS_STATS_UTILS_H
+#define TEST_QPS_STATS_UTILS_H
+
+#include "test/cpp/qps/histogram.h"
+#include <string>
+
+namespace grpc {
+namespace testing {
+
+template <class T, class F>
+double sum(const T& container, F functor) {
+ double r = 0;
+ for (auto v : container) {
+ r += functor(v);
+ }
+ return r;
+}
+
+template <class T, class F>
+double average(const T& container, F functor) {
+ return sum(container, functor) / container.size();
+}
+
+} // namespace testing
+} // namespace grpc
+
+#endif
diff --git a/test/cpp/qps/timer.cc b/test/cpp/qps/timer.cc
new file mode 100644
index 0000000000..3c1342041c
--- /dev/null
+++ b/test/cpp/qps/timer.cc
@@ -0,0 +1,71 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "test/cpp/qps/timer.h"
+
+#include <sys/time.h>
+#include <sys/resource.h>
+#include <grpc/support/time.h>
+
+Timer::Timer() : start_(Sample()) {}
+
+double Timer::Now() {
+ auto ts = gpr_now();
+ return ts.tv_sec + 1e-9 * ts.tv_nsec;
+}
+
+static double time_double(struct timeval* tv) {
+ return tv->tv_sec + 1e-6 * tv->tv_usec;
+}
+
+Timer::Result Timer::Sample() {
+ struct rusage usage;
+ struct timeval tv;
+ gettimeofday(&tv, nullptr);
+ getrusage(RUSAGE_SELF, &usage);
+
+ Result r;
+ r.wall = time_double(&tv);
+ r.user = time_double(&usage.ru_utime);
+ r.system = time_double(&usage.ru_stime);
+ return r;
+}
+
+Timer::Result Timer::Mark() {
+ Result s = Sample();
+ Result r;
+ r.wall = s.wall - start_.wall;
+ r.user = s.user - start_.user;
+ r.system = s.system - start_.system;
+ return r;
+}
diff --git a/test/cpp/qps/timer.h b/test/cpp/qps/timer.h
new file mode 100644
index 0000000000..30dbd7e7d5
--- /dev/null
+++ b/test/cpp/qps/timer.h
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef TEST_QPS_TIMER_H
+#define TEST_QPS_TIMER_H
+
+class Timer {
+ public:
+ Timer();
+
+ struct Result {
+ double wall;
+ double user;
+ double system;
+ };
+
+ Result Mark();
+
+ static double Now();
+
+ private:
+ static Result Sample();
+
+ const Result start_;
+};
+
+#endif // TEST_QPS_TIMER_H
diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc
new file mode 100644
index 0000000000..3a46a22665
--- /dev/null
+++ b/test/cpp/qps/worker.cc
@@ -0,0 +1,235 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <cassert>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <vector>
+#include <sstream>
+
+#include <sys/signal.h>
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/histogram.h>
+#include <grpc/support/log.h>
+#include <grpc/support/host_port.h>
+#include <gflags/gflags.h>
+#include <grpc++/client_context.h>
+#include <grpc++/status.h>
+#include <grpc++/server.h>
+#include <grpc++/server_builder.h>
+#include <grpc++/stream.h>
+#include "test/core/util/grpc_profiler.h"
+#include "test/cpp/util/create_test_channel.h"
+#include "test/cpp/qps/qpstest.pb.h"
+#include "test/cpp/qps/client.h"
+#include "test/cpp/qps/server.h"
+
+DEFINE_int32(driver_port, 0, "Driver server port.");
+DEFINE_int32(server_port, 0, "Spawned server port.");
+
+// In some distros, gflags is in the namespace google, and in some others,
+// in gflags. This hack is enabling us to find both.
+namespace google {}
+namespace gflags {}
+using namespace google;
+using namespace gflags;
+
+static bool got_sigint = false;
+
+namespace grpc {
+namespace testing {
+
+std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
+ switch (config.client_type()) {
+ case ClientType::SYNCHRONOUS_CLIENT:
+ return CreateSynchronousClient(config);
+ case ClientType::ASYNC_CLIENT:
+ return CreateAsyncClient(config);
+ }
+ abort();
+}
+
+std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
+ switch (config.server_type()) {
+ case ServerType::SYNCHRONOUS_SERVER:
+ return CreateSynchronousServer(config, FLAGS_server_port);
+ case ServerType::ASYNC_SERVER:
+ return CreateAsyncServer(config, FLAGS_server_port);
+ }
+ abort();
+}
+
+class WorkerImpl GRPC_FINAL : public Worker::Service {
+ public:
+ WorkerImpl() : acquired_(false) {}
+
+ Status RunTest(ServerContext* ctx,
+ ServerReaderWriter<ClientStatus, ClientArgs>* stream)
+ GRPC_OVERRIDE {
+ InstanceGuard g(this);
+ if (!g.Acquired()) {
+ return Status(RESOURCE_EXHAUSTED);
+ }
+
+ ClientArgs args;
+ if (!stream->Read(&args)) {
+ return Status(INVALID_ARGUMENT);
+ }
+ if (!args.has_setup()) {
+ return Status(INVALID_ARGUMENT);
+ }
+ auto client = CreateClient(args.setup());
+ if (!client) {
+ return Status(INVALID_ARGUMENT);
+ }
+ ClientStatus status;
+ if (!stream->Write(status)) {
+ return Status(UNKNOWN);
+ }
+ while (stream->Read(&args)) {
+ if (!args.has_mark()) {
+ return Status(INVALID_ARGUMENT);
+ }
+ *status.mutable_stats() = client->Mark();
+ stream->Write(status);
+ }
+
+ return Status::OK;
+ }
+
+ Status RunServer(ServerContext* ctx,
+ ServerReaderWriter<ServerStatus, ServerArgs>* stream)
+ GRPC_OVERRIDE {
+ InstanceGuard g(this);
+ if (!g.Acquired()) {
+ return Status(RESOURCE_EXHAUSTED);
+ }
+
+ ServerArgs args;
+ if (!stream->Read(&args)) {
+ return Status(INVALID_ARGUMENT);
+ }
+ if (!args.has_setup()) {
+ return Status(INVALID_ARGUMENT);
+ }
+ auto server = CreateServer(args.setup());
+ if (!server) {
+ return Status(INVALID_ARGUMENT);
+ }
+ ServerStatus status;
+ status.set_port(FLAGS_server_port);
+ if (!stream->Write(status)) {
+ return Status(UNKNOWN);
+ }
+ while (stream->Read(&args)) {
+ if (!args.has_mark()) {
+ return Status(INVALID_ARGUMENT);
+ }
+ *status.mutable_stats() = server->Mark();
+ stream->Write(status);
+ }
+
+ return Status::OK;
+ }
+
+ private:
+ // Protect against multiple clients using this worker at once.
+ class InstanceGuard {
+ public:
+ InstanceGuard(WorkerImpl* impl)
+ : impl_(impl), acquired_(impl->TryAcquireInstance()) {}
+ ~InstanceGuard() {
+ if (acquired_) {
+ impl_->ReleaseInstance();
+ }
+ }
+
+ bool Acquired() const { return acquired_; }
+
+ private:
+ WorkerImpl* const impl_;
+ const bool acquired_;
+ };
+
+ bool TryAcquireInstance() {
+ std::lock_guard<std::mutex> g(mu_);
+ if (acquired_) return false;
+ acquired_ = true;
+ return true;
+ }
+
+ void ReleaseInstance() {
+ std::lock_guard<std::mutex> g(mu_);
+ GPR_ASSERT(acquired_);
+ acquired_ = false;
+ }
+
+ std::mutex mu_;
+ bool acquired_;
+};
+
+static void RunServer() {
+ char* server_address = NULL;
+ gpr_join_host_port(&server_address, "::", FLAGS_driver_port);
+
+ WorkerImpl service;
+
+ ServerBuilder builder;
+ builder.AddPort(server_address);
+ builder.RegisterService(&service);
+
+ gpr_free(server_address);
+
+ auto server = builder.BuildAndStart();
+
+ while (!got_sigint) {
+ std::this_thread::sleep_for(std::chrono::seconds(5));
+ }
+}
+
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ grpc_init();
+ ParseCommandLineFlags(&argc, &argv, true);
+
+ grpc::testing::RunServer();
+
+ grpc_shutdown();
+ return 0;
+}