aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2015-11-16 14:47:59 -0800
committerGravatar Sree Kuchibhotla <sreek@google.com>2015-11-16 14:47:59 -0800
commitcffb1c220b004029374c1789782e48c03d7ba739 (patch)
tree53a79e537fbdb1fa9887db2681124f2c4b461456 /test/cpp
parentb047c0fc9b455e6cbfd1d6b080e9bb427282b670 (diff)
parent118c0a0eec5cedfb4d610483c3e26dacf71bcd4a (diff)
Merge branch 'master' into stress_tests_metrics
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/qps/async_streaming_ping_pong_test.cc9
-rw-r--r--test/cpp/qps/async_unary_ping_pong_test.cc9
-rw-r--r--test/cpp/qps/client.h154
-rw-r--r--test/cpp/qps/client_async.cc41
-rw-r--r--test/cpp/qps/client_sync.cc2
-rw-r--r--test/cpp/qps/driver.cc27
-rw-r--r--test/cpp/qps/driver.h8
-rw-r--r--test/cpp/qps/histogram.h4
-rw-r--r--test/cpp/qps/perf_db.proto2
-rwxr-xr-xtest/cpp/qps/qps-sweep.sh18
-rw-r--r--test/cpp/qps/qps_driver.cc119
-rw-r--r--test/cpp/qps/qps_interarrival_test.cc2
-rw-r--r--test/cpp/qps/qps_openloop_test.cc9
-rw-r--r--test/cpp/qps/qps_test.cc9
-rw-r--r--test/cpp/qps/qps_test_with_poll.cc9
-rw-r--r--test/cpp/qps/qps_worker.cc54
-rw-r--r--test/cpp/qps/qps_worker.h6
-rw-r--r--test/cpp/qps/report.cc7
-rw-r--r--test/cpp/qps/report.h1
-rw-r--r--test/cpp/qps/secure_sync_unary_ping_pong_test.cc84
-rw-r--r--test/cpp/qps/server.h52
-rw-r--r--test/cpp/qps/server_async.cc35
-rw-r--r--test/cpp/qps/server_sync.cc25
-rwxr-xr-xtest/cpp/qps/single_run_localhost.sh4
-rw-r--r--test/cpp/qps/sync_streaming_ping_pong_test.cc12
-rw-r--r--test/cpp/qps/sync_unary_ping_pong_test.cc12
-rw-r--r--test/cpp/qps/timer.cc2
-rw-r--r--test/cpp/qps/timer.h2
-rw-r--r--test/cpp/qps/worker.cc5
29 files changed, 423 insertions, 300 deletions
diff --git a/test/cpp/qps/async_streaming_ping_pong_test.cc b/test/cpp/qps/async_streaming_ping_pong_test.cc
index 411df4d32a..9fef93a70f 100644
--- a/test/cpp/qps/async_streaming_ping_pong_test.cc
+++ b/test/cpp/qps/async_streaming_ping_pong_test.cc
@@ -35,8 +35,6 @@
#include <grpc/support/log.h>
-#include <signal.h>
-
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
#include "test/cpp/util/benchmark_config.h"
@@ -52,17 +50,15 @@ static void RunAsyncStreamingPingPong() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
- client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
- client_config.set_payload_size(1);
client_config.set_async_client_threads(1);
client_config.set_rpc_type(STREAMING);
+ client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
- server_config.set_enable_ssl(false);
- server_config.set_threads(1);
+ server_config.set_async_server_threads(1);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@@ -77,7 +73,6 @@ static void RunAsyncStreamingPingPong() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
- signal(SIGPIPE, SIG_IGN);
grpc::testing::RunAsyncStreamingPingPong();
return 0;
}
diff --git a/test/cpp/qps/async_unary_ping_pong_test.cc b/test/cpp/qps/async_unary_ping_pong_test.cc
index eda31b5744..b4ab0e5d59 100644
--- a/test/cpp/qps/async_unary_ping_pong_test.cc
+++ b/test/cpp/qps/async_unary_ping_pong_test.cc
@@ -35,8 +35,6 @@
#include <grpc/support/log.h>
-#include <signal.h>
-
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
#include "test/cpp/util/benchmark_config.h"
@@ -52,17 +50,15 @@ static void RunAsyncUnaryPingPong() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
- client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
- client_config.set_payload_size(1);
client_config.set_async_client_threads(1);
client_config.set_rpc_type(UNARY);
+ client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
- server_config.set_enable_ssl(false);
- server_config.set_threads(1);
+ server_config.set_async_server_threads(1);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@@ -75,7 +71,6 @@ static void RunAsyncUnaryPingPong() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
- signal(SIGPIPE, SIG_IGN);
grpc::testing::RunAsyncUnaryPingPong();
return 0;
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index cd8b34f65b..f4400692fe 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -40,8 +40,9 @@
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/timer.h"
-#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/util/create_test_channel.h"
+#include "test/proto/benchmarks/payloads.grpc.pb.h"
+#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
@@ -75,27 +76,54 @@ class Client {
channels_[i].init(config.server_targets(i % config.server_targets_size()),
config);
}
- request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
- request_.set_response_size(config.payload_size());
+ if (config.payload_config().has_bytebuf_params()) {
+ GPR_ASSERT(false); // not yet implemented
+ } else if (config.payload_config().has_simple_params()) {
+ request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
+ request_.set_response_size(
+ config.payload_config().simple_params().resp_size());
+ request_.mutable_payload()->set_type(
+ grpc::testing::PayloadType::COMPRESSABLE);
+ int size = config.payload_config().simple_params().req_size();
+ std::unique_ptr<char[]> body(new char[size]);
+ request_.mutable_payload()->set_body(body.get(), size);
+ } else if (config.payload_config().has_complex_params()) {
+ GPR_ASSERT(false); // not yet implemented
+ } else {
+ // default should be simple proto without payloads
+ request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
+ request_.set_response_size(0);
+ request_.mutable_payload()->set_type(
+ grpc::testing::PayloadType::COMPRESSABLE);
+ }
}
virtual ~Client() {}
- ClientStats Mark() {
+ ClientStats Mark(bool reset) {
Histogram latencies;
+ Timer::Result timer_result;
+
// avoid std::vector for old compilers that expect a copy constructor
- Histogram* to_merge = new Histogram[threads_.size()];
- for (size_t i = 0; i < threads_.size(); i++) {
- threads_[i]->BeginSwap(&to_merge[i]);
- }
- std::unique_ptr<Timer> timer(new Timer);
- timer_.swap(timer);
- for (size_t i = 0; i < threads_.size(); i++) {
- threads_[i]->EndSwap();
- latencies.Merge(&to_merge[i]);
+ if (reset) {
+ Histogram* to_merge = new Histogram[threads_.size()];
+ for (size_t i = 0; i < threads_.size(); i++) {
+ threads_[i]->BeginSwap(&to_merge[i]);
+ }
+ std::unique_ptr<Timer> timer(new Timer);
+ timer_.swap(timer);
+ for (size_t i = 0; i < threads_.size(); i++) {
+ threads_[i]->EndSwap();
+ latencies.Merge(to_merge[i]);
+ }
+ delete[] to_merge;
+ timer_result = timer->Mark();
+ } else {
+ // merge snapshots of each thread histogram
+ for (size_t i = 0; i < threads_.size(); i++) {
+ threads_[i]->MergeStatsInto(&latencies);
+ }
+ timer_result = timer_->Mark();
}
- delete[] to_merge;
-
- auto timer_result = timer->Mark();
ClientStats stats;
latencies.FillProto(stats.mutable_latencies());
@@ -122,15 +150,18 @@ class Client {
// We have to use a 2-phase init like this with a default
// constructor followed by an initializer function to make
// old compilers happy with using this in std::vector
- channel_ = CreateTestChannel(target, config.enable_ssl());
- stub_ = TestService::NewStub(channel_);
+ channel_ = CreateTestChannel(
+ target, config.security_params().server_host_override(),
+ config.has_security_params(),
+ !config.security_params().use_test_ca());
+ stub_ = BenchmarkService::NewStub(channel_);
}
Channel* get_channel() { return channel_.get(); }
- TestService::Stub* get_stub() { return stub_.get(); }
+ BenchmarkService::Stub* get_stub() { return stub_.get(); }
private:
std::shared_ptr<Channel> channel_;
- std::unique_ptr<TestService::Stub> stub_;
+ std::unique_ptr<BenchmarkService::Stub> stub_;
};
std::vector<ClientChannelInfo> channels_;
@@ -146,37 +177,41 @@ class Client {
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads
- if (config.load_type() == CLOSED_LOOP) {
+ const auto& load = config.load_params();
+
+ std::unique_ptr<RandomDist> random_dist;
+ switch (load.load_case()) {
+ case LoadParams::kClosedLoop:
+ // Closed-loop doesn't use random dist at all
+ break;
+ case LoadParams::kPoisson:
+ random_dist.reset(
+ new ExpDist(load.poisson().offered_load() / num_threads));
+ break;
+ case LoadParams::kUniform:
+ random_dist.reset(
+ new UniformDist(load.uniform().interarrival_lo() * num_threads,
+ load.uniform().interarrival_hi() * num_threads));
+ break;
+ case LoadParams::kDeterm:
+ random_dist.reset(
+ new DetDist(num_threads / load.determ().offered_load()));
+ break;
+ case LoadParams::kPareto:
+ random_dist.reset(
+ new ParetoDist(load.pareto().interarrival_base() * num_threads,
+ load.pareto().alpha()));
+ break;
+ default:
+ GPR_ASSERT(false);
+ }
+
+ // Set closed_loop_ based on whether or not random_dist is set
+ if (!random_dist) {
closed_loop_ = true;
} else {
closed_loop_ = false;
-
- std::unique_ptr<RandomDist> random_dist;
- const auto& load = config.load_params();
- switch (config.load_type()) {
- case POISSON:
- random_dist.reset(
- new ExpDist(load.poisson().offered_load() / num_threads));
- break;
- case UNIFORM:
- random_dist.reset(
- new UniformDist(load.uniform().interarrival_lo() * num_threads,
- load.uniform().interarrival_hi() * num_threads));
- break;
- case DETERMINISTIC:
- random_dist.reset(
- new DetDist(num_threads / load.determ().offered_load()));
- break;
- case PARETO:
- random_dist.reset(
- new ParetoDist(load.pareto().interarrival_base() * num_threads,
- load.pareto().alpha()));
- break;
- default:
- GPR_ASSERT(false);
- break;
- }
-
+ // set up interarrival timer according to random dist
interarrival_timer_.init(*random_dist, num_threads);
for (size_t i = 0; i < num_threads; i++) {
next_time_.push_back(
@@ -204,7 +239,7 @@ class Client {
public:
Thread(Client* client, size_t idx)
: done_(false),
- new_(nullptr),
+ new_stats_(nullptr),
client_(client),
idx_(idx),
impl_(&Thread::ThreadFunc, this) {}
@@ -219,16 +254,21 @@ class Client {
void BeginSwap(Histogram* n) {
std::lock_guard<std::mutex> g(mu_);
- new_ = n;
+ new_stats_ = n;
}
void EndSwap() {
std::unique_lock<std::mutex> g(mu_);
- while (new_ != nullptr) {
+ while (new_stats_ != nullptr) {
cv_.wait(g);
};
}
+ void MergeStatsInto(Histogram* hist) {
+ std::unique_lock<std::mutex> g(mu_);
+ hist->Merge(histogram_);
+ }
+
private:
Thread(const Thread&);
Thread& operator=(const Thread&);
@@ -246,21 +286,21 @@ class Client {
if (done_) {
return;
}
- // check if we're marking, swap out the histogram if so
- if (new_) {
- new_->Swap(&histogram_);
- new_ = nullptr;
+ // check if we're resetting stats, swap out the histogram if so
+ if (new_stats_) {
+ new_stats_->Swap(&histogram_);
+ new_stats_ = nullptr;
cv_.notify_one();
}
}
}
- TestService::Stub* stub_;
+ BenchmarkService::Stub* stub_;
ClientConfig config_;
std::mutex mu_;
std::condition_variable cv_;
bool done_;
- Histogram* new_;
+ Histogram* new_stats_;
Histogram histogram_;
Client* client_;
size_t idx_;
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 9ed42b7db6..9594179822 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -48,10 +48,10 @@
#include <gflags/gflags.h>
#include <grpc++/client_context.h>
-#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/timer.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/util/create_test_channel.h"
+#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
namespace testing {
@@ -88,10 +88,10 @@ template <class RequestType, class ResponseType>
class ClientRpcContextUnaryImpl : public ClientRpcContext {
public:
ClientRpcContextUnaryImpl(
- int channel_id, TestService::Stub* stub, const RequestType& req,
+ int channel_id, BenchmarkService::Stub* stub, const RequestType& req,
std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
- TestService::Stub*, grpc::ClientContext*, const RequestType&,
+ BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)> start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: ClientRpcContext(channel_id),
@@ -131,13 +131,13 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
return true; // we're done, this'll be ignored
}
grpc::ClientContext context_;
- TestService::Stub* stub_;
+ BenchmarkService::Stub* stub_;
RequestType req_;
ResponseType response_;
bool (ClientRpcContextUnaryImpl::*next_state_)(bool);
std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
- TestService::Stub*, grpc::ClientContext*, const RequestType&,
+ BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)> start_req_;
grpc::Status status_;
double start_;
@@ -151,7 +151,7 @@ class AsyncClient : public Client {
public:
explicit AsyncClient(
const ClientConfig& config,
- std::function<ClientRpcContext*(int, TestService::Stub*,
+ std::function<ClientRpcContext*(int, BenchmarkService::Stub*,
const SimpleRequest&)> setup_ctx)
: Client(config),
channel_lock_(new std::mutex[config.client_channels()]),
@@ -354,11 +354,12 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
- StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
+ StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
const SimpleRequest& request, CompletionQueue* cq) {
return stub->AsyncUnaryCall(ctx, request, cq);
};
- static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
+ static ClientRpcContext* SetupCtx(int channel_id,
+ BenchmarkService::Stub* stub,
const SimpleRequest& req) {
return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
channel_id, stub, req, AsyncUnaryClient::StartReq,
@@ -370,10 +371,11 @@ template <class RequestType, class ResponseType>
class ClientRpcContextStreamingImpl : public ClientRpcContext {
public:
ClientRpcContextStreamingImpl(
- int channel_id, TestService::Stub* stub, const RequestType& req,
- std::function<std::unique_ptr<grpc::ClientAsyncReaderWriter<
- RequestType, ResponseType>>(TestService::Stub*, grpc::ClientContext*,
- CompletionQueue*, void*)> start_req,
+ int channel_id, BenchmarkService::Stub* stub, const RequestType& req,
+ std::function<std::unique_ptr<
+ grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
+ BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
+ void*)> start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: ClientRpcContext(channel_id),
context_(),
@@ -420,15 +422,15 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
return StartWrite(ok);
}
grpc::ClientContext context_;
- TestService::Stub* stub_;
+ BenchmarkService::Stub* stub_;
RequestType req_;
ResponseType response_;
bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*);
std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<
std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
- TestService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)>
- start_req_;
+ BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
+ void*)> start_req_;
grpc::Status status_;
double start_;
std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
@@ -439,8 +441,8 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
public:
explicit AsyncStreamingClient(const ClientConfig& config)
: AsyncClient(config, SetupCtx) {
- // async streaming currently only supported closed loop
- GPR_ASSERT(config.load_type() == CLOSED_LOOP);
+ // async streaming currently only supports closed loop
+ GPR_ASSERT(closed_loop_);
StartThreads(config.async_client_threads());
}
@@ -451,12 +453,13 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
static std::unique_ptr<
grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
- StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
+ StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
CompletionQueue* cq, void* tag) {
auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
return stream;
};
- static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
+ static ClientRpcContext* SetupCtx(int channel_id,
+ BenchmarkService::Stub* stub,
const SimpleRequest& req) {
return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
channel_id, stub, req, AsyncStreamingClient::StartReq,
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index ed4134c743..10d680860a 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -54,10 +54,10 @@
#include "test/cpp/util/create_test_channel.h"
#include "test/cpp/qps/client.h"
-#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/timer.h"
+#include "test/proto/benchmarks/services.grpc.pb.h"
#include "src/core/profiling/timers.h"
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index dd5c4f4f73..2c6247deea 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -48,6 +48,7 @@
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/qps_worker.h"
+#include "test/proto/benchmarks/services.grpc.pb.h"
using std::list;
using std::thread;
@@ -91,12 +92,12 @@ static ClientContext* AllocContext(list<ClientContext>* contexts, T deadline) {
}
struct ServerData {
- unique_ptr<Worker::Stub> stub;
+ unique_ptr<WorkerService::Stub> stub;
unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
};
struct ClientData {
- unique_ptr<Worker::Stub> stub;
+ unique_ptr<WorkerService::Stub> stub;
unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
};
} // namespace runsc
@@ -131,8 +132,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
}
int driver_port = grpc_pick_unused_port_or_die();
- int benchmark_port = grpc_pick_unused_port_or_die();
- local_workers.emplace_back(new QpsWorker(driver_port, benchmark_port));
+ local_workers.emplace_back(new QpsWorker(driver_port));
char addr[256];
sprintf(addr, "localhost:%d", driver_port);
if (spawn_local_worker_count < 0) {
@@ -161,11 +161,10 @@ std::unique_ptr<ScenarioResult> RunScenario(
// where class contained in std::vector must have a copy constructor
auto* servers = new ServerData[num_servers];
for (size_t i = 0; i < num_servers; i++) {
- servers[i].stub =
- Worker::NewStub(CreateChannel(workers[i], InsecureCredentials()));
+ servers[i].stub = WorkerService::NewStub(
+ CreateChannel(workers[i], InsecureCredentials()));
ServerArgs args;
result_server_config = server_config;
- result_server_config.set_host(workers[i]);
*args.mutable_setup() = server_config;
servers[i].stream =
servers[i].stub->RunServer(runsc::AllocContext(&contexts, deadline));
@@ -189,14 +188,13 @@ std::unique_ptr<ScenarioResult> RunScenario(
// where class contained in std::vector must have a copy constructor
auto* clients = new ClientData[num_clients];
for (size_t i = 0; i < num_clients; i++) {
- clients[i].stub = Worker::NewStub(
+ clients[i].stub = WorkerService::NewStub(
CreateChannel(workers[i + num_servers], InsecureCredentials()));
ClientArgs args;
result_client_config = client_config;
- result_client_config.set_host(workers[i + num_servers]);
*args.mutable_setup() = client_config;
clients[i].stream =
- clients[i].stub->RunTest(runsc::AllocContext(&contexts, deadline));
+ clients[i].stub->RunClient(runsc::AllocContext(&contexts, deadline));
GPR_ASSERT(clients[i].stream->Write(args));
ClientStatus init_status;
GPR_ASSERT(clients[i].stream->Read(&init_status));
@@ -211,9 +209,9 @@ std::unique_ptr<ScenarioResult> RunScenario(
// Start a run
gpr_log(GPR_INFO, "Starting");
ServerArgs server_mark;
- server_mark.mutable_mark();
+ server_mark.mutable_mark()->set_reset(true);
ClientArgs client_mark;
- client_mark.mutable_mark();
+ client_mark.mutable_mark()->set_reset(true);
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Write(server_mark));
}
@@ -251,14 +249,15 @@ std::unique_ptr<ScenarioResult> RunScenario(
GPR_ASSERT(server->stream->Read(&server_status));
const auto& stats = server_status.stats();
result->server_resources.emplace_back(
- stats.time_elapsed(), stats.time_user(), stats.time_system());
+ stats.time_elapsed(), stats.time_user(), stats.time_system(),
+ server_status.cores());
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Read(&client_status));
const auto& stats = client_status.stats();
result->latencies.MergeProto(stats.latencies());
result->client_resources.emplace_back(
- stats.time_elapsed(), stats.time_user(), stats.time_system());
+ stats.time_elapsed(), stats.time_user(), stats.time_system(), -1);
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index 6116aa656a..50bf17ceab 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -37,22 +37,24 @@
#include <memory>
#include "test/cpp/qps/histogram.h"
-#include "test/proto/qpstest.grpc.pb.h"
+#include "test/proto/benchmarks/control.grpc.pb.h"
namespace grpc {
namespace testing {
class ResourceUsage {
public:
- ResourceUsage(double w, double u, double s)
- : wall_time_(w), user_time_(u), system_time_(s) {}
+ ResourceUsage(double w, double u, double s, int c)
+ : wall_time_(w), user_time_(u), system_time_(s), cores_(c) {}
double wall_time() const { return wall_time_; }
double user_time() const { return user_time_; }
double system_time() const { return system_time_; }
+ int cores() const { return cores_; }
private:
double wall_time_;
double user_time_;
double system_time_;
+ int cores_;
};
struct ScenarioResult {
diff --git a/test/cpp/qps/histogram.h b/test/cpp/qps/histogram.h
index 1151cca87c..35527d2a2c 100644
--- a/test/cpp/qps/histogram.h
+++ b/test/cpp/qps/histogram.h
@@ -35,7 +35,7 @@
#define TEST_QPS_HISTOGRAM_H
#include <grpc/support/histogram.h>
-#include "test/proto/qpstest.grpc.pb.h"
+#include "test/proto/benchmarks/stats.grpc.pb.h"
namespace grpc {
namespace testing {
@@ -48,7 +48,7 @@ class Histogram {
}
Histogram(Histogram&& other) : impl_(other.impl_) { other.impl_ = nullptr; }
- void Merge(Histogram* h) { gpr_histogram_merge(impl_, h->impl_); }
+ void Merge(const Histogram& h) { gpr_histogram_merge(impl_, h.impl_); }
void Add(double value) { gpr_histogram_add(impl_, value); }
double Percentile(double pctile) const {
return gpr_histogram_percentile(impl_, pctile);
diff --git a/test/cpp/qps/perf_db.proto b/test/cpp/qps/perf_db.proto
index 7ae5cfe86e..8a691ddded 100644
--- a/test/cpp/qps/perf_db.proto
+++ b/test/cpp/qps/perf_db.proto
@@ -29,7 +29,7 @@
syntax = "proto3";
-import "test/proto/qpstest.proto";
+import "test/proto/benchmarks/control.proto";
package grpc.testing;
diff --git a/test/cpp/qps/qps-sweep.sh b/test/cpp/qps/qps-sweep.sh
index cb93201933..36ea974812 100755
--- a/test/cpp/qps/qps-sweep.sh
+++ b/test/cpp/qps/qps-sweep.sh
@@ -37,17 +37,21 @@ fi
bins=`find . .. ../.. ../../.. -name bins | head -1`
-for channels in 1 2 4 8
+for secure in true false
do
- for client in SYNCHRONOUS_CLIENT ASYNC_CLIENT
+ for channels in 1 2 4 8
do
- for server in SYNCHRONOUS_SERVER ASYNC_SERVER
+ for client in SYNC_CLIENT ASYNC_CLIENT
do
- for rpc in UNARY STREAMING
+ for server in SYNC_SERVER ASYNC_SERVER
do
- echo "Test $rpc $client $server , $channels channels"
- "$bins"/opt/qps_driver --rpc_type=$rpc \
- --client_type=$client --server_type=$server
+ for rpc in UNARY STREAMING
+ do
+ echo "Test $rpc $client $server, $channels channels, secure=$secure"
+ "$bins"/opt/qps_driver --rpc_type=$rpc \
+ --client_type=$client --server_type=$server \
+ --secure_test=$secure
+ done
done
done
done
diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc
index b1463be8f6..4c93a042cf 100644
--- a/test/cpp/qps/qps_driver.cc
+++ b/test/cpp/qps/qps_driver.cc
@@ -33,7 +33,6 @@
#include <memory>
#include <set>
-#include <signal.h>
#include <gflags/gflags.h>
#include <grpc/support/log.h>
@@ -50,31 +49,39 @@ DEFINE_int32(benchmark_seconds, 30, "Benchmark time (in seconds)");
DEFINE_int32(local_workers, 0, "Number of local workers to start");
// Common config
-DEFINE_bool(enable_ssl, false, "Use SSL");
DEFINE_string(rpc_type, "UNARY", "Type of RPC: UNARY or STREAMING");
// Server config
-DEFINE_int32(server_threads, 1, "Number of server threads");
-DEFINE_string(server_type, "SYNCHRONOUS_SERVER", "Server type");
+DEFINE_int32(async_server_threads, 1, "Number of threads for async servers");
+DEFINE_string(server_type, "SYNC_SERVER", "Server type");
// Client config
DEFINE_int32(outstanding_rpcs_per_channel, 1,
"Number of outstanding rpcs per channel");
DEFINE_int32(client_channels, 1, "Number of client channels");
-DEFINE_int32(payload_size, 1, "Payload size");
-DEFINE_string(client_type, "SYNCHRONOUS_CLIENT", "Client type");
+
+DEFINE_int32(simple_req_size, -1, "Simple proto request payload size");
+DEFINE_int32(simple_resp_size, -1, "Simple proto response payload size");
+
+DEFINE_string(client_type, "SYNC_CLIENT", "Client type");
DEFINE_int32(async_client_threads, 1, "Async client threads");
-DEFINE_string(load_type, "CLOSED_LOOP", "Load type");
-DEFINE_double(load_param_1, 0.0, "Load parameter 1");
-DEFINE_double(load_param_2, 0.0, "Load parameter 2");
+
+DEFINE_double(poisson_load, -1.0, "Poisson offered load (qps)");
+DEFINE_double(uniform_lo, -1.0, "Uniform low interarrival time (us)");
+DEFINE_double(uniform_hi, -1.0, "Uniform high interarrival time (us)");
+DEFINE_double(determ_load, -1.0, "Deterministic offered load (qps)");
+DEFINE_double(pareto_base, -1.0, "Pareto base interarrival time (us)");
+DEFINE_double(pareto_alpha, -1.0, "Pareto alpha value");
+
+DEFINE_bool(secure_test, false, "Run a secure test");
using grpc::testing::ClientConfig;
using grpc::testing::ServerConfig;
using grpc::testing::ClientType;
using grpc::testing::ServerType;
-using grpc::testing::LoadType;
using grpc::testing::RpcType;
using grpc::testing::ResourceUsage;
+using grpc::testing::SecurityParams;
namespace grpc {
namespace testing {
@@ -85,72 +92,63 @@ static void QpsDriver() {
ClientType client_type;
ServerType server_type;
- LoadType load_type;
GPR_ASSERT(ClientType_Parse(FLAGS_client_type, &client_type));
GPR_ASSERT(ServerType_Parse(FLAGS_server_type, &server_type));
- GPR_ASSERT(LoadType_Parse(FLAGS_load_type, &load_type));
ClientConfig client_config;
client_config.set_client_type(client_type);
- client_config.set_load_type(load_type);
- client_config.set_enable_ssl(FLAGS_enable_ssl);
client_config.set_outstanding_rpcs_per_channel(
FLAGS_outstanding_rpcs_per_channel);
client_config.set_client_channels(FLAGS_client_channels);
- client_config.set_payload_size(FLAGS_payload_size);
+
+ // Decide which type to use based on the response type
+ if (FLAGS_simple_resp_size >= 0) {
+ auto params =
+ client_config.mutable_payload_config()->mutable_simple_params();
+ params->set_resp_size(FLAGS_simple_resp_size);
+ if (FLAGS_simple_req_size >= 0) {
+ params->set_req_size(FLAGS_simple_req_size);
+ }
+ } else {
+ // set a reasonable default: proto but no payload
+ client_config.mutable_payload_config()->mutable_simple_params();
+ }
+
client_config.set_async_client_threads(FLAGS_async_client_threads);
client_config.set_rpc_type(rpc_type);
// set up the load parameters
- switch (load_type) {
- case grpc::testing::CLOSED_LOOP:
- break;
- case grpc::testing::POISSON: {
- auto poisson = client_config.mutable_load_params()->mutable_poisson();
- GPR_ASSERT(FLAGS_load_param_1 != 0.0);
- poisson->set_offered_load(FLAGS_load_param_1);
- break;
- }
- case grpc::testing::UNIFORM: {
- auto uniform = client_config.mutable_load_params()->mutable_uniform();
- GPR_ASSERT(FLAGS_load_param_1 != 0.0);
- GPR_ASSERT(FLAGS_load_param_2 != 0.0);
- uniform->set_interarrival_lo(FLAGS_load_param_1 / 1e6);
- uniform->set_interarrival_hi(FLAGS_load_param_2 / 1e6);
- break;
- }
- case grpc::testing::DETERMINISTIC: {
- auto determ = client_config.mutable_load_params()->mutable_determ();
- GPR_ASSERT(FLAGS_load_param_1 != 0.0);
- determ->set_offered_load(FLAGS_load_param_1);
- break;
- }
- case grpc::testing::PARETO: {
- auto pareto = client_config.mutable_load_params()->mutable_pareto();
- GPR_ASSERT(FLAGS_load_param_1 != 0.0);
- GPR_ASSERT(FLAGS_load_param_2 != 0.0);
- pareto->set_interarrival_base(FLAGS_load_param_1 / 1e6);
- pareto->set_alpha(FLAGS_load_param_2);
- break;
- }
- default:
- GPR_ASSERT(false);
- break;
+ if (FLAGS_poisson_load > 0.0) {
+ auto poisson = client_config.mutable_load_params()->mutable_poisson();
+ poisson->set_offered_load(FLAGS_poisson_load);
+ } else if (FLAGS_uniform_lo > 0.0) {
+ auto uniform = client_config.mutable_load_params()->mutable_uniform();
+ uniform->set_interarrival_lo(FLAGS_uniform_lo / 1e6);
+ uniform->set_interarrival_hi(FLAGS_uniform_hi / 1e6);
+ } else if (FLAGS_determ_load > 0.0) {
+ auto determ = client_config.mutable_load_params()->mutable_determ();
+ determ->set_offered_load(FLAGS_determ_load);
+ } else if (FLAGS_pareto_base > 0.0) {
+ auto pareto = client_config.mutable_load_params()->mutable_pareto();
+ pareto->set_interarrival_base(FLAGS_pareto_base / 1e6);
+ pareto->set_alpha(FLAGS_pareto_alpha);
+ } else {
+ client_config.mutable_load_params()->mutable_closed_loop();
+ // No further load parameters to set up for closed loop
}
ServerConfig server_config;
server_config.set_server_type(server_type);
- server_config.set_threads(FLAGS_server_threads);
- server_config.set_enable_ssl(FLAGS_enable_ssl);
-
- // If we're running a sync-server streaming test, make sure
- // that we have at least as many threads as the active streams
- // or else threads will be blocked from forward progress and the
- // client will deadlock on a timer.
- GPR_ASSERT(!(server_type == grpc::testing::SYNCHRONOUS_SERVER &&
- rpc_type == grpc::testing::STREAMING &&
- FLAGS_server_threads <
- FLAGS_client_channels * FLAGS_outstanding_rpcs_per_channel));
+ server_config.set_async_server_threads(FLAGS_async_server_threads);
+
+ if (FLAGS_secure_test) {
+ // Set up security params
+ SecurityParams security;
+ security.set_use_test_ca(true);
+ security.set_server_host_override("foo.test.google.fr");
+ client_config.mutable_security_params()->CopyFrom(security);
+ server_config.mutable_security_params()->CopyFrom(security);
+ }
const auto result = RunScenario(
client_config, FLAGS_num_clients, server_config, FLAGS_num_servers,
@@ -168,7 +166,6 @@ static void QpsDriver() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
- signal(SIGPIPE, SIG_IGN);
grpc::testing::QpsDriver();
return 0;
diff --git a/test/cpp/qps/qps_interarrival_test.cc b/test/cpp/qps/qps_interarrival_test.cc
index a7979e6187..ccda28f09a 100644
--- a/test/cpp/qps/qps_interarrival_test.cc
+++ b/test/cpp/qps/qps_interarrival_test.cc
@@ -42,7 +42,7 @@
using grpc::testing::RandomDist;
using grpc::testing::InterarrivalTimer;
-void RunTest(RandomDist &&r, int threads, std::string title) {
+static void RunTest(RandomDist &&r, int threads, std::string title) {
InterarrivalTimer timer;
timer.init(r, threads);
gpr_histogram *h(gpr_histogram_create(0.01, 60e9));
diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc
index 5a6a9249a9..dc88c893bb 100644
--- a/test/cpp/qps/qps_openloop_test.cc
+++ b/test/cpp/qps/qps_openloop_test.cc
@@ -31,8 +31,6 @@
*
*/
-#include <signal.h>
-
#include <set>
#include <grpc/support/log.h>
@@ -52,20 +50,16 @@ static void RunQPS() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
- client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1000);
client_config.set_client_channels(8);
- client_config.set_payload_size(1);
client_config.set_async_client_threads(8);
client_config.set_rpc_type(UNARY);
- client_config.set_load_type(POISSON);
client_config.mutable_load_params()->mutable_poisson()->set_offered_load(
1000.0);
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
- server_config.set_enable_ssl(false);
- server_config.set_threads(4);
+ server_config.set_async_server_threads(4);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@@ -80,7 +74,6 @@ static void RunQPS() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
- signal(SIGPIPE, SIG_IGN);
grpc::testing::RunQPS();
return 0;
diff --git a/test/cpp/qps/qps_test.cc b/test/cpp/qps/qps_test.cc
index d0c4a79cd9..89b35cfb05 100644
--- a/test/cpp/qps/qps_test.cc
+++ b/test/cpp/qps/qps_test.cc
@@ -31,8 +31,6 @@
*
*/
-#include <signal.h>
-
#include <set>
#include <grpc/support/log.h>
@@ -52,17 +50,15 @@ static void RunQPS() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
- client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1000);
client_config.set_client_channels(8);
- client_config.set_payload_size(1);
client_config.set_async_client_threads(8);
client_config.set_rpc_type(UNARY);
+ client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
- server_config.set_enable_ssl(false);
- server_config.set_threads(8);
+ server_config.set_async_server_threads(8);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@@ -77,7 +73,6 @@ static void RunQPS() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
- signal(SIGPIPE, SIG_IGN);
grpc::testing::RunQPS();
return 0;
diff --git a/test/cpp/qps/qps_test_with_poll.cc b/test/cpp/qps/qps_test_with_poll.cc
index 31d2c1bf7b..97da4096ed 100644
--- a/test/cpp/qps/qps_test_with_poll.cc
+++ b/test/cpp/qps/qps_test_with_poll.cc
@@ -31,8 +31,6 @@
*
*/
-#include <signal.h>
-
#include <set>
#include <grpc/support/log.h>
@@ -56,17 +54,15 @@ static void RunQPS() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
- client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1000);
client_config.set_client_channels(8);
- client_config.set_payload_size(1);
client_config.set_async_client_threads(8);
client_config.set_rpc_type(UNARY);
+ client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
- server_config.set_enable_ssl(false);
- server_config.set_threads(4);
+ server_config.set_async_server_threads(4);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@@ -83,7 +79,6 @@ int main(int argc, char** argv) {
grpc_platform_become_multipoller = grpc_poll_become_multipoller;
- signal(SIGPIPE, SIG_IGN);
grpc::testing::RunQPS();
return 0;
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index 4ce77f366d..dc59eab7ef 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -52,17 +52,17 @@
#include <grpc++/security/server_credentials.h>
#include "test/core/util/grpc_profiler.h"
-#include "test/proto/qpstest.pb.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/server.h"
#include "test/cpp/util/create_test_channel.h"
+#include "test/proto/benchmarks/services.pb.h"
namespace grpc {
namespace testing {
-std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
+static std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
switch (config.client_type()) {
- case ClientType::SYNCHRONOUS_CLIENT:
+ case ClientType::SYNC_CLIENT:
return (config.rpc_type() == RpcType::UNARY)
? CreateSynchronousUnaryClient(config)
: CreateSynchronousStreamingClient(config);
@@ -76,26 +76,29 @@ std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
abort();
}
-std::unique_ptr<Server> CreateServer(const ServerConfig& config,
- int server_port) {
+static void LimitCores(int cores) {}
+
+static std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
+ if (config.core_limit() > 0) {
+ LimitCores(config.core_limit());
+ }
switch (config.server_type()) {
- case ServerType::SYNCHRONOUS_SERVER:
- return CreateSynchronousServer(config, server_port);
+ case ServerType::SYNC_SERVER:
+ return CreateSynchronousServer(config);
case ServerType::ASYNC_SERVER:
- return CreateAsyncServer(config, server_port);
+ return CreateAsyncServer(config);
default:
abort();
}
abort();
}
-class WorkerImpl GRPC_FINAL : public Worker::Service {
+class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
public:
- explicit WorkerImpl(int server_port)
- : server_port_(server_port), acquired_(false) {}
+ explicit WorkerServiceImpl() : acquired_(false) {}
- Status RunTest(ServerContext* ctx,
- ServerReaderWriter<ClientStatus, ClientArgs>* stream)
+ Status RunClient(ServerContext* ctx,
+ ServerReaderWriter<ClientStatus, ClientArgs>* stream)
GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
@@ -103,7 +106,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
}
grpc_profiler_start("qps_client.prof");
- Status ret = RunTestBody(ctx, stream);
+ Status ret = RunClientBody(ctx, stream);
grpc_profiler_stop();
return ret;
}
@@ -126,7 +129,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
// Protect against multiple clients using this worker at once.
class InstanceGuard {
public:
- InstanceGuard(WorkerImpl* impl)
+ InstanceGuard(WorkerServiceImpl* impl)
: impl_(impl), acquired_(impl->TryAcquireInstance()) {}
~InstanceGuard() {
if (acquired_) {
@@ -137,7 +140,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
bool Acquired() const { return acquired_; }
private:
- WorkerImpl* const impl_;
+ WorkerServiceImpl* const impl_;
const bool acquired_;
};
@@ -154,8 +157,8 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
acquired_ = false;
}
- Status RunTestBody(ServerContext* ctx,
- ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
+ Status RunClientBody(ServerContext* ctx,
+ ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
ClientArgs args;
if (!stream->Read(&args)) {
return Status(StatusCode::INVALID_ARGUMENT, "");
@@ -175,7 +178,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
if (!args.has_mark()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
- *status.mutable_stats() = client->Mark();
+ *status.mutable_stats() = client->Mark(args.mark().reset());
stream->Write(status);
}
@@ -191,12 +194,13 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
if (!args.has_setup()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
- auto server = CreateServer(args.setup(), server_port_);
+ auto server = CreateServer(args.setup());
if (!server) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
ServerStatus status;
- status.set_port(server_port_);
+ status.set_port(server->port());
+ status.set_cores(server->cores());
if (!stream->Write(status)) {
return Status(StatusCode::UNKNOWN, "");
}
@@ -204,21 +208,19 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
if (!args.has_mark()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
- *status.mutable_stats() = server->Mark();
+ *status.mutable_stats() = server->Mark(args.mark().reset());
stream->Write(status);
}
return Status::OK;
}
- const int server_port_;
-
std::mutex mu_;
bool acquired_;
};
-QpsWorker::QpsWorker(int driver_port, int server_port) {
- impl_.reset(new WorkerImpl(server_port));
+QpsWorker::QpsWorker(int driver_port) {
+ impl_.reset(new WorkerServiceImpl());
char* server_address = NULL;
gpr_join_host_port(&server_address, "::", driver_port);
diff --git a/test/cpp/qps/qps_worker.h b/test/cpp/qps/qps_worker.h
index 861588907e..0db88ad3d1 100644
--- a/test/cpp/qps/qps_worker.h
+++ b/test/cpp/qps/qps_worker.h
@@ -42,15 +42,15 @@ class Server;
namespace testing {
-class WorkerImpl;
+class WorkerServiceImpl;
class QpsWorker {
public:
- QpsWorker(int driver_port, int server_port);
+ explicit QpsWorker(int driver_port);
~QpsWorker();
private:
- std::unique_ptr<WorkerImpl> impl_;
+ std::unique_ptr<WorkerServiceImpl> impl_;
std::unique_ptr<Server> server_;
};
diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc
index e03e8e1fb0..b230eb441e 100644
--- a/test/cpp/qps/report.cc
+++ b/test/cpp/qps/report.cc
@@ -43,6 +43,7 @@ namespace testing {
static double WallTime(ResourceUsage u) { return u.wall_time(); }
static double UserTime(ResourceUsage u) { return u.user_time(); }
static double SystemTime(ResourceUsage u) { return u.system_time(); }
+static int Cores(ResourceUsage u) { return u.cores(); }
void CompositeReporter::add(std::unique_ptr<Reporter> reporter) {
reporters_.emplace_back(std::move(reporter));
@@ -83,7 +84,7 @@ void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) {
result.latencies.Count() / average(result.client_resources, WallTime);
gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps,
- qps / result.server_config.threads());
+ qps / sum(result.server_resources, Cores));
}
void GprLogReporter::ReportLatency(const ScenarioResult& result) {
@@ -123,10 +124,10 @@ void PerfDbReporter::ReportQPSPerCore(const ScenarioResult& result) {
auto qps =
result.latencies.Count() / average(result.client_resources, WallTime);
- auto qpsPerCore = qps / result.server_config.threads();
+ auto qps_per_core = qps / sum(result.server_resources, Cores);
perf_db_client_.setQps(qps);
- perf_db_client_.setQpsPerCore(qpsPerCore);
+ perf_db_client_.setQpsPerCore(qps_per_core);
perf_db_client_.setConfigs(result.client_config, result.server_config);
}
diff --git a/test/cpp/qps/report.h b/test/cpp/qps/report.h
index 00d12369d5..78779231d3 100644
--- a/test/cpp/qps/report.h
+++ b/test/cpp/qps/report.h
@@ -41,7 +41,6 @@
#include <grpc++/support/config.h>
#include "test/cpp/qps/driver.h"
-#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/perf_db_client.h"
namespace grpc {
diff --git a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
new file mode 100644
index 0000000000..df06f7e471
--- /dev/null
+++ b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
@@ -0,0 +1,84 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <set>
+
+#include <grpc/support/log.h>
+
+#include "test/cpp/qps/driver.h"
+#include "test/cpp/qps/report.h"
+#include "test/cpp/util/benchmark_config.h"
+
+namespace grpc {
+namespace testing {
+
+static const int WARMUP = 5;
+static const int BENCHMARK = 10;
+
+static void RunSynchronousUnaryPingPong() {
+ gpr_log(GPR_INFO, "Running Synchronous Unary Ping Pong");
+
+ ClientConfig client_config;
+ client_config.set_client_type(SYNC_CLIENT);
+ client_config.set_outstanding_rpcs_per_channel(1);
+ client_config.set_client_channels(1);
+ client_config.set_rpc_type(UNARY);
+ client_config.mutable_load_params()->mutable_closed_loop();
+
+ ServerConfig server_config;
+ server_config.set_server_type(SYNC_SERVER);
+
+ // Set up security params
+ SecurityParams security;
+ security.set_use_test_ca(true);
+ security.set_server_host_override("foo.test.google.fr");
+ client_config.mutable_security_params()->CopyFrom(security);
+ server_config.mutable_security_params()->CopyFrom(security);
+
+ const auto result =
+ RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
+
+ GetReporter()->ReportQPS(*result);
+ GetReporter()->ReportLatency(*result);
+}
+
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ grpc::testing::InitBenchmark(&argc, &argv, true);
+
+ grpc::testing::RunSynchronousUnaryPingPong();
+
+ return 0;
+}
diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h
index e48e873dc3..6e81edc8ff 100644
--- a/test/cpp/qps/server.h
+++ b/test/cpp/qps/server.h
@@ -34,22 +34,38 @@
#ifndef TEST_QPS_SERVER_H
#define TEST_QPS_SERVER_H
+#include <grpc/support/cpu.h>
+#include <grpc++/security/server_credentials.h>
+
+#include "test/core/end2end/data/ssl_test_data.h"
+#include "test/core/util/port.h"
#include "test/cpp/qps/timer.h"
-#include "test/proto/qpstest.grpc.pb.h"
+#include "test/proto/messages.grpc.pb.h"
+#include "test/proto/benchmarks/control.grpc.pb.h"
namespace grpc {
namespace testing {
class Server {
public:
- Server() : timer_(new Timer) {}
+ explicit Server(const ServerConfig& config) : timer_(new Timer) {
+ if (config.port()) {
+ port_ = config.port();
+ } else {
+ port_ = grpc_pick_unused_port_or_die();
+ }
+ }
virtual ~Server() {}
- ServerStats Mark() {
- std::unique_ptr<Timer> timer(new Timer);
- timer.swap(timer_);
-
- auto timer_result = timer->Mark();
+ ServerStats Mark(bool reset) {
+ Timer::Result timer_result;
+ if (reset) {
+ std::unique_ptr<Timer> timer(new Timer);
+ timer.swap(timer_);
+ timer_result = timer->Mark();
+ } else {
+ timer_result = timer_->Mark();
+ }
ServerStats stats;
stats.set_time_elapsed(timer_result.wall);
@@ -70,13 +86,29 @@ class Server {
return true;
}
+ int port() const { return port_; }
+ int cores() const { return gpr_cpu_num_cores(); }
+ static std::shared_ptr<ServerCredentials> CreateServerCredentials(
+ const ServerConfig& config) {
+ if (config.has_security_params()) {
+ SslServerCredentialsOptions::PemKeyCertPair pkcp = {test_server1_key,
+ test_server1_cert};
+ SslServerCredentialsOptions ssl_opts;
+ ssl_opts.pem_root_certs = "";
+ ssl_opts.pem_key_cert_pairs.push_back(pkcp);
+ return SslServerCredentials(ssl_opts);
+ } else {
+ return InsecureServerCredentials();
+ }
+ }
+
private:
+ int port_;
std::unique_ptr<Timer> timer_;
};
-std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config,
- int port);
-std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config, int port);
+std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config);
+std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config);
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 98fa9c53e2..2d922fa615 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -49,38 +49,40 @@
#include <grpc++/security/server_credentials.h>
#include <gtest/gtest.h>
-#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/server.h"
+#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
namespace testing {
class AsyncQpsServerTest : public Server {
public:
- AsyncQpsServerTest(const ServerConfig &config, int port) {
+ explicit AsyncQpsServerTest(const ServerConfig &config) : Server(config) {
char *server_address = NULL;
- gpr_join_host_port(&server_address, "::", port);
+
+ gpr_join_host_port(&server_address, "::", port());
ServerBuilder builder;
- builder.AddListeningPort(server_address, InsecureServerCredentials());
+ builder.AddListeningPort(server_address,
+ Server::CreateServerCredentials(config));
gpr_free(server_address);
builder.RegisterAsyncService(&async_service_);
- for (int i = 0; i < config.threads(); i++) {
+ for (int i = 0; i < config.async_server_threads(); i++) {
srv_cqs_.emplace_back(builder.AddCompletionQueue());
}
server_ = builder.BuildAndStart();
using namespace std::placeholders;
- for (int i = 0; i < 10000 / config.threads(); i++) {
- for (int j = 0; j < config.threads(); j++) {
+ for (int i = 0; i < 10000 / config.async_server_threads(); i++) {
+ for (int j = 0; j < config.async_server_threads(); j++) {
auto request_unary = std::bind(
- &TestService::AsyncService::RequestUnaryCall, &async_service_, _1,
- _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4);
+ &BenchmarkService::AsyncService::RequestUnaryCall, &async_service_,
+ _1, _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4);
auto request_streaming = std::bind(
- &TestService::AsyncService::RequestStreamingCall, &async_service_,
- _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3);
+ &BenchmarkService::AsyncService::RequestStreamingCall,
+ &async_service_, _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3);
contexts_.push_front(
new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
request_unary, ProcessRPC));
@@ -89,10 +91,10 @@ class AsyncQpsServerTest : public Server {
request_streaming, ProcessRPC));
}
}
- for (int i = 0; i < config.threads(); i++) {
+ for (int i = 0; i < config.async_server_threads(); i++) {
shutdown_state_.emplace_back(new PerThreadShutdownState());
}
- for (int i = 0; i < config.threads(); i++) {
+ for (int i = 0; i < config.async_server_threads(); i++) {
threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
}
}
@@ -309,7 +311,7 @@ class AsyncQpsServerTest : public Server {
std::vector<std::thread> threads_;
std::unique_ptr<grpc::Server> server_;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
- TestService::AsyncService async_service_;
+ BenchmarkService::AsyncService async_service_;
std::forward_list<ServerRpcContext *> contexts_;
class PerThreadShutdownState {
@@ -333,9 +335,8 @@ class AsyncQpsServerTest : public Server {
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
-std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,
- int port) {
- return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
+std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config) {
+ return std::unique_ptr<Server>(new AsyncQpsServerTest(config));
}
} // namespace testing
diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc
index b760ef63ec..a09b174b7e 100644
--- a/test/cpp/qps/server_sync.cc
+++ b/test/cpp/qps/server_sync.cc
@@ -43,14 +43,14 @@
#include <grpc++/server_context.h>
#include <grpc++/security/server_credentials.h>
-#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/server.h"
#include "test/cpp/qps/timer.h"
+#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
namespace testing {
-class TestServiceImpl GRPC_FINAL : public TestService::Service {
+class BenchmarkServiceImpl GRPC_FINAL : public BenchmarkService::Service {
public:
Status UnaryCall(ServerContext* context, const SimpleRequest* request,
SimpleResponse* response) GRPC_OVERRIDE {
@@ -84,30 +84,29 @@ class TestServiceImpl GRPC_FINAL : public TestService::Service {
class SynchronousServer GRPC_FINAL : public grpc::testing::Server {
public:
- SynchronousServer(const ServerConfig& config, int port)
- : impl_(MakeImpl(port)) {}
-
- private:
- std::unique_ptr<grpc::Server> MakeImpl(int port) {
+ explicit SynchronousServer(const ServerConfig& config) : Server(config) {
ServerBuilder builder;
char* server_address = NULL;
- gpr_join_host_port(&server_address, "::", port);
- builder.AddListeningPort(server_address, InsecureServerCredentials());
+
+ gpr_join_host_port(&server_address, "::", port());
+ builder.AddListeningPort(server_address,
+ Server::CreateServerCredentials(config));
gpr_free(server_address);
builder.RegisterService(&service_);
- return builder.BuildAndStart();
+ impl_ = builder.BuildAndStart();
}
- TestServiceImpl service_;
+ private:
+ BenchmarkServiceImpl service_;
std::unique_ptr<grpc::Server> impl_;
};
std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(
- const ServerConfig& config, int port) {
- return std::unique_ptr<Server>(new SynchronousServer(config, port));
+ const ServerConfig& config) {
+ return std::unique_ptr<Server>(new SynchronousServer(config));
}
} // namespace testing
diff --git a/test/cpp/qps/single_run_localhost.sh b/test/cpp/qps/single_run_localhost.sh
index 9d76f08f80..f5356f1834 100755
--- a/test/cpp/qps/single_run_localhost.sh
+++ b/test/cpp/qps/single_run_localhost.sh
@@ -42,9 +42,9 @@ NUMCPUS=`python2.7 -c 'import multiprocessing; print multiprocessing.cpu_count()
make CONFIG=$config qps_worker qps_driver -j$NUMCPUS
-bins/$config/qps_worker -driver_port 10000 -server_port 10001 &
+bins/$config/qps_worker -driver_port 10000 &
PID1=$!
-bins/$config/qps_worker -driver_port 10010 -server_port 10011 &
+bins/$config/qps_worker -driver_port 10010 &
PID2=$!
export QPS_WORKERS="localhost:10000,localhost:10010"
diff --git a/test/cpp/qps/sync_streaming_ping_pong_test.cc b/test/cpp/qps/sync_streaming_ping_pong_test.cc
index 52e43939a8..186afc03f7 100644
--- a/test/cpp/qps/sync_streaming_ping_pong_test.cc
+++ b/test/cpp/qps/sync_streaming_ping_pong_test.cc
@@ -31,8 +31,6 @@
*
*/
-#include <signal.h>
-
#include <set>
#include <grpc/support/log.h>
@@ -51,17 +49,14 @@ static void RunSynchronousStreamingPingPong() {
gpr_log(GPR_INFO, "Running Synchronous Streaming Ping Pong");
ClientConfig client_config;
- client_config.set_client_type(SYNCHRONOUS_CLIENT);
- client_config.set_enable_ssl(false);
+ client_config.set_client_type(SYNC_CLIENT);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
- client_config.set_payload_size(1);
client_config.set_rpc_type(STREAMING);
+ client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
- server_config.set_server_type(SYNCHRONOUS_SERVER);
- server_config.set_enable_ssl(false);
- server_config.set_threads(1);
+ server_config.set_server_type(SYNC_SERVER);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@@ -75,7 +70,6 @@ static void RunSynchronousStreamingPingPong() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
- signal(SIGPIPE, SIG_IGN);
grpc::testing::RunSynchronousStreamingPingPong();
return 0;
diff --git a/test/cpp/qps/sync_unary_ping_pong_test.cc b/test/cpp/qps/sync_unary_ping_pong_test.cc
index fbd21357aa..25851833a6 100644
--- a/test/cpp/qps/sync_unary_ping_pong_test.cc
+++ b/test/cpp/qps/sync_unary_ping_pong_test.cc
@@ -31,8 +31,6 @@
*
*/
-#include <signal.h>
-
#include <set>
#include <grpc/support/log.h>
@@ -51,17 +49,14 @@ static void RunSynchronousUnaryPingPong() {
gpr_log(GPR_INFO, "Running Synchronous Unary Ping Pong");
ClientConfig client_config;
- client_config.set_client_type(SYNCHRONOUS_CLIENT);
- client_config.set_enable_ssl(false);
+ client_config.set_client_type(SYNC_CLIENT);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
- client_config.set_payload_size(1);
client_config.set_rpc_type(UNARY);
+ client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
- server_config.set_server_type(SYNCHRONOUS_SERVER);
- server_config.set_enable_ssl(false);
- server_config.set_threads(1);
+ server_config.set_server_type(SYNC_SERVER);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@@ -76,7 +71,6 @@ static void RunSynchronousUnaryPingPong() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
- signal(SIGPIPE, SIG_IGN);
grpc::testing::RunSynchronousUnaryPingPong();
return 0;
diff --git a/test/cpp/qps/timer.cc b/test/cpp/qps/timer.cc
index 8edb838da3..3ec7f49f83 100644
--- a/test/cpp/qps/timer.cc
+++ b/test/cpp/qps/timer.cc
@@ -61,7 +61,7 @@ Timer::Result Timer::Sample() {
return r;
}
-Timer::Result Timer::Mark() {
+Timer::Result Timer::Mark() const {
Result s = Sample();
Result r;
r.wall = s.wall - start_.wall;
diff --git a/test/cpp/qps/timer.h b/test/cpp/qps/timer.h
index 30dbd7e7d5..d1aee1a9d1 100644
--- a/test/cpp/qps/timer.h
+++ b/test/cpp/qps/timer.h
@@ -44,7 +44,7 @@ class Timer {
double system;
};
- Result Mark();
+ Result Mark() const;
static double Now();
diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc
index 935e4853a6..430ffb7cdc 100644
--- a/test/cpp/qps/worker.cc
+++ b/test/cpp/qps/worker.cc
@@ -43,8 +43,7 @@
#include "test/cpp/qps/qps_worker.h"
#include "test/cpp/util/test_config.h"
-DEFINE_int32(driver_port, 0, "Driver server port.");
-DEFINE_int32(server_port, 0, "Spawned server port.");
+DEFINE_int32(driver_port, 0, "Port for communication with driver");
static bool got_sigint = false;
@@ -54,7 +53,7 @@ namespace grpc {
namespace testing {
static void RunServer() {
- QpsWorker worker(FLAGS_driver_port, FLAGS_server_port);
+ QpsWorker worker(FLAGS_driver_port);
while (!got_sigint) {
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),