diff options
Diffstat (limited to 'test/cpp/qps/client_sync.cc')
-rw-r--r-- | test/cpp/qps/client_sync.cc | 31 |
1 files changed, 27 insertions, 4 deletions
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 25c7823553..8062424a1f 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -79,10 +79,29 @@ class SynchronousClient virtual ~SynchronousClient(){}; protected: - void WaitToIssue(int thread_idx) { + // WaitToIssue returns false if we realize that we need to break out + bool WaitToIssue(int thread_idx) { if (!closed_loop_) { - gpr_sleep_until(NextIssueTime(thread_idx)); + const gpr_timespec next_issue_time = NextIssueTime(thread_idx); + // Avoid sleeping for too long continuously because we might + // need to terminate before then. This is an issue since + // exponential distribution can occasionally produce bad outliers + while (true) { + const gpr_timespec one_sec_delay = + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_seconds(1, GPR_TIMESPAN)); + if (gpr_time_cmp(next_issue_time, one_sec_delay) <= 0) { + gpr_sleep_until(next_issue_time); + return true; + } else { + gpr_sleep_until(one_sec_delay); + if (gpr_atm_acq_load(&thread_pool_done_) != static_cast<gpr_atm>(0)) { + return false; + } + } + } } + return true; } size_t num_threads_; @@ -101,7 +120,9 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { ~SynchronousUnaryClient() {} bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE { - WaitToIssue(thread_idx); + if (!WaitToIssue(thread_idx)) { + return true; + } auto* stub = channels_[thread_idx % channels_.size()].get_stub(); double start = UsageTimer::Now(); GPR_TIMER_SCOPE("SynchronousUnaryClient::ThreadFunc", 0); @@ -144,7 +165,9 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { } bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE { - WaitToIssue(thread_idx); + if (!WaitToIssue(thread_idx)) { + return true; + } GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0); double start = UsageTimer::Now(); if (stream_[thread_idx]->Write(request_) && |