author    kpayson64 <kpayson@google.com>  2016-08-15 11:02:23 -0700
committer GitHub <noreply@github.com>     2016-08-15 11:02:23 -0700
commit    1748ba6d6d021c25c535d07e3c50b79d31642c10 (patch)
tree      194df37b0765ff10a20a96b19edba99ba1994c93 /test
parent    8aff34f50082874dae880c753b1bb800a005df43 (diff)
parent    f50020ce038411b2a0864cb61296b67ac1cc032e (diff)
Merge pull request #7652 from vjpai/qps_better_client_ending
Better client-side ending for QPS tests
Diffstat (limited to 'test')
-rw-r--r--  test/cpp/qps/client.h       | 21
-rw-r--r--  test/cpp/qps/client_sync.cc | 31
2 files changed, 34 insertions(+), 18 deletions(-)
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 4045e13460..fada4ba767 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -169,6 +169,7 @@ class Client {
// Must call AwaitThreadsCompletion before destructor to avoid a race
// between destructor and invocation of virtual ThreadFunc
void AwaitThreadsCompletion() {
+ gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(true));
DestroyMultithreading();
std::unique_lock<std::mutex> g(thread_completion_mu_);
while (threads_remaining_ != 0) {
@@ -178,8 +179,10 @@ class Client {
protected:
bool closed_loop_;
+ gpr_atm thread_pool_done_;
void StartThreads(size_t num_threads) {
+ gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
threads_remaining_ = num_threads;
for (size_t i = 0; i < num_threads; i++) {
threads_.emplace_back(new Thread(this, i));
@@ -241,18 +244,9 @@ class Client {
class Thread {
public:
Thread(Client* client, size_t idx)
- : done_(false),
- client_(client),
- idx_(idx),
- impl_(&Thread::ThreadFunc, this) {}
+ : client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {}
- ~Thread() {
- {
- std::lock_guard<std::mutex> g(mu_);
- done_ = true;
- }
- impl_.join();
- }
+ ~Thread() { impl_.join(); }
void BeginSwap(Histogram* n) {
std::lock_guard<std::mutex> g(mu_);
@@ -282,9 +276,9 @@ class Client {
}
if (!thread_still_ok) {
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
- done_ = true;
}
- if (done_) {
+ if (!thread_still_ok ||
+ static_cast<bool>(gpr_atm_acq_load(&client_->thread_pool_done_))) {
client_->CompleteThread();
return;
}
@@ -292,7 +286,6 @@ class Client {
}
std::mutex mu_;
- bool done_;
Histogram histogram_;
Client* client_;
const size_t idx_;
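
Note on the hunks above: the merge replaces each Thread's mutex-guarded done_ flag with a single client-wide gpr_atm thread_pool_done_, release-stored in AwaitThreadsCompletion and acquire-loaded in ThreadFunc. A minimal standalone sketch of that shutdown-flag pattern follows; it uses std::atomic<bool> as a portable stand-in for gpr_atm_rel_store/gpr_atm_acq_load, and the Pool/Start/AwaitCompletion/Run names are illustrative, not the gRPC code:

    // Sketch only: std::atomic<bool> stands in for gpr_atm (assumption).
    #include <atomic>
    #include <thread>
    #include <vector>

    class Pool {
     public:
      void Start(size_t n) {
        done_.store(false, std::memory_order_release);
        for (size_t i = 0; i < n; i++) {
          threads_.emplace_back([this] { Run(); });
        }
      }
      void AwaitCompletion() {
        // Release store: pairs with the acquire load in Run().
        done_.store(true, std::memory_order_release);
        for (auto& t : threads_) t.join();
      }

     private:
      void Run() {
        // Workers poll one shared flag instead of per-thread state.
        while (!done_.load(std::memory_order_acquire)) {
          // issue one RPC / unit of work here
        }
      }
      std::atomic<bool> done_{false};
      std::vector<std::thread> threads_;
    };

The release/acquire pairing guarantees that anything the coordinator wrote before raising the flag is visible to a worker that observes the flag set, which is why the per-thread mutex around done_ is no longer needed.
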
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 25c7823553..8062424a1f 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -79,10 +79,29 @@ class SynchronousClient
virtual ~SynchronousClient(){};
protected:
- void WaitToIssue(int thread_idx) {
+ // WaitToIssue returns false if we realize that we need to break out
+ bool WaitToIssue(int thread_idx) {
if (!closed_loop_) {
- gpr_sleep_until(NextIssueTime(thread_idx));
+ const gpr_timespec next_issue_time = NextIssueTime(thread_idx);
+ // Avoid sleeping for too long continuously because we might
+ // need to terminate before then. This is an issue since
+ // exponential distribution can occasionally produce bad outliers
+ while (true) {
+ const gpr_timespec one_sec_delay =
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_seconds(1, GPR_TIMESPAN));
+ if (gpr_time_cmp(next_issue_time, one_sec_delay) <= 0) {
+ gpr_sleep_until(next_issue_time);
+ return true;
+ } else {
+ gpr_sleep_until(one_sec_delay);
+ if (gpr_atm_acq_load(&thread_pool_done_) != static_cast<gpr_atm>(0)) {
+ return false;
+ }
+ }
+ }
}
+ return true;
}
size_t num_threads_;
@@ -101,7 +120,9 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
~SynchronousUnaryClient() {}
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
- WaitToIssue(thread_idx);
+ if (!WaitToIssue(thread_idx)) {
+ return true;
+ }
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
double start = UsageTimer::Now();
GPR_TIMER_SCOPE("SynchronousUnaryClient::ThreadFunc", 0);
@@ -144,7 +165,9 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
}
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
- WaitToIssue(thread_idx);
+ if (!WaitToIssue(thread_idx)) {
+ return true;
+ }
GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0);
double start = UsageTimer::Now();
if (stream_[thread_idx]->Write(request_) &&
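
Note on the WaitToIssue change above: with an open-loop exponential issue schedule, NextIssueTime can occasionally land far in the future, so the thread now sleeps in at-most-one-second slices and re-checks thread_pool_done_ between slices instead of sleeping all the way to the issue time. A minimal standalone sketch of that sliced-sleep pattern, using std::chrono and std::this_thread in place of gpr_timespec/gpr_sleep_until, with a hypothetical WaitOrQuit name:

    // Sketch only: std::chrono stands in for gpr_timespec (assumption).
    #include <atomic>
    #include <chrono>
    #include <thread>

    // Returns false if shutdown was requested before next_issue_time arrived.
    bool WaitOrQuit(std::chrono::steady_clock::time_point next_issue_time,
                    const std::atomic<bool>& done) {
      for (;;) {
        auto one_sec_from_now =
            std::chrono::steady_clock::now() + std::chrono::seconds(1);
        if (next_issue_time <= one_sec_from_now) {
          // Close enough: finish the wait and let the caller issue.
          std::this_thread::sleep_until(next_issue_time);
          return true;
        }
        // Sleep at most one second, then re-check the shutdown flag so a
        // distribution outlier can't pin the thread in a long sleep.
        std::this_thread::sleep_until(one_sec_from_now);
        if (done.load(std::memory_order_acquire)) return false;
      }
    }

This bounds shutdown latency to roughly one second per thread while leaving the common case (issue times under a second away) a single uninterrupted sleep.
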