aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps
diff options
context:
space:
mode:
authorGravatar vjpai <vpai@google.com>2015-05-28 10:09:32 -0700
committerGravatar vjpai <vpai@google.com>2015-05-28 10:09:32 -0700
commit675140ca1d394f564f2156add8a1a18a2dc132b7 (patch)
tree78fd267e9b416c1ef00514052b54b93ecdbdb4d1 /test/cpp/qps
parentefad2ae67478ca70c064683af42c7f4bcfca5908 (diff)
parent5e9757bf0f8ada75068038c3e29d8b1e875d2ce3 (diff)
Merge branch 'master' into poisson
Conflicts: Makefile test/cpp/qps/client_async.cc test/cpp/qps/qpstest.proto
Diffstat (limited to 'test/cpp/qps')
-rw-r--r--test/cpp/qps/async_streaming_ping_pong_test.cc10
-rw-r--r--test/cpp/qps/async_unary_ping_pong_test.cc11
-rw-r--r--test/cpp/qps/client.h43
-rw-r--r--test/cpp/qps/client_async.cc58
-rw-r--r--test/cpp/qps/client_sync.cc14
-rw-r--r--test/cpp/qps/driver.cc35
-rw-r--r--test/cpp/qps/driver.h15
-rwxr-xr-xtest/cpp/qps/qps-sweep.sh29
-rw-r--r--test/cpp/qps/qps_driver.cc38
-rw-r--r--test/cpp/qps/qps_test.cc9
-rw-r--r--test/cpp/qps/qps_worker.cc24
-rw-r--r--test/cpp/qps/qpstest.proto6
-rw-r--r--test/cpp/qps/report.cc50
-rw-r--r--test/cpp/qps/report.h77
-rw-r--r--test/cpp/qps/server_async.cc125
-rw-r--r--test/cpp/qps/server_sync.cc16
-rwxr-xr-xtest/cpp/qps/single_run_localhost.sh28
-rw-r--r--test/cpp/qps/sync_streaming_ping_pong_test.cc10
-rw-r--r--test/cpp/qps/sync_unary_ping_pong_test.cc9
-rw-r--r--test/cpp/qps/worker.cc7
20 files changed, 409 insertions, 205 deletions
diff --git a/test/cpp/qps/async_streaming_ping_pong_test.cc b/test/cpp/qps/async_streaming_ping_pong_test.cc
index a1822b7e15..411df4d32a 100644
--- a/test/cpp/qps/async_streaming_ping_pong_test.cc
+++ b/test/cpp/qps/async_streaming_ping_pong_test.cc
@@ -31,12 +31,15 @@
*
*/
+#include <set>
+
#include <grpc/support/log.h>
#include <signal.h>
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
+#include "test/cpp/util/benchmark_config.h"
namespace grpc {
namespace testing {
@@ -64,16 +67,17 @@ static void RunAsyncStreamingPingPong() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
- ReportQPS(result);
- ReportLatency(result);
+ GetReporter()->ReportQPS(*result);
+ GetReporter()->ReportLatency(*result);
}
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
+ grpc::testing::InitBenchmark(&argc, &argv, true);
+
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunAsyncStreamingPingPong();
-
return 0;
}
diff --git a/test/cpp/qps/async_unary_ping_pong_test.cc b/test/cpp/qps/async_unary_ping_pong_test.cc
index 8b037a8656..eda31b5744 100644
--- a/test/cpp/qps/async_unary_ping_pong_test.cc
+++ b/test/cpp/qps/async_unary_ping_pong_test.cc
@@ -31,12 +31,15 @@
*
*/
+#include <set>
+
#include <grpc/support/log.h>
#include <signal.h>
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
+#include "test/cpp/util/benchmark_config.h"
namespace grpc {
namespace testing {
@@ -64,16 +67,16 @@ static void RunAsyncUnaryPingPong() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
- ReportQPS(result);
- ReportLatency(result);
+ GetReporter()->ReportQPS(*result);
+ GetReporter()->ReportLatency(*result);
}
-
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
+ grpc::testing::InitBenchmark(&argc, &argv, true);
signal(SIGPIPE, SIG_IGN);
- grpc::testing::RunAsyncUnaryPingPong();
+ grpc::testing::RunAsyncUnaryPingPong();
return 0;
}
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 620103b77d..2b227ec909 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -169,24 +169,26 @@ class Client {
: done_(false),
new_(nullptr),
impl_([this, idx, client]() {
- for (;;) {
- // run the loop body
- bool thread_still_ok = client->ThreadFunc(&histogram_, idx);
- // lock, see if we're done
- std::lock_guard<std::mutex> g(mu_);
- if (!thread_still_ok) {
- gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
- done_ = true;
- }
- if (done_) {return;}
- // check if we're marking, swap out the histogram if so
- if (new_) {
- new_->Swap(&histogram_);
- new_ = nullptr;
- cv_.notify_one();
- }
+ for (;;) {
+ // run the loop body
+ bool thread_still_ok = client->ThreadFunc(&histogram_, idx);
+ // lock, see if we're done
+ std::lock_guard<std::mutex> g(mu_);
+ if (!thread_still_ok) {
+ gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
+ done_ = true;
}
- }) {}
+ if (done_) {
+ return;
+ }
+ // check if we're marking, swap out the histogram if so
+ if (new_) {
+ new_->Swap(&histogram_);
+ new_ = nullptr;
+ cv_.notify_one();
+ }
+ }
+ }) {}
~Thread() {
{
@@ -228,10 +230,9 @@ class Client {
<std::chrono::high_resolution_clock>> next_time_;
};
-std::unique_ptr<Client>
- CreateSynchronousUnaryClient(const ClientConfig& args);
-std::unique_ptr<Client>
- CreateSynchronousStreamingClient(const ClientConfig& args);
+std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args);
+std::unique_ptr<Client> CreateSynchronousStreamingClient(
+ const ClientConfig& args);
std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args);
std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args);
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index d0510ec67a..2d23192767 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -189,8 +189,8 @@ class AsyncClient : public Client {
}
}
- bool ThreadFunc(Histogram* histogram, size_t thread_idx)
- GRPC_OVERRIDE GRPC_FINAL {
+ bool ThreadFunc(Histogram* histogram,
+ size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
void* got_tag;
bool ok;
grpc_time deadline, short_deadline;
@@ -244,6 +244,7 @@ class AsyncClient : public Client {
}
return true;
}
+
private:
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
@@ -260,8 +261,8 @@ class AsyncClient : public Client {
class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
public:
- explicit AsyncUnaryClient(const ClientConfig& config) :
- AsyncClient(config, SetupCtx) {
+ explicit AsyncUnaryClient(const ClientConfig& config)
+ : AsyncClient(config, SetupCtx) {
StartThreads(config.async_client_threads());
}
~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
@@ -283,12 +284,11 @@ template <class RequestType, class ResponseType>
class ClientRpcContextStreamingImpl : public ClientRpcContext {
public:
ClientRpcContextStreamingImpl(
- TestService::Stub *stub, const RequestType &req,
- std::function<
- std::unique_ptr<grpc::ClientAsyncReaderWriter<
- RequestType,ResponseType>>(
- TestService::Stub *, grpc::ClientContext *, void *)> start_req,
- std::function<void(grpc::Status, ResponseType *)> on_done)
+ TestService::Stub* stub, const RequestType& req,
+ std::function<std::unique_ptr<
+ grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
+ TestService::Stub*, grpc::ClientContext*, void*)> start_req,
+ std::function<void(grpc::Status, ResponseType*)> on_done)
: context_(),
stub_(stub),
req_(req),
@@ -299,7 +299,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
start_(Timer::Now()),
stream_(start_req_(stub_, &context_, ClientRpcContext::tag(this))) {}
~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {}
- bool RunNextState(bool ok, Histogram *hist) GRPC_OVERRIDE {
+ bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
return (this->*next_state_)(ok, hist);
}
void StartNewClone() GRPC_OVERRIDE {
@@ -307,49 +307,47 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
}
void Start() GRPC_OVERRIDE {}
private:
- bool ReqSent(bool ok, Histogram *) {
- return StartWrite(ok);
- }
+ bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); }
bool StartWrite(bool ok) {
if (!ok) {
- return(false);
+ return (false);
}
start_ = Timer::Now();
next_state_ = &ClientRpcContextStreamingImpl::WriteDone;
stream_->Write(req_, ClientRpcContext::tag(this));
return true;
}
- bool WriteDone(bool ok, Histogram *) {
+ bool WriteDone(bool ok, Histogram*) {
if (!ok) {
- return(false);
+ return (false);
}
next_state_ = &ClientRpcContextStreamingImpl::ReadDone;
stream_->Read(&response_, ClientRpcContext::tag(this));
return true;
}
- bool ReadDone(bool ok, Histogram *hist) {
+ bool ReadDone(bool ok, Histogram* hist) {
hist->Add((Timer::Now() - start_) * 1e9);
return StartWrite(ok);
}
grpc::ClientContext context_;
- TestService::Stub *stub_;
+ TestService::Stub* stub_;
RequestType req_;
ResponseType response_;
- bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram *);
- std::function<void(grpc::Status, ResponseType *)> callback_;
- std::function<std::unique_ptr<grpc::ClientAsyncReaderWriter<
- RequestType,ResponseType>>(
- TestService::Stub *, grpc::ClientContext *, void *)> start_req_;
+ bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*);
+ std::function<void(grpc::Status, ResponseType*)> callback_;
+ std::function<
+ std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
+ TestService::Stub*, grpc::ClientContext*, void*)> start_req_;
grpc::Status status_;
double start_;
- std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType,ResponseType>>
- stream_;
+ std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
+ stream_;
};
class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
public:
- explicit AsyncStreamingClient(const ClientConfig &config) :
- AsyncClient(config, SetupCtx) {
+ explicit AsyncStreamingClient(const ClientConfig& config)
+ : AsyncClient(config, SetupCtx) {
StartThreads(config.async_client_threads());
}
@@ -358,8 +356,8 @@ private:
static ClientRpcContext *SetupCtx(CompletionQueue* cq, TestService::Stub* stub,
const SimpleRequest& req) {
auto check_done = [](grpc::Status s, SimpleResponse* response) {};
- auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx,
- void *tag) {
+ auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
+ void* tag) {
auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
return stream;
};
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 6a89c5acc2..98297d3abb 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -110,7 +110,9 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
public:
SynchronousStreamingClient(const ClientConfig& config)
- : SynchronousClient(config), context_(num_threads_), stream_(num_threads_) {
+ : SynchronousClient(config),
+ context_(num_threads_),
+ stream_(num_threads_) {
for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
@@ -121,8 +123,8 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
EndThreads();
for (auto stream = stream_.begin(); stream != stream_.end(); stream++) {
if (*stream) {
- (*stream)->WritesDone();
- EXPECT_TRUE((*stream)->Finish().IsOk());
+ (*stream)->WritesDone();
+ EXPECT_TRUE((*stream)->Finish().IsOk());
}
}
}
@@ -131,7 +133,7 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
WaitToIssue(thread_idx);
double start = Timer::Now();
if (stream_[thread_idx]->Write(request_) &&
- stream_[thread_idx]->Read(&responses_[thread_idx])) {
+ stream_[thread_idx]->Read(&responses_[thread_idx])) {
histogram->Add((Timer::Now() - start) * 1e9);
return true;
}
@@ -140,8 +142,8 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
private:
std::vector<grpc::ClientContext> context_;
- std::vector<std::unique_ptr<grpc::ClientReaderWriter<
- SimpleRequest, SimpleResponse>>> stream_;
+ std::vector<std::unique_ptr<
+ grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>> stream_;
};
std::unique_ptr<Client> CreateSynchronousUnaryClient(
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 6959f980ae..bf12730f97 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -77,13 +77,10 @@ static deque<string> get_hosts(const string& name) {
}
}
-ScenarioResult RunScenario(const ClientConfig& initial_client_config,
- size_t num_clients,
- const ServerConfig& server_config,
- size_t num_servers,
- int warmup_seconds,
- int benchmark_seconds,
- int spawn_local_worker_count) {
+std::unique_ptr<ScenarioResult> RunScenario(
+ const ClientConfig& initial_client_config, size_t num_clients,
+ const ServerConfig& server_config, size_t num_servers, int warmup_seconds,
+ int benchmark_seconds, int spawn_local_worker_count) {
// ClientContext allocator (all are destroyed at scope exit)
list<ClientContext> contexts;
auto alloc_context = [&contexts]() {
@@ -91,6 +88,11 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
return &contexts.back();
};
+ // To be added to the result, containing the final configuration used for
+ // client and config (incluiding host, etc.)
+ ClientConfig result_client_config;
+ ServerConfig result_server_config;
+
// Get client, server lists
auto workers = get_hosts("QPS_WORKERS");
ClientConfig client_config = initial_client_config;
@@ -103,7 +105,7 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
if (!called_init) {
char args_buf[100];
strcpy(args_buf, "some-benchmark");
- char *args[] = {args_buf};
+ char* args[] = {args_buf};
grpc_test_init(1, args);
called_init = true;
}
@@ -139,6 +141,8 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
sd.stub = std::move(Worker::NewStub(
CreateChannel(workers[i], InsecureCredentials(), ChannelArguments())));
ServerArgs args;
+ result_server_config = server_config;
+ result_server_config.set_host(workers[i]);
*args.mutable_setup() = server_config;
sd.stream = std::move(sd.stub->RunServer(alloc_context()));
GPR_ASSERT(sd.stream->Write(args));
@@ -168,6 +172,8 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
cd.stub = std::move(Worker::NewStub(CreateChannel(
workers[i + num_servers], InsecureCredentials(), ChannelArguments())));
ClientArgs args;
+ result_client_config = client_config;
+ result_client_config.set_host(workers[i + num_servers]);
*args.mutable_setup() = client_config;
cd.stream = std::move(cd.stub->RunTest(alloc_context()));
GPR_ASSERT(cd.stream->Write(args));
@@ -205,10 +211,13 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
// Wait some time
gpr_log(GPR_INFO, "Running");
- gpr_sleep_until(gpr_time_add(start, gpr_time_from_seconds(benchmark_seconds)));
+ gpr_sleep_until(
+ gpr_time_add(start, gpr_time_from_seconds(benchmark_seconds)));
// Finish a run
- ScenarioResult result;
+ std::unique_ptr<ScenarioResult> result(new ScenarioResult);
+ result->client_config = result_client_config;
+ result->server_config = result_server_config;
gpr_log(GPR_INFO, "Finishing");
for (auto server = servers.begin(); server != servers.end(); server++) {
GPR_ASSERT(server->stream->Write(server_mark));
@@ -219,14 +228,14 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
for (auto server = servers.begin(); server != servers.end(); server++) {
GPR_ASSERT(server->stream->Read(&server_status));
const auto& stats = server_status.stats();
- result.server_resources.push_back(ResourceUsage{
+ result->server_resources.push_back(ResourceUsage{
stats.time_elapsed(), stats.time_user(), stats.time_system()});
}
for (auto client = clients.begin(); client != clients.end(); client++) {
GPR_ASSERT(client->stream->Read(&client_status));
const auto& stats = client_status.stats();
- result.latencies.MergeProto(stats.latencies());
- result.client_resources.push_back(ResourceUsage{
+ result->latencies.MergeProto(stats.latencies());
+ result->client_resources.push_back(ResourceUsage{
stats.time_elapsed(), stats.time_user(), stats.time_system()});
}
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index eb7119a89d..5e9d4b3cb9 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -34,6 +34,8 @@
#ifndef TEST_QPS_DRIVER_H
#define TEST_QPS_DRIVER_H
+#include <memory>
+
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/qpstest.grpc.pb.h"
@@ -49,15 +51,14 @@ struct ScenarioResult {
Histogram latencies;
std::vector<ResourceUsage> client_resources;
std::vector<ResourceUsage> server_resources;
+ ClientConfig client_config;
+ ServerConfig server_config;
};
-ScenarioResult RunScenario(const grpc::testing::ClientConfig& client_config,
- size_t num_clients,
- const grpc::testing::ServerConfig& server_config,
- size_t num_servers,
- int warmup_seconds,
- int benchmark_seconds,
- int spawn_local_worker_count);
+std::unique_ptr<ScenarioResult> RunScenario(
+ const grpc::testing::ClientConfig& client_config, size_t num_clients,
+ const grpc::testing::ServerConfig& server_config, size_t num_servers,
+ int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count);
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/qps-sweep.sh b/test/cpp/qps/qps-sweep.sh
index 7bc6eade2c..cb93201933 100755
--- a/test/cpp/qps/qps-sweep.sh
+++ b/test/cpp/qps/qps-sweep.sh
@@ -1,5 +1,34 @@
#!/bin/sh
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
if [ x"$QPS_WORKERS" == x ]; then
echo Error: Must set QPS_WORKERS variable in form \
"host:port,host:port,..." 1>&2
diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc
index dfa3f06753..acc3098839 100644
--- a/test/cpp/qps/qps_driver.cc
+++ b/test/cpp/qps/qps_driver.cc
@@ -31,12 +31,15 @@
*
*/
+#include <memory>
+#include <set>
+
#include <gflags/gflags.h>
#include <grpc/support/log.h>
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
-#include "test/cpp/util/test_config.h"
+#include "test/cpp/util/benchmark_config.h"
DEFINE_int32(num_clients, 1, "Number of client binaries");
DEFINE_int32(num_servers, 1, "Number of server binaries");
@@ -72,9 +75,10 @@ using grpc::testing::LoadType;
using grpc::testing::RpcType;
using grpc::testing::ResourceUsage;
-int main(int argc, char** argv) {
- grpc::testing::InitTest(&argc, &argv, true);
+namespace grpc {
+namespace testing {
+static void QpsDriver() {
RpcType rpc_type;
GPR_ASSERT(RpcType_Parse(FLAGS_rpc_type, &rpc_type));
@@ -141,17 +145,27 @@ int main(int argc, char** argv) {
// client will deadlock on a timer.
GPR_ASSERT(!(server_type == grpc::testing::SYNCHRONOUS_SERVER &&
rpc_type == grpc::testing::STREAMING &&
- FLAGS_server_threads < FLAGS_client_channels *
- FLAGS_outstanding_rpcs_per_channel));
+ FLAGS_server_threads <
+ FLAGS_client_channels * FLAGS_outstanding_rpcs_per_channel));
- auto result = RunScenario(client_config, FLAGS_num_clients,
- server_config, FLAGS_num_servers,
- FLAGS_warmup_seconds, FLAGS_benchmark_seconds,
- FLAGS_local_workers);
+ const auto result = RunScenario(
+ client_config, FLAGS_num_clients, server_config, FLAGS_num_servers,
+ FLAGS_warmup_seconds, FLAGS_benchmark_seconds, FLAGS_local_workers);
+
+ GetReporter()->ReportQPS(*result);
+ GetReporter()->ReportQPSPerCore(*result, server_config);
+ GetReporter()->ReportLatency(*result);
+ GetReporter()->ReportTimes(*result);
+}
+
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ grpc::testing::InitBenchmark(&argc, &argv, true);
- ReportQPSPerCore(result, server_config);
- ReportLatency(result);
- ReportTimes(result);
+ signal(SIGPIPE, SIG_IGN);
+ grpc::testing::QpsDriver();
return 0;
}
diff --git a/test/cpp/qps/qps_test.cc b/test/cpp/qps/qps_test.cc
index f567e4cf06..63a37ae07e 100644
--- a/test/cpp/qps/qps_test.cc
+++ b/test/cpp/qps/qps_test.cc
@@ -31,12 +31,15 @@
*
*/
+#include <set>
+
#include <grpc/support/log.h>
#include <signal.h>
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
+#include "test/cpp/util/benchmark_config.h"
namespace grpc {
namespace testing {
@@ -64,14 +67,16 @@ static void RunQPS() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
- ReportQPSPerCore(result, server_config);
- ReportLatency(result);
+ GetReporter()->ReportQPSPerCore(*result, server_config);
+ GetReporter()->ReportLatency(*result);
}
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
+ grpc::testing::InitBenchmark(&argc, &argv, true);
+
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunQPS();
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index 46d70dce52..fb49271991 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -64,17 +64,19 @@ namespace testing {
std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
switch (config.client_type()) {
case ClientType::SYNCHRONOUS_CLIENT:
- return (config.rpc_type() == RpcType::UNARY) ?
- CreateSynchronousUnaryClient(config) :
- CreateSynchronousStreamingClient(config);
+ return (config.rpc_type() == RpcType::UNARY)
+ ? CreateSynchronousUnaryClient(config)
+ : CreateSynchronousStreamingClient(config);
case ClientType::ASYNC_CLIENT:
- return (config.rpc_type() == RpcType::UNARY) ?
- CreateAsyncUnaryClient(config) : CreateAsyncStreamingClient(config);
+ return (config.rpc_type() == RpcType::UNARY)
+ ? CreateAsyncUnaryClient(config)
+ : CreateAsyncStreamingClient(config);
}
abort();
}
-std::unique_ptr<Server> CreateServer(const ServerConfig& config, int server_port) {
+std::unique_ptr<Server> CreateServer(const ServerConfig& config,
+ int server_port) {
switch (config.server_type()) {
case ServerType::SYNCHRONOUS_SERVER:
return CreateSynchronousServer(config, server_port);
@@ -86,7 +88,8 @@ std::unique_ptr<Server> CreateServer(const ServerConfig& config, int server_port
class WorkerImpl GRPC_FINAL : public Worker::Service {
public:
- explicit WorkerImpl(int server_port) : server_port_(server_port), acquired_(false) {}
+ explicit WorkerImpl(int server_port)
+ : server_port_(server_port), acquired_(false) {}
Status RunTest(ServerContext* ctx,
ServerReaderWriter<ClientStatus, ClientArgs>* stream)
@@ -97,7 +100,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
}
grpc_profiler_start("qps_client.prof");
- Status ret = RunTestBody(ctx,stream);
+ Status ret = RunTestBody(ctx, stream);
grpc_profiler_stop();
return ret;
}
@@ -111,7 +114,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
}
grpc_profiler_start("qps_server.prof");
- Status ret = RunServerBody(ctx,stream);
+ Status ret = RunServerBody(ctx, stream);
grpc_profiler_stop();
return ret;
}
@@ -226,8 +229,7 @@ QpsWorker::QpsWorker(int driver_port, int server_port) {
server_ = std::move(builder.BuildAndStart());
}
-QpsWorker::~QpsWorker() {
-}
+QpsWorker::~QpsWorker() {}
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/qpstest.proto b/test/cpp/qps/qpstest.proto
index 6e710e7598..b8661f3f4f 100644
--- a/test/cpp/qps/qpstest.proto
+++ b/test/cpp/qps/qpstest.proto
@@ -137,8 +137,9 @@ message ClientConfig {
// only for async client:
optional int32 async_client_threads = 7;
optional RpcType rpc_type = 8 [default=UNARY];
- optional LoadType load_type = 9 [default=CLOSED_LOOP];
- optional LoadParams load_params = 10;
+ optional string host = 9;
+ optional LoadType load_type = 10 [default=CLOSED_LOOP];
+ optional LoadParams load_params = 11;
}
// Request current stats
@@ -166,6 +167,7 @@ message ServerConfig {
required ServerType server_type = 1;
optional int32 threads = 2 [default=1];
optional bool enable_ssl = 3 [default=false];
+ optional string host = 4;
}
message ServerArgs {
diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc
index 29d88da344..e116175e3b 100644
--- a/test/cpp/qps/report.cc
+++ b/test/cpp/qps/report.cc
@@ -39,27 +39,57 @@
namespace grpc {
namespace testing {
-// QPS: XXX
-void ReportQPS(const ScenarioResult& result) {
+void CompositeReporter::add(std::unique_ptr<Reporter> reporter) {
+ reporters_.emplace_back(std::move(reporter));
+}
+
+void CompositeReporter::ReportQPS(const ScenarioResult& result) const {
+ for (size_t i = 0; i < reporters_.size(); ++i) {
+ reporters_[i]->ReportQPS(result);
+ }
+}
+
+void CompositeReporter::ReportQPSPerCore(const ScenarioResult& result,
+ const ServerConfig& config) const {
+ for (size_t i = 0; i < reporters_.size(); ++i) {
+ reporters_[i]->ReportQPSPerCore(result, config);
+ }
+}
+
+void CompositeReporter::ReportLatency(const ScenarioResult& result) const {
+ for (size_t i = 0; i < reporters_.size(); ++i) {
+ reporters_[i]->ReportLatency(result);
+ }
+}
+
+void CompositeReporter::ReportTimes(const ScenarioResult& result) const {
+ for (size_t i = 0; i < reporters_.size(); ++i) {
+ reporters_[i]->ReportTimes(result);
+ }
+}
+
+
+void GprLogReporter::ReportQPS(const ScenarioResult& result) const {
gpr_log(GPR_INFO, "QPS: %.1f",
result.latencies.Count() /
average(result.client_resources,
[](ResourceUsage u) { return u.wall_time; }));
}
-// QPS: XXX (YYY/server core)
-void ReportQPSPerCore(const ScenarioResult& result, const ServerConfig& server_config) {
- auto qps =
+void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result,
+ const ServerConfig& server_config) const {
+ auto qps =
result.latencies.Count() /
average(result.client_resources,
[](ResourceUsage u) { return u.wall_time; });
- gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps, qps/server_config.threads());
+ gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps,
+ qps / server_config.threads());
}
-// Latency (50/90/95/99/99.9%-ile): AA/BB/CC/DD/EE us
-void ReportLatency(const ScenarioResult& result) {
- gpr_log(GPR_INFO, "Latencies (50/90/95/99/99.9%%-ile): %.1f/%.1f/%.1f/%.1f/%.1f us",
+void GprLogReporter::ReportLatency(const ScenarioResult& result) const {
+ gpr_log(GPR_INFO,
+ "Latencies (50/90/95/99/99.9%%-ile): %.1f/%.1f/%.1f/%.1f/%.1f us",
result.latencies.Percentile(50) / 1000,
result.latencies.Percentile(90) / 1000,
result.latencies.Percentile(95) / 1000,
@@ -67,7 +97,7 @@ void ReportLatency(const ScenarioResult& result) {
result.latencies.Percentile(99.9) / 1000);
}
-void ReportTimes(const ScenarioResult& result) {
+void GprLogReporter::ReportTimes(const ScenarioResult& result) const {
gpr_log(GPR_INFO, "Server system time: %.2f%%",
100.0 * sum(result.server_resources,
[](ResourceUsage u) { return u.system_time; }) /
diff --git a/test/cpp/qps/report.h b/test/cpp/qps/report.h
index 343e426ca4..630275ecda 100644
--- a/test/cpp/qps/report.h
+++ b/test/cpp/qps/report.h
@@ -34,22 +34,77 @@
#ifndef TEST_QPS_REPORT_H
#define TEST_QPS_REPORT_H
+#include <memory>
+#include <set>
+#include <vector>
+#include <grpc++/config.h>
+
#include "test/cpp/qps/driver.h"
+#include "test/cpp/qps/qpstest.grpc.pb.h"
namespace grpc {
namespace testing {
-// QPS: XXX
-void ReportQPS(const ScenarioResult& result);
-// QPS: XXX (YYY/server core)
-void ReportQPSPerCore(const ScenarioResult& result, const ServerConfig& config);
-// Latency (50/90/95/99/99.9%-ile): AA/BB/CC/DD/EE us
-void ReportLatency(const ScenarioResult& result);
-// Server system time: XX%
-// Server user time: XX%
-// Client system time: XX%
-// Client user time: XX%
-void ReportTimes(const ScenarioResult& result);
+/** Interface for all reporters. */
+class Reporter {
+ public:
+ /** Construct a reporter with the given \a name. */
+ Reporter(const string& name) : name_(name) {}
+
+ virtual ~Reporter() {}
+
+ /** Returns this reporter's name.
+ *
+ * Names are constants, set at construction time. */
+ string name() const { return name_; }
+
+ /** Reports QPS for the given \a result. */
+ virtual void ReportQPS(const ScenarioResult& result) const = 0;
+
+ /** Reports QPS per core as (YYY/server core). */
+ virtual void ReportQPSPerCore(const ScenarioResult& result,
+ const ServerConfig& config) const = 0;
+
+ /** Reports latencies for the 50, 90, 95, 99 and 99.9 percentiles, in ms. */
+ virtual void ReportLatency(const ScenarioResult& result) const = 0;
+
+ /** Reports system and user time for client and server systems. */
+ virtual void ReportTimes(const ScenarioResult& result) const = 0;
+
+ private:
+ const string name_;
+};
+
+/** A composite for all reporters to be considered. */
+class CompositeReporter : public Reporter {
+ public:
+ CompositeReporter() : Reporter("CompositeReporter") {}
+
+ /** Adds a \a reporter to the composite. */
+ void add(std::unique_ptr<Reporter> reporter);
+
+ void ReportQPS(const ScenarioResult& result) const GRPC_OVERRIDE;
+ void ReportQPSPerCore(const ScenarioResult& result,
+ const ServerConfig& config) const GRPC_OVERRIDE;
+ void ReportLatency(const ScenarioResult& result) const GRPC_OVERRIDE;
+ void ReportTimes(const ScenarioResult& result) const GRPC_OVERRIDE;
+
+ private:
+ std::vector<std::unique_ptr<Reporter> > reporters_;
+};
+
+/** Reporter to gpr_log(GPR_INFO). */
+class GprLogReporter : public Reporter {
+ public:
+ GprLogReporter(const string& name) : Reporter(name) {}
+
+ private:
+ void ReportQPS(const ScenarioResult& result) const GRPC_OVERRIDE;
+ void ReportQPSPerCore(const ScenarioResult& result,
+ const ServerConfig& config) const GRPC_OVERRIDE;
+ void ReportLatency(const ScenarioResult& result) const GRPC_OVERRIDE;
+ void ReportTimes(const ScenarioResult& result) const GRPC_OVERRIDE;
+};
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 6cb3192908..b9998405f6 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -64,7 +64,7 @@ namespace testing {
class AsyncQpsServerTest : public Server {
public:
AsyncQpsServerTest(const ServerConfig &config, int port) : shutdown_(false) {
- char* server_address = NULL;
+ char *server_address = NULL;
gpr_join_host_port(&server_address, "::", port);
ServerBuilder builder;
@@ -95,16 +95,19 @@ class AsyncQpsServerTest : public Server {
threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down
bool ok;
- void* got_tag;
+ void *got_tag;
while (srv_cq_->Next(&got_tag, &ok)) {
- ServerRpcContext* ctx = detag(got_tag);
+ ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
- if (ctx->RunNextState(ok) == false) {
+ bool still_going = ctx->RunNextState(ok);
+ std::lock_guard<std::mutex> g(shutdown_mutex_);
+ if (!shutdown_) {
// this RPC context is done, so refresh it
- std::lock_guard<std::mutex> g(shutdown_mutex_);
- if (!shutdown_) {
+ if (!still_going) {
ctx->Reset();
}
+ } else {
+ return;
}
}
return;
@@ -116,11 +119,15 @@ class AsyncQpsServerTest : public Server {
{
std::lock_guard<std::mutex> g(shutdown_mutex_);
shutdown_ = true;
- srv_cq_->Shutdown();
}
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join();
}
+ srv_cq_->Shutdown();
+ bool ok;
+ void *got_tag;
+ while (srv_cq_->Next(&got_tag, &ok))
+ ;
while (!contexts_.empty()) {
delete contexts_.front();
contexts_.pop_front();
@@ -133,23 +140,23 @@ class AsyncQpsServerTest : public Server {
ServerRpcContext() {}
virtual ~ServerRpcContext(){};
virtual bool RunNextState(bool) = 0; // next state, return false if done
- virtual void Reset() = 0; // start this back at a clean state
+ virtual void Reset() = 0; // start this back at a clean state
};
- static void* tag(ServerRpcContext* func) {
- return reinterpret_cast<void*>(func);
+ static void *tag(ServerRpcContext *func) {
+ return reinterpret_cast<void *>(func);
}
- static ServerRpcContext* detag(void* tag) {
- return reinterpret_cast<ServerRpcContext*>(tag);
+ static ServerRpcContext *detag(void *tag) {
+ return reinterpret_cast<ServerRpcContext *>(tag);
}
template <class RequestType, class ResponseType>
class ServerRpcContextUnaryImpl GRPC_FINAL : public ServerRpcContext {
public:
ServerRpcContextUnaryImpl(
- std::function<void(ServerContext*, RequestType*,
- grpc::ServerAsyncResponseWriter<ResponseType>*,
- void*)> request_method,
- std::function<grpc::Status(const RequestType*, ResponseType*)>
+ std::function<void(ServerContext *, RequestType *,
+ grpc::ServerAsyncResponseWriter<ResponseType> *,
+ void *)> request_method,
+ std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method)
: next_state_(&ServerRpcContextUnaryImpl::invoker),
request_method_(request_method),
@@ -159,7 +166,9 @@ class AsyncQpsServerTest : public Server {
AsyncQpsServerTest::tag(this));
}
~ServerRpcContextUnaryImpl() GRPC_OVERRIDE {}
- bool RunNextState(bool ok) GRPC_OVERRIDE {return (this->*next_state_)(ok);}
+ bool RunNextState(bool ok) GRPC_OVERRIDE {
+ return (this->*next_state_)(ok);
+ }
void Reset() GRPC_OVERRIDE {
srv_ctx_ = ServerContext();
req_ = RequestType();
@@ -192,10 +201,10 @@ class AsyncQpsServerTest : public Server {
ServerContext srv_ctx_;
RequestType req_;
bool (ServerRpcContextUnaryImpl::*next_state_)(bool);
- std::function<void(ServerContext*, RequestType*,
- grpc::ServerAsyncResponseWriter<ResponseType>*, void*)>
+ std::function<void(ServerContext *, RequestType *,
+ grpc::ServerAsyncResponseWriter<ResponseType> *, void *)>
request_method_;
- std::function<grpc::Status(const RequestType*, ResponseType*)>
+ std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method_;
grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
};
@@ -204,9 +213,9 @@ class AsyncQpsServerTest : public Server {
class ServerRpcContextStreamingImpl GRPC_FINAL : public ServerRpcContext {
public:
ServerRpcContextStreamingImpl(
- std::function<void(ServerContext *,
- grpc::ServerAsyncReaderWriter<ResponseType,
- RequestType> *, void *)> request_method,
+ std::function<void(ServerContext *, grpc::ServerAsyncReaderWriter<
+ ResponseType, RequestType> *,
+ void *)> request_method,
std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method)
: next_state_(&ServerRpcContextStreamingImpl::request_done),
@@ -215,14 +224,15 @@ class AsyncQpsServerTest : public Server {
stream_(&srv_ctx_) {
request_method_(&srv_ctx_, &stream_, AsyncQpsServerTest::tag(this));
}
- ~ServerRpcContextStreamingImpl() GRPC_OVERRIDE {
+ ~ServerRpcContextStreamingImpl() GRPC_OVERRIDE {}
+ bool RunNextState(bool ok) GRPC_OVERRIDE {
+ return (this->*next_state_)(ok);
}
- bool RunNextState(bool ok) GRPC_OVERRIDE {return (this->*next_state_)(ok);}
void Reset() GRPC_OVERRIDE {
srv_ctx_ = ServerContext();
req_ = RequestType();
- stream_ = grpc::ServerAsyncReaderWriter<ResponseType,
- RequestType>(&srv_ctx_);
+ stream_ =
+ grpc::ServerAsyncReaderWriter<ResponseType, RequestType>(&srv_ctx_);
// Then request the method
next_state_ = &ServerRpcContextStreamingImpl::request_done;
@@ -241,47 +251,47 @@ class AsyncQpsServerTest : public Server {
bool read_done(bool ok) {
if (ok) {
- // invoke the method
- ResponseType response;
- // Call the RPC processing function
- grpc::Status status = invoke_method_(&req_, &response);
- // initiate the write
- stream_.Write(response, AsyncQpsServerTest::tag(this));
- next_state_ = &ServerRpcContextStreamingImpl::write_done;
- } else { // client has sent writes done
- // finish the stream
- stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
- next_state_ = &ServerRpcContextStreamingImpl::finish_done;
+ // invoke the method
+ ResponseType response;
+ // Call the RPC processing function
+ grpc::Status status = invoke_method_(&req_, &response);
+ // initiate the write
+ stream_.Write(response, AsyncQpsServerTest::tag(this));
+ next_state_ = &ServerRpcContextStreamingImpl::write_done;
+ } else { // client has sent writes done
+ // finish the stream
+ stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
+ next_state_ = &ServerRpcContextStreamingImpl::finish_done;
}
return true;
}
bool write_done(bool ok) {
// now go back and get another streaming read!
if (ok) {
- stream_.Read(&req_, AsyncQpsServerTest::tag(this));
- next_state_ = &ServerRpcContextStreamingImpl::read_done;
- }
- else {
- stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
- next_state_ = &ServerRpcContextStreamingImpl::finish_done;
+ stream_.Read(&req_, AsyncQpsServerTest::tag(this));
+ next_state_ = &ServerRpcContextStreamingImpl::read_done;
+ } else {
+ stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
+ next_state_ = &ServerRpcContextStreamingImpl::finish_done;
}
return true;
}
- bool finish_done(bool ok) {return false; /* reset the context */ }
+ bool finish_done(bool ok) { return false; /* reset the context */ }
ServerContext srv_ctx_;
RequestType req_;
bool (ServerRpcContextStreamingImpl::*next_state_)(bool);
- std::function<void(ServerContext *,
- grpc::ServerAsyncReaderWriter<ResponseType,
- RequestType> *, void *)> request_method_;
+ std::function<void(
+ ServerContext *,
+ grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)>
+ request_method_;
std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method_;
- grpc::ServerAsyncReaderWriter<ResponseType,RequestType> stream_;
+ grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
};
- static Status ProcessRPC(const SimpleRequest* request,
- SimpleResponse* response) {
+ static Status ProcessRPC(const SimpleRequest *request,
+ SimpleResponse *response) {
if (request->response_size() > 0) {
if (!SetPayload(request->response_type(), request->response_size(),
response->mutable_payload())) {
@@ -294,19 +304,20 @@ class AsyncQpsServerTest : public Server {
std::unique_ptr<grpc::Server> server_;
std::unique_ptr<grpc::ServerCompletionQueue> srv_cq_;
TestService::AsyncService async_service_;
- std::function<void(ServerContext*, SimpleRequest*,
- grpc::ServerAsyncResponseWriter<SimpleResponse>*, void*)>
+ std::function<void(ServerContext *, SimpleRequest *,
+ grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)>
request_unary_;
- std::function<void(ServerContext*, grpc::ServerAsyncReaderWriter<
- SimpleResponse,SimpleRequest>*, void*)>
+ std::function<void(
+ ServerContext *,
+ grpc::ServerAsyncReaderWriter<SimpleResponse, SimpleRequest> *, void *)>
request_streaming_;
- std::forward_list<ServerRpcContext*> contexts_;
+ std::forward_list<ServerRpcContext *> contexts_;
std::mutex shutdown_mutex_;
bool shutdown_;
};
-std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config,
+std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,
int port) {
return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
}
diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc
index 2770233a7c..bc00de9ced 100644
--- a/test/cpp/qps/server_sync.cc
+++ b/test/cpp/qps/server_sync.cc
@@ -70,18 +70,18 @@ class TestServiceImpl GRPC_FINAL : public TestService::Service {
}
return Status::OK;
}
- Status StreamingCall(ServerContext *context,
- ServerReaderWriter<SimpleResponse, SimpleRequest>*
- stream) GRPC_OVERRIDE {
+ Status StreamingCall(
+ ServerContext* context,
+ ServerReaderWriter<SimpleResponse, SimpleRequest>* stream) GRPC_OVERRIDE {
SimpleRequest request;
while (stream->Read(&request)) {
SimpleResponse response;
if (request.response_size() > 0) {
- if (!Server::SetPayload(request.response_type(),
- request.response_size(),
- response.mutable_payload())) {
- return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
- }
+ if (!Server::SetPayload(request.response_type(),
+ request.response_size(),
+ response.mutable_payload())) {
+ return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
+ }
}
stream->Write(response);
}
diff --git a/test/cpp/qps/single_run_localhost.sh b/test/cpp/qps/single_run_localhost.sh
index 2f60b4e49d..9d76f08f80 100755
--- a/test/cpp/qps/single_run_localhost.sh
+++ b/test/cpp/qps/single_run_localhost.sh
@@ -1,4 +1,32 @@
#!/bin/sh
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# performs a single qps run with one client and one server
diff --git a/test/cpp/qps/sync_streaming_ping_pong_test.cc b/test/cpp/qps/sync_streaming_ping_pong_test.cc
index 48c7ff63e0..d53905a779 100644
--- a/test/cpp/qps/sync_streaming_ping_pong_test.cc
+++ b/test/cpp/qps/sync_streaming_ping_pong_test.cc
@@ -31,12 +31,15 @@
*
*/
+#include <set>
+
#include <grpc/support/log.h>
#include <signal.h>
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
+#include "test/cpp/util/benchmark_config.h"
namespace grpc {
namespace testing {
@@ -63,14 +66,15 @@ static void RunSynchronousStreamingPingPong() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
- ReportQPS(result);
- ReportLatency(result);
+ GetReporter()->ReportQPS(*result);
+ GetReporter()->ReportLatency(*result);
}
-
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
+ grpc::testing::InitBenchmark(&argc, &argv, true);
+
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunSynchronousStreamingPingPong();
diff --git a/test/cpp/qps/sync_unary_ping_pong_test.cc b/test/cpp/qps/sync_unary_ping_pong_test.cc
index 4c4de6377b..d276d13a43 100644
--- a/test/cpp/qps/sync_unary_ping_pong_test.cc
+++ b/test/cpp/qps/sync_unary_ping_pong_test.cc
@@ -31,12 +31,15 @@
*
*/
+#include <set>
+
#include <grpc/support/log.h>
#include <signal.h>
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
+#include "test/cpp/util/benchmark_config.h"
namespace grpc {
namespace testing {
@@ -63,14 +66,16 @@ static void RunSynchronousUnaryPingPong() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
- ReportQPS(result);
- ReportLatency(result);
+ GetReporter()->ReportQPS(*result);
+ GetReporter()->ReportLatency(*result);
}
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
+ grpc::testing::InitBenchmark(&argc, &argv, true);
+
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunSynchronousUnaryPingPong();
diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc
index 281c617382..dfc102fc17 100644
--- a/test/cpp/qps/worker.cc
+++ b/test/cpp/qps/worker.cc
@@ -37,6 +37,7 @@
#include <thread>
#include <grpc/grpc.h>
+#include <grpc/support/time.h>
#include <gflags/gflags.h>
#include "qps_worker.h"
@@ -47,7 +48,7 @@ DEFINE_int32(server_port, 0, "Spawned server port.");
static bool got_sigint = false;
-static void sigint_handler(int x) {got_sigint = true;}
+static void sigint_handler(int x) { got_sigint = true; }
namespace grpc {
namespace testing {
@@ -56,7 +57,7 @@ static void RunServer() {
QpsWorker worker(FLAGS_driver_port, FLAGS_server_port);
while (!got_sigint) {
- std::this_thread::sleep_for(std::chrono::seconds(5));
+ gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_seconds(5)));
}
}
@@ -69,6 +70,6 @@ int main(int argc, char** argv) {
signal(SIGINT, sigint_handler);
grpc::testing::RunServer();
-
+
return 0;
}