diff options
author | 2016-08-15 11:02:23 -0700 | |
---|---|---|
committer | 2016-08-15 11:02:23 -0700 | |
commit | 1748ba6d6d021c25c535d07e3c50b79d31642c10 (patch) | |
tree | 194df37b0765ff10a20a96b19edba99ba1994c93 /test | |
parent | 8aff34f50082874dae880c753b1bb800a005df43 (diff) | |
parent | f50020ce038411b2a0864cb61296b67ac1cc032e (diff) |
Merge pull request #7652 from vjpai/qps_better_client_ending
Better client-side ending for QPS tests
Diffstat (limited to 'test')
-rw-r--r-- | test/cpp/qps/client.h | 21 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 31 |
2 files changed, 34 insertions, 18 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 4045e13460..fada4ba767 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -169,6 +169,7 @@ class Client { // Must call AwaitThreadsCompletion before destructor to avoid a race // between destructor and invocation of virtual ThreadFunc void AwaitThreadsCompletion() { + gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(true)); DestroyMultithreading(); std::unique_lock<std::mutex> g(thread_completion_mu_); while (threads_remaining_ != 0) { @@ -178,8 +179,10 @@ class Client { protected: bool closed_loop_; + gpr_atm thread_pool_done_; void StartThreads(size_t num_threads) { + gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false)); threads_remaining_ = num_threads; for (size_t i = 0; i < num_threads; i++) { threads_.emplace_back(new Thread(this, i)); @@ -241,18 +244,9 @@ class Client { class Thread { public: Thread(Client* client, size_t idx) - : done_(false), - client_(client), - idx_(idx), - impl_(&Thread::ThreadFunc, this) {} + : client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {} - ~Thread() { - { - std::lock_guard<std::mutex> g(mu_); - done_ = true; - } - impl_.join(); - } + ~Thread() { impl_.join(); } void BeginSwap(Histogram* n) { std::lock_guard<std::mutex> g(mu_); @@ -282,9 +276,9 @@ class Client { } if (!thread_still_ok) { gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); - done_ = true; } - if (done_) { + if (!thread_still_ok || + static_cast<bool>(gpr_atm_acq_load(&client_->thread_pool_done_))) { client_->CompleteThread(); return; } @@ -292,7 +286,6 @@ class Client { } std::mutex mu_; - bool done_; Histogram histogram_; Client* client_; const size_t idx_; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 25c7823553..8062424a1f 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -79,10 +79,29 @@ class SynchronousClient virtual ~SynchronousClient(){}; protected: - void WaitToIssue(int thread_idx) { + // WaitToIssue returns false if we realize that we need to break out + bool WaitToIssue(int thread_idx) { if (!closed_loop_) { - gpr_sleep_until(NextIssueTime(thread_idx)); + const gpr_timespec next_issue_time = NextIssueTime(thread_idx); + // Avoid sleeping for too long continuously because we might + // need to terminate before then. This is an issue since + // exponential distribution can occasionally produce bad outliers + while (true) { + const gpr_timespec one_sec_delay = + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_seconds(1, GPR_TIMESPAN)); + if (gpr_time_cmp(next_issue_time, one_sec_delay) <= 0) { + gpr_sleep_until(next_issue_time); + return true; + } else { + gpr_sleep_until(one_sec_delay); + if (gpr_atm_acq_load(&thread_pool_done_) != static_cast<gpr_atm>(0)) { + return false; + } + } + } } + return true; } size_t num_threads_; @@ -101,7 +120,9 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { ~SynchronousUnaryClient() {} bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE { - WaitToIssue(thread_idx); + if (!WaitToIssue(thread_idx)) { + return true; + } auto* stub = channels_[thread_idx % channels_.size()].get_stub(); double start = UsageTimer::Now(); GPR_TIMER_SCOPE("SynchronousUnaryClient::ThreadFunc", 0); @@ -144,7 +165,9 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { } bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE { - WaitToIssue(thread_idx); + if (!WaitToIssue(thread_idx)) { + return true; + } GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0); double start = UsageTimer::Now(); if (stream_[thread_idx]->Write(request_) && |