aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2015-08-13 10:31:05 -0700
committerGravatar Jan Tattermusch <jtattermusch@google.com>2015-08-13 10:31:05 -0700
commit46e85b02cda4ad4d549e444b78d1e74a3021a9d9 (patch)
tree51bccc374623625a19a5fb53df60bc08c2c7713e /test/cpp
parent58d637444187f36181d3662e85e42aa1d23af92a (diff)
parent58d7310fbc291d2afa9cbb100544fc5c96cd3249 (diff)
Merge remote-tracking branch 'upstream/master' into csharp_upgrade_to_proto3
Conflicts: src/csharp/Grpc.Core/VersionInfo.cs src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj src/csharp/Grpc.IntegrationTesting/InteropClient.cs src/csharp/Grpc.IntegrationTesting/packages.config src/csharp/build_packages.bat
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/end2end/end2end_test.cc2
-rw-r--r--test/cpp/qps/client.h82
-rw-r--r--test/cpp/qps/client_async.cc57
-rw-r--r--test/cpp/qps/client_sync.cc27
-rw-r--r--test/cpp/qps/driver.cc102
-rw-r--r--test/cpp/qps/driver.h16
-rw-r--r--test/cpp/qps/interarrival.h14
-rw-r--r--test/cpp/qps/qps_driver.cc1
-rw-r--r--test/cpp/qps/report.cc93
-rw-r--r--test/cpp/qps/server_async.cc40
10 files changed, 247 insertions, 187 deletions
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 5f0749daa5..37669815c6 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -117,7 +117,7 @@ class Proxy : public ::grpc::cpp::test::util::TestService::Service {
}
private:
- std::unique_ptr<::grpc::cpp::test::util::TestService::Stub> stub_;
+ std::unique_ptr< ::grpc::cpp::test::util::TestService::Stub> stub_;
};
class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 28cd32a197..1c4f46328f 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -41,6 +41,7 @@
#include <condition_variable>
#include <mutex>
+#include <grpc++/config.h>
namespace grpc {
@@ -67,10 +68,12 @@ typedef std::chrono::time_point<grpc_time_source> grpc_time;
class Client {
public:
explicit Client(const ClientConfig& config)
- : timer_(new Timer), interarrival_timer_() {
+ : channels_(config.client_channels()),
+ timer_(new Timer),
+ interarrival_timer_() {
for (int i = 0; i < config.client_channels(); i++) {
- channels_.push_back(ClientChannelInfo(
- config.server_targets(i % config.server_targets_size()), config));
+ channels_[i].init(config.server_targets(i % config.server_targets_size()),
+ config);
}
request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
request_.set_response_size(config.payload_size());
@@ -79,7 +82,8 @@ class Client {
ClientStats Mark() {
Histogram latencies;
- std::vector<Histogram> to_merge(threads_.size());
+ // avoid std::vector for old compilers that expect a copy constructor
+ Histogram* to_merge = new Histogram[threads_.size()];
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->BeginSwap(&to_merge[i]);
}
@@ -89,6 +93,7 @@ class Client {
threads_[i]->EndSwap();
latencies.Merge(&to_merge[i]);
}
+ delete[] to_merge;
auto timer_result = timer->Mark();
@@ -106,9 +111,20 @@ class Client {
class ClientChannelInfo {
public:
- ClientChannelInfo(const grpc::string& target, const ClientConfig& config)
- : channel_(CreateTestChannel(target, config.enable_ssl())),
- stub_(TestService::NewStub(channel_)) {}
+ ClientChannelInfo() {}
+ ClientChannelInfo(const ClientChannelInfo& i) {
+ // The copy constructor is to satisfy old compilers
+ // that need it for using std::vector . It is only ever
+ // used for empty entries
+ GPR_ASSERT(!i.channel_ && !i.stub_);
+ }
+ void init(const grpc::string& target, const ClientConfig& config) {
+ // We have to use a 2-phase init like this with a default
+ // constructor followed by an initializer function to make
+ // old compilers happy with using this in std::vector
+ channel_ = CreateTestChannel(target, config.enable_ssl());
+ stub_ = TestService::NewStub(channel_);
+ }
ChannelInterface* get_channel() { return channel_.get(); }
TestService::Stub* get_stub() { return stub_.get(); }
@@ -189,27 +205,9 @@ class Client {
Thread(Client* client, size_t idx)
: done_(false),
new_(nullptr),
- impl_([this, idx, client]() {
- for (;;) {
- // run the loop body
- bool thread_still_ok = client->ThreadFunc(&histogram_, idx);
- // lock, see if we're done
- std::lock_guard<std::mutex> g(mu_);
- if (!thread_still_ok) {
- gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
- done_ = true;
- }
- if (done_) {
- return;
- }
- // check if we're marking, swap out the histogram if so
- if (new_) {
- new_->Swap(&histogram_);
- new_ = nullptr;
- cv_.notify_one();
- }
- }
- }) {}
+ client_(client),
+ idx_(idx),
+ impl_(&Thread::ThreadFunc, this) {}
~Thread() {
{
@@ -226,13 +224,37 @@ class Client {
void EndSwap() {
std::unique_lock<std::mutex> g(mu_);
- cv_.wait(g, [this]() { return new_ == nullptr; });
+ while (new_ != nullptr) {
+ cv_.wait(g);
+ };
}
private:
Thread(const Thread&);
Thread& operator=(const Thread&);
+ void ThreadFunc() {
+ for (;;) {
+ // run the loop body
+ const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
+ // lock, see if we're done
+ std::lock_guard<std::mutex> g(mu_);
+ if (!thread_still_ok) {
+ gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
+ done_ = true;
+ }
+ if (done_) {
+ return;
+ }
+ // check if we're marking, swap out the histogram if so
+ if (new_) {
+ new_->Swap(&histogram_);
+ new_ = nullptr;
+ cv_.notify_one();
+ }
+ }
+ }
+
TestService::Stub* stub_;
ClientConfig config_;
std::mutex mu_;
@@ -240,6 +262,8 @@ class Client {
bool done_;
Histogram* new_;
Histogram histogram_;
+ Client* client_;
+ size_t idx_;
std::thread impl_;
};
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index e1e44f9ac0..a337610cbf 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -156,7 +156,7 @@ class AsyncClient : public Client {
std::function<ClientRpcContext*(int, TestService::Stub*,
const SimpleRequest&)> setup_ctx)
: Client(config),
- channel_lock_(config.client_channels()),
+ channel_lock_(new std::mutex[config.client_channels()]),
contexts_(config.client_channels()),
max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
channel_count_(config.client_channels()),
@@ -208,6 +208,7 @@ class AsyncClient : public Client {
delete ctx;
}
}
+ delete[] channel_lock_;
}
bool ThreadFunc(Histogram* histogram,
@@ -316,23 +317,28 @@ class AsyncClient : public Client {
}
private:
- class boolean { // exists only to avoid data-race on vector<bool>
+ class boolean { // exists only to avoid data-race on vector<bool>
public:
- boolean(): val_(false) {}
- boolean(bool b): val_(b) {}
- operator bool() const {return val_;}
- boolean& operator=(bool b) {val_=b; return *this;}
+ boolean() : val_(false) {}
+ boolean(bool b) : val_(b) {}
+ operator bool() const { return val_; }
+ boolean& operator=(bool b) {
+ val_ = b;
+ return *this;
+ }
+
private:
bool val_;
};
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
std::vector<deadline_list> rpc_deadlines_; // per thread deadlines
- std::vector<int> next_channel_; // per thread round-robin channel ctr
- std::vector<boolean> issue_allowed_; // may this thread attempt to issue
- std::vector<grpc_time> next_issue_; // when should it issue?
+ std::vector<int> next_channel_; // per thread round-robin channel ctr
+ std::vector<boolean> issue_allowed_; // may this thread attempt to issue
+ std::vector<grpc_time> next_issue_; // when should it issue?
- std::vector<std::mutex> channel_lock_;
+ std::mutex*
+ channel_lock_; // a vector, but avoid std::vector for old compilers
std::vector<context_list> contexts_; // per-channel list of idle contexts
int max_outstanding_per_channel_;
int channel_count_;
@@ -348,15 +354,17 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
private:
+ static void CheckDone(grpc::Status s, SimpleResponse* response) {}
+ static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
+ StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
+ const SimpleRequest& request, CompletionQueue* cq) {
+ return stub->AsyncUnaryCall(ctx, request, cq);
+ };
static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
const SimpleRequest& req) {
- auto check_done = [](grpc::Status s, SimpleResponse* response) {};
- auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx,
- const SimpleRequest& request, CompletionQueue* cq) {
- return stub->AsyncUnaryCall(ctx, request, cq);
- };
return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
- channel_id, stub, req, start_req, check_done);
+ channel_id, stub, req, AsyncUnaryClient::StartReq,
+ AsyncUnaryClient::CheckDone);
}
};
@@ -442,16 +450,19 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
private:
+ static void CheckDone(grpc::Status s, SimpleResponse* response) {}
+ static std::unique_ptr<
+ grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
+ StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
+ CompletionQueue* cq, void* tag) {
+ auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
+ return stream;
+ };
static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
const SimpleRequest& req) {
- auto check_done = [](grpc::Status s, SimpleResponse* response) {};
- auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx,
- CompletionQueue* cq, void* tag) {
- auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
- return stream;
- };
return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
- channel_id, stub, req, start_req, check_done);
+ channel_id, stub, req, AsyncStreamingClient::StartReq,
+ AsyncStreamingClient::CheckDone);
}
};
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 718698bfe1..db5416a707 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -45,8 +45,9 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/histogram.h>
-#include <grpc/support/log.h>
#include <grpc/support/host_port.h>
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
#include <gflags/gflags.h>
#include <grpc++/client_context.h>
#include <grpc++/server.h>
@@ -79,7 +80,9 @@ class SynchronousClient : public Client {
void WaitToIssue(int thread_idx) {
grpc_time next_time;
if (NextIssueTime(thread_idx, &next_time)) {
- std::this_thread::sleep_until(next_time);
+ gpr_timespec next_timespec;
+ TimepointHR2Timespec(next_time, &next_timespec);
+ gpr_sleep_until(next_timespec);
}
}
@@ -110,9 +113,10 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
public:
SynchronousStreamingClient(const ClientConfig& config)
- : SynchronousClient(config),
- context_(num_threads_),
- stream_(num_threads_) {
+ : SynchronousClient(config) {
+ context_ = new grpc::ClientContext[num_threads_];
+ stream_ = new std::unique_ptr<
+ grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>[num_threads_];
for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
@@ -121,12 +125,15 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
}
~SynchronousStreamingClient() {
EndThreads();
- for (auto stream = stream_.begin(); stream != stream_.end(); stream++) {
+ for (auto stream = &stream_[0]; stream != &stream_[num_threads_];
+ stream++) {
if (*stream) {
(*stream)->WritesDone();
EXPECT_TRUE((*stream)->Finish().ok());
}
}
+ delete[] stream_;
+ delete[] context_;
}
bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
@@ -141,9 +148,11 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
}
private:
- std::vector<grpc::ClientContext> context_;
- std::vector<std::unique_ptr<
- grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>> stream_;
+ // These are both conceptually std::vector but cannot be for old compilers
+ // that expect contained classes to support copy constructors
+ grpc::ClientContext* context_;
+ std::unique_ptr<grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>*
+ stream_;
};
std::unique_ptr<Client> CreateSynchronousUnaryClient(
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index a0360295e0..78e3720938 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -77,16 +77,34 @@ static deque<string> get_hosts(const string& name) {
}
}
+// Namespace for classes and functions used only in RunScenario
+// Using this rather than local definitions to workaround gcc-4.4 limitations
+// regarding using templates without linkage
+namespace runsc {
+
+// ClientContext allocator
+static ClientContext* AllocContext(list<ClientContext>* contexts) {
+ contexts->emplace_back();
+ return &contexts->back();
+}
+
+struct ServerData {
+ unique_ptr<Worker::Stub> stub;
+ unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
+};
+
+struct ClientData {
+ unique_ptr<Worker::Stub> stub;
+ unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
+};
+} // namespace runsc
+
std::unique_ptr<ScenarioResult> RunScenario(
const ClientConfig& initial_client_config, size_t num_clients,
const ServerConfig& server_config, size_t num_servers, int warmup_seconds,
int benchmark_seconds, int spawn_local_worker_count) {
- // ClientContext allocator (all are destroyed at scope exit)
+ // ClientContext allocations (all are destroyed at scope exit)
list<ClientContext> contexts;
- auto alloc_context = [&contexts]() {
- contexts.emplace_back();
- return &contexts.back();
- };
// To be added to the result, containing the final configuration used for
// client and config (incluiding host, etc.)
@@ -131,23 +149,22 @@ std::unique_ptr<ScenarioResult> RunScenario(
workers.resize(num_clients + num_servers);
// Start servers
- struct ServerData {
- unique_ptr<Worker::Stub> stub;
- unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
- };
- vector<ServerData> servers;
+ using runsc::ServerData;
+ // servers is array rather than std::vector to avoid gcc-4.4 issues
+ // where class contained in std::vector must have a copy constructor
+ auto* servers = new ServerData[num_servers];
for (size_t i = 0; i < num_servers; i++) {
- ServerData sd;
- sd.stub = std::move(Worker::NewStub(
+ servers[i].stub = std::move(Worker::NewStub(
CreateChannel(workers[i], InsecureCredentials(), ChannelArguments())));
ServerArgs args;
result_server_config = server_config;
result_server_config.set_host(workers[i]);
*args.mutable_setup() = server_config;
- sd.stream = std::move(sd.stub->RunServer(alloc_context()));
- GPR_ASSERT(sd.stream->Write(args));
+ servers[i].stream =
+ std::move(servers[i].stub->RunServer(runsc::AllocContext(&contexts)));
+ GPR_ASSERT(servers[i].stream->Write(args));
ServerStatus init_status;
- GPR_ASSERT(sd.stream->Read(&init_status));
+ GPR_ASSERT(servers[i].stream->Read(&init_status));
char* host;
char* driver_port;
char* cli_target;
@@ -157,30 +174,25 @@ std::unique_ptr<ScenarioResult> RunScenario(
gpr_free(host);
gpr_free(driver_port);
gpr_free(cli_target);
-
- servers.push_back(std::move(sd));
}
// Start clients
- struct ClientData {
- unique_ptr<Worker::Stub> stub;
- unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
- };
- vector<ClientData> clients;
+ using runsc::ClientData;
+ // clients is array rather than std::vector to avoid gcc-4.4 issues
+ // where class contained in std::vector must have a copy constructor
+ auto* clients = new ClientData[num_clients];
for (size_t i = 0; i < num_clients; i++) {
- ClientData cd;
- cd.stub = std::move(Worker::NewStub(CreateChannel(
+ clients[i].stub = std::move(Worker::NewStub(CreateChannel(
workers[i + num_servers], InsecureCredentials(), ChannelArguments())));
ClientArgs args;
result_client_config = client_config;
result_client_config.set_host(workers[i + num_servers]);
*args.mutable_setup() = client_config;
- cd.stream = std::move(cd.stub->RunTest(alloc_context()));
- GPR_ASSERT(cd.stream->Write(args));
+ clients[i].stream =
+ std::move(clients[i].stub->RunTest(runsc::AllocContext(&contexts)));
+ GPR_ASSERT(clients[i].stream->Write(args));
ClientStatus init_status;
- GPR_ASSERT(cd.stream->Read(&init_status));
-
- clients.push_back(std::move(cd));
+ GPR_ASSERT(clients[i].stream->Read(&init_status));
}
// Let everything warmup
@@ -195,23 +207,25 @@ std::unique_ptr<ScenarioResult> RunScenario(
server_mark.mutable_mark();
ClientArgs client_mark;
client_mark.mutable_mark();
- for (auto server = servers.begin(); server != servers.end(); server++) {
+ for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Write(server_mark));
}
- for (auto client = clients.begin(); client != clients.end(); client++) {
+ for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Write(client_mark));
}
ServerStatus server_status;
ClientStatus client_status;
- for (auto server = servers.begin(); server != servers.end(); server++) {
+ for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Read(&server_status));
}
- for (auto client = clients.begin(); client != clients.end(); client++) {
+ for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Read(&client_status));
}
// Wait some time
gpr_log(GPR_INFO, "Running");
+ // Use gpr_sleep_until rather than this_thread::sleep_until to support
+ // compilers that don't work with this_thread
gpr_sleep_until(gpr_time_add(
start, gpr_time_from_seconds(benchmark_seconds, GPR_TIMESPAN)));
@@ -220,34 +234,36 @@ std::unique_ptr<ScenarioResult> RunScenario(
result->client_config = result_client_config;
result->server_config = result_server_config;
gpr_log(GPR_INFO, "Finishing");
- for (auto server = servers.begin(); server != servers.end(); server++) {
+ for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Write(server_mark));
}
- for (auto client = clients.begin(); client != clients.end(); client++) {
+ for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Write(client_mark));
}
- for (auto server = servers.begin(); server != servers.end(); server++) {
+ for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Read(&server_status));
const auto& stats = server_status.stats();
- result->server_resources.push_back(ResourceUsage{
- stats.time_elapsed(), stats.time_user(), stats.time_system()});
+ result->server_resources.emplace_back(
+ stats.time_elapsed(), stats.time_user(), stats.time_system());
}
- for (auto client = clients.begin(); client != clients.end(); client++) {
+ for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Read(&client_status));
const auto& stats = client_status.stats();
result->latencies.MergeProto(stats.latencies());
- result->client_resources.push_back(ResourceUsage{
- stats.time_elapsed(), stats.time_user(), stats.time_system()});
+ result->client_resources.emplace_back(
+ stats.time_elapsed(), stats.time_user(), stats.time_system());
}
- for (auto client = clients.begin(); client != clients.end(); client++) {
+ for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->WritesDone());
GPR_ASSERT(client->stream->Finish().ok());
}
- for (auto server = servers.begin(); server != servers.end(); server++) {
+ for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->WritesDone());
GPR_ASSERT(server->stream->Finish().ok());
}
+ delete[] clients;
+ delete[] servers;
return result;
}
} // namespace testing
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index 5e9d4b3cb9..9a29df8d49 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -41,10 +41,18 @@
namespace grpc {
namespace testing {
-struct ResourceUsage {
- double wall_time;
- double user_time;
- double system_time;
+class ResourceUsage {
+ public:
+ ResourceUsage(double w, double u, double s)
+ : wall_time_(w), user_time_(u), system_time_(s) {}
+ double wall_time() const { return wall_time_; }
+ double user_time() const { return user_time_; }
+ double system_time() const { return system_time_; }
+
+ private:
+ double wall_time_;
+ double user_time_;
+ double system_time_;
};
struct ScenarioResult {
diff --git a/test/cpp/qps/interarrival.h b/test/cpp/qps/interarrival.h
index f90a17a894..04d14f689f 100644
--- a/test/cpp/qps/interarrival.h
+++ b/test/cpp/qps/interarrival.h
@@ -36,7 +36,8 @@
#include <chrono>
#include <cmath>
-#include <random>
+#include <cstdlib>
+#include <vector>
#include <grpc++/config.h>
@@ -141,17 +142,16 @@ class ParetoDist GRPC_FINAL : public RandomDist {
// in an efficient re-entrant way. The random table is built at construction
// time, and each call must include the thread id of the invoker
-typedef std::default_random_engine qps_random_engine;
-
class InterarrivalTimer {
public:
InterarrivalTimer() {}
void init(const RandomDist& r, int threads, int entries = 1000000) {
- qps_random_engine gen;
- std::uniform_real_distribution<double> uniform(0.0, 1.0);
for (int i = 0; i < entries; i++) {
- random_table_.push_back(std::chrono::nanoseconds(
- static_cast<int64_t>(1e9 * r(uniform(gen)))));
+ // rand is the only choice that is portable across POSIX and Windows
+ // and that supports new and old compilers
+ const double uniform_0_1 = rand() / RAND_MAX;
+ random_table_.push_back(
+ std::chrono::nanoseconds(static_cast<int64_t>(1e9 * r(uniform_0_1))));
}
// Now set up the thread positions
for (int i = 0; i < threads; i++) {
diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc
index d534846365..b1463be8f6 100644
--- a/test/cpp/qps/qps_driver.cc
+++ b/test/cpp/qps/qps_driver.cc
@@ -33,6 +33,7 @@
#include <memory>
#include <set>
+#include <signal.h>
#include <gflags/gflags.h>
#include <grpc/support/log.h>
diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc
index ff01ec1501..e03e8e1fb0 100644
--- a/test/cpp/qps/report.cc
+++ b/test/cpp/qps/report.cc
@@ -34,11 +34,16 @@
#include "test/cpp/qps/report.h"
#include <grpc/support/log.h>
+#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/stats.h"
namespace grpc {
namespace testing {
+static double WallTime(ResourceUsage u) { return u.wall_time(); }
+static double UserTime(ResourceUsage u) { return u.user_time(); }
+static double SystemTime(ResourceUsage u) { return u.system_time(); }
+
void CompositeReporter::add(std::unique_ptr<Reporter> reporter) {
reporters_.emplace_back(std::move(reporter));
}
@@ -68,16 +73,14 @@ void CompositeReporter::ReportTimes(const ScenarioResult& result) {
}
void GprLogReporter::ReportQPS(const ScenarioResult& result) {
- gpr_log(GPR_INFO, "QPS: %.1f",
- result.latencies.Count() /
- average(result.client_resources,
- [](ResourceUsage u) { return u.wall_time; }));
+ gpr_log(
+ GPR_INFO, "QPS: %.1f",
+ result.latencies.Count() / average(result.client_resources, WallTime));
}
void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) {
- auto qps = result.latencies.Count() /
- average(result.client_resources,
- [](ResourceUsage u) { return u.wall_time; });
+ auto qps =
+ result.latencies.Count() / average(result.client_resources, WallTime);
gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps,
qps / result.server_config.threads());
@@ -95,40 +98,30 @@ void GprLogReporter::ReportLatency(const ScenarioResult& result) {
void GprLogReporter::ReportTimes(const ScenarioResult& result) {
gpr_log(GPR_INFO, "Server system time: %.2f%%",
- 100.0 * sum(result.server_resources,
- [](ResourceUsage u) { return u.system_time; }) /
- sum(result.server_resources,
- [](ResourceUsage u) { return u.wall_time; }));
+ 100.0 * sum(result.server_resources, SystemTime) /
+ sum(result.server_resources, WallTime));
gpr_log(GPR_INFO, "Server user time: %.2f%%",
- 100.0 * sum(result.server_resources,
- [](ResourceUsage u) { return u.user_time; }) /
- sum(result.server_resources,
- [](ResourceUsage u) { return u.wall_time; }));
+ 100.0 * sum(result.server_resources, UserTime) /
+ sum(result.server_resources, WallTime));
gpr_log(GPR_INFO, "Client system time: %.2f%%",
- 100.0 * sum(result.client_resources,
- [](ResourceUsage u) { return u.system_time; }) /
- sum(result.client_resources,
- [](ResourceUsage u) { return u.wall_time; }));
+ 100.0 * sum(result.client_resources, SystemTime) /
+ sum(result.client_resources, WallTime));
gpr_log(GPR_INFO, "Client user time: %.2f%%",
- 100.0 * sum(result.client_resources,
- [](ResourceUsage u) { return u.user_time; }) /
- sum(result.client_resources,
- [](ResourceUsage u) { return u.wall_time; }));
+ 100.0 * sum(result.client_resources, UserTime) /
+ sum(result.client_resources, WallTime));
}
void PerfDbReporter::ReportQPS(const ScenarioResult& result) {
- auto qps = result.latencies.Count() /
- average(result.client_resources,
- [](ResourceUsage u) { return u.wall_time; });
+ auto qps =
+ result.latencies.Count() / average(result.client_resources, WallTime);
perf_db_client_.setQps(qps);
perf_db_client_.setConfigs(result.client_config, result.server_config);
}
void PerfDbReporter::ReportQPSPerCore(const ScenarioResult& result) {
- auto qps = result.latencies.Count() /
- average(result.client_resources,
- [](ResourceUsage u) { return u.wall_time; });
+ auto qps =
+ result.latencies.Count() / average(result.client_resources, WallTime);
auto qpsPerCore = qps / result.server_config.threads();
@@ -139,33 +132,29 @@ void PerfDbReporter::ReportQPSPerCore(const ScenarioResult& result) {
void PerfDbReporter::ReportLatency(const ScenarioResult& result) {
perf_db_client_.setLatencies(result.latencies.Percentile(50) / 1000,
- result.latencies.Percentile(90) / 1000,
- result.latencies.Percentile(95) / 1000,
- result.latencies.Percentile(99) / 1000,
- result.latencies.Percentile(99.9) / 1000);
+ result.latencies.Percentile(90) / 1000,
+ result.latencies.Percentile(95) / 1000,
+ result.latencies.Percentile(99) / 1000,
+ result.latencies.Percentile(99.9) / 1000);
perf_db_client_.setConfigs(result.client_config, result.server_config);
}
void PerfDbReporter::ReportTimes(const ScenarioResult& result) {
- double server_system_time =
- 100.0 * sum(result.server_resources,
- [](ResourceUsage u) { return u.system_time; }) /
- sum(result.server_resources, [](ResourceUsage u) { return u.wall_time; });
- double server_user_time =
- 100.0 * sum(result.server_resources,
- [](ResourceUsage u) { return u.user_time; }) /
- sum(result.server_resources, [](ResourceUsage u) { return u.wall_time; });
- double client_system_time =
- 100.0 * sum(result.client_resources,
- [](ResourceUsage u) { return u.system_time; }) /
- sum(result.client_resources, [](ResourceUsage u) { return u.wall_time; });
- double client_user_time =
- 100.0 * sum(result.client_resources,
- [](ResourceUsage u) { return u.user_time; }) /
- sum(result.client_resources, [](ResourceUsage u) { return u.wall_time; });
-
- perf_db_client_.setTimes(server_system_time, server_user_time, client_system_time,
- client_user_time);
+ const double server_system_time = 100.0 *
+ sum(result.server_resources, SystemTime) /
+ sum(result.server_resources, WallTime);
+ const double server_user_time = 100.0 *
+ sum(result.server_resources, UserTime) /
+ sum(result.server_resources, WallTime);
+ const double client_system_time = 100.0 *
+ sum(result.client_resources, SystemTime) /
+ sum(result.client_resources, WallTime);
+ const double client_user_time = 100.0 *
+ sum(result.client_resources, UserTime) /
+ sum(result.client_resources, WallTime);
+
+ perf_db_client_.setTimes(server_system_time, server_user_time,
+ client_system_time, client_user_time);
perf_db_client_.setConfigs(result.client_config, result.server_config);
}
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 33b6fa55c3..b4fc49c31c 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -99,25 +99,7 @@ class AsyncQpsServerTest : public Server {
shutdown_state_.emplace_back(new PerThreadShutdownState());
}
for (int i = 0; i < config.threads(); i++) {
- threads_.push_back(std::thread([=]() {
- // Wait until work is available or we are shutting down
- bool ok;
- void *got_tag;
- while (srv_cqs_[i]->Next(&got_tag, &ok)) {
- ServerRpcContext *ctx = detag(got_tag);
- // The tag is a pointer to an RPC context to invoke
- bool still_going = ctx->RunNextState(ok);
- if (!shutdown_state_[i]->shutdown()) {
- // this RPC context is done, so refresh it
- if (!still_going) {
- ctx->Reset();
- }
- } else {
- return;
- }
- }
- return;
- }));
+ threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
}
}
~AsyncQpsServerTest() {
@@ -142,6 +124,26 @@ class AsyncQpsServerTest : public Server {
}
private:
+ void ThreadFunc(int rank) {
+ // Wait until work is available or we are shutting down
+ bool ok;
+ void *got_tag;
+ while (srv_cqs_[rank]->Next(&got_tag, &ok)) {
+ ServerRpcContext *ctx = detag(got_tag);
+ // The tag is a pointer to an RPC context to invoke
+ const bool still_going = ctx->RunNextState(ok);
+ if (!shutdown_state_[rank]->shutdown()) {
+ // this RPC context is done, so refresh it
+ if (!still_going) {
+ ctx->Reset();
+ }
+ } else {
+ return;
+ }
+ }
+ return;
+ }
+
class ServerRpcContext {
public:
ServerRpcContext() {}