diff options
Diffstat (limited to 'test/cpp/qps/client_sync.cc')
-rw-r--r-- | test/cpp/qps/client_sync.cc | 39 |
1 files changed, 29 insertions, 10 deletions
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 94554a46b2..9f20b148eb 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -62,6 +62,25 @@ class SynchronousClient virtual ~SynchronousClient(){}; + virtual void InitThreadFuncImpl(size_t thread_idx) = 0; + virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0; + + void ThreadFunc(size_t thread_idx, Thread* t) override { + InitThreadFuncImpl(thread_idx); + for (;;) { + // run the loop body + HistogramEntry entry; + const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx); + t->UpdateHistogram(&entry); + if (!thread_still_ok) { + gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); + } + if (!thread_still_ok || ThreadCompleted()) { + return; + } + } + } + protected: // WaitToIssue returns false if we realize that we need to break out bool WaitToIssue(int thread_idx) { @@ -103,9 +122,9 @@ class SynchronousUnaryClient final : public SynchronousClient { } ~SynchronousUnaryClient() {} - void InitThreadFunc(size_t thread_idx) override {} + void InitThreadFuncImpl(size_t thread_idx) override {} - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { return true; } @@ -192,13 +211,13 @@ class SynchronousStreamingPingPongClient final } } - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); messages_issued_[thread_idx] = 0; } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { return true; } @@ -246,14 +265,14 @@ class SynchronousStreamingFromClientClient final } } - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], &responses_[thread_idx]); last_issue_[thread_idx] = UsageTimer::Now(); } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { // Figure out how to make histogram sensible if this is rate-paced if (!WaitToIssue(thread_idx)) { return true; @@ -282,13 +301,13 @@ class SynchronousStreamingFromServerClient final public: SynchronousStreamingFromServerClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_recv_(num_threads_) {} - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingFromServer(&context_[thread_idx], request_); last_recv_[thread_idx] = UsageTimer::Now(); } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0); if (stream_[thread_idx]->Read(&responses_[thread_idx])) { double now = UsageTimer::Now(); @@ -328,11 +347,11 @@ class SynchronousStreamingBothWaysClient final } } - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { // TODO (vjpai): Do this return true; } |