aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/qps')
-rw-r--r--test/cpp/qps/client.h30
-rw-r--r--test/cpp/qps/client_sync.cc4
-rw-r--r--test/cpp/qps/driver.cc34
3 files changed, 59 insertions, 9 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index fada4ba767..c8809cbc5b 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -129,13 +129,17 @@ class HistogramEntry GRPC_FINAL {
class Client {
public:
- Client() : timer_(new UsageTimer), interarrival_timer_() {}
+ Client() : timer_(new UsageTimer), interarrival_timer_() {
+ gpr_event_init(&start_requests_);
+ }
virtual ~Client() {}
ClientStats Mark(bool reset) {
Histogram latencies;
UsageTimer::Result timer_result;
+ MaybeStartRequests();
+
// avoid std::vector for old compilers that expect a copy constructor
if (reset) {
Histogram* to_merge = new Histogram[threads_.size()];
@@ -189,7 +193,10 @@ class Client {
}
}
- void EndThreads() { threads_.clear(); }
+ void EndThreads() {
+ MaybeStartRequests();
+ threads_.clear();
+ }
virtual void DestroyMultithreading() = 0;
virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;
@@ -265,6 +272,13 @@ class Client {
Thread& operator=(const Thread&);
void ThreadFunc() {
+ while (!gpr_event_wait(
+ &client_->start_requests_,
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(1, GPR_TIMESPAN)))) {
+ gpr_log(GPR_INFO, "Waiting for benchmark to start");
+ }
+
for (;;) {
// run the loop body
HistogramEntry entry;
@@ -302,6 +316,16 @@ class Client {
size_t threads_remaining_;
std::condition_variable threads_complete_;
+ gpr_event start_requests_;
+ bool started_requests_;
+
+ void MaybeStartRequests() {
+ if (!started_requests_) {
+ started_requests_ = true;
+ gpr_event_set(&start_requests_, (void*)1);
+ }
+ }
+
void CompleteThread() {
std::lock_guard<std::mutex> g(thread_completion_mu_);
threads_remaining_--;
@@ -359,7 +383,7 @@ class ClientImpl : public Client {
gpr_log(GPR_INFO, "Connecting to %s", target.c_str());
GPR_ASSERT(channel_->WaitForConnected(
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_seconds(30, GPR_TIMESPAN))));
+ gpr_time_from_seconds(300, GPR_TIMESPAN))));
stub_ = create_stub(channel_);
}
Channel* get_channel() { return channel_.get(); }
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 8062424a1f..0ccf4e270b 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -130,6 +130,10 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
grpc::Status s =
stub->UnaryCall(&context, request_, &responses_[thread_idx]);
entry->set_value((UsageTimer::Now() - start) * 1e9);
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
+ s.error_message().c_str());
+ }
return s.ok();
}
};
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index f67f353c4d..6965495206 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -45,6 +45,7 @@
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
+#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/env.h"
#include "src/proto/grpc/testing/services.grpc.pb.h"
#include "test/core/util/port.h"
@@ -366,12 +367,37 @@ std::unique_ptr<ScenarioResult> RunScenario(
if (!clients[i].stream->Write(args)) {
gpr_log(GPR_ERROR, "Could not write args to client %zu", i);
}
+ }
+
+ for (size_t i = 0; i < num_clients; i++) {
ClientStatus init_status;
if (!clients[i].stream->Read(&init_status)) {
gpr_log(GPR_ERROR, "Client %zu did not yield initial status", i);
}
}
+ // Send an initial mark: clients can use this to know that everything is ready
+ // to start
+ gpr_log(GPR_INFO, "Initiating");
+ ServerArgs server_mark;
+ server_mark.mutable_mark()->set_reset(true);
+ ClientArgs client_mark;
+ client_mark.mutable_mark()->set_reset(true);
+ ServerStatus server_status;
+ ClientStatus client_status;
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ if (!client->stream->Write(client_mark)) {
+ gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
+ }
+ }
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ if (!client->stream->Read(&client_status)) {
+ gpr_log(GPR_ERROR, "Couldn't get status from client %zu", i);
+ }
+ }
+
// Let everything warmup
gpr_log(GPR_INFO, "Warming up");
gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME);
@@ -380,10 +406,6 @@ std::unique_ptr<ScenarioResult> RunScenario(
// Start a run
gpr_log(GPR_INFO, "Starting");
- ServerArgs server_mark;
- server_mark.mutable_mark()->set_reset(true);
- ClientArgs client_mark;
- client_mark.mutable_mark()->set_reset(true);
for (size_t i = 0; i < num_servers; i++) {
auto server = &servers[i];
if (!server->stream->Write(server_mark)) {
@@ -396,8 +418,6 @@ std::unique_ptr<ScenarioResult> RunScenario(
gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
}
}
- ServerStatus server_status;
- ClientStatus client_status;
for (size_t i = 0; i < num_servers; i++) {
auto server = &servers[i];
if (!server->stream->Read(&server_status)) {
@@ -419,6 +439,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
start,
gpr_time_from_seconds(warmup_seconds + benchmark_seconds, GPR_TIMESPAN)));
+ gpr_timer_set_enabled(0);
+
// Finish a run
std::unique_ptr<ScenarioResult> result(new ScenarioResult);
Histogram merged_latencies;