diff options
Diffstat (limited to 'test/cpp/qps/client_sync.cc')
-rw-r--r-- | test/cpp/qps/client_sync.cc | 85 |
1 files changed, 52 insertions, 33 deletions
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 5d212f1acc..9f20b148eb 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -62,6 +62,25 @@ class SynchronousClient virtual ~SynchronousClient(){}; + virtual void InitThreadFuncImpl(size_t thread_idx) = 0; + virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0; + + void ThreadFunc(size_t thread_idx, Thread* t) override { + InitThreadFuncImpl(thread_idx); + for (;;) { + // run the loop body + HistogramEntry entry; + const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx); + t->UpdateHistogram(&entry); + if (!thread_still_ok) { + gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); + } + if (!thread_still_ok || ThreadCompleted()) { + return; + } + } + } + protected: // WaitToIssue returns false if we realize that we need to break out bool WaitToIssue(int thread_idx) { @@ -103,7 +122,9 @@ class SynchronousUnaryClient final : public SynchronousClient { } ~SynchronousUnaryClient() {} - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override {} + + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { return true; } @@ -174,13 +195,7 @@ class SynchronousStreamingPingPongClient final grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> { public: SynchronousStreamingPingPongClient(const ClientConfig& config) - : SynchronousStreamingClient(config) { - for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { - auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); - messages_issued_[thread_idx] = 0; - } - } + : SynchronousStreamingClient(config) {} ~SynchronousStreamingPingPongClient() { std::vector<std::thread> cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { @@ -196,7 +211,13 @@ class SynchronousStreamingPingPongClient final } } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + messages_issued_[thread_idx] = 0; + } + + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { return true; } @@ -228,14 +249,7 @@ class SynchronousStreamingFromClientClient final : public SynchronousStreamingClient<grpc::ClientWriter<SimpleRequest>> { public: SynchronousStreamingFromClientClient(const ClientConfig& config) - : SynchronousStreamingClient(config), last_issue_(num_threads_) { - for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { - auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], - &responses_[thread_idx]); - last_issue_[thread_idx] = UsageTimer::Now(); - } - } + : SynchronousStreamingClient(config), last_issue_(num_threads_) {} ~SynchronousStreamingFromClientClient() { std::vector<std::thread> cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { @@ -251,7 +265,14 @@ class SynchronousStreamingFromClientClient final } } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + last_issue_[thread_idx] = UsageTimer::Now(); + } + + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { // Figure out how to make histogram sensible if this is rate-paced if (!WaitToIssue(thread_idx)) { return true; @@ -279,15 +300,14 @@ class SynchronousStreamingFromServerClient final : public SynchronousStreamingClient<grpc::ClientReader<SimpleResponse>> { public: SynchronousStreamingFromServerClient(const ClientConfig& config) - : SynchronousStreamingClient(config), last_recv_(num_threads_) { - for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { - auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = - stub->StreamingFromServer(&context_[thread_idx], request_); - last_recv_[thread_idx] = UsageTimer::Now(); - } + : SynchronousStreamingClient(config), last_recv_(num_threads_) {} + void InitThreadFuncImpl(size_t thread_idx) override { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + last_recv_[thread_idx] = UsageTimer::Now(); } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0); if (stream_[thread_idx]->Read(&responses_[thread_idx])) { double now = UsageTimer::Now(); @@ -311,12 +331,7 @@ class SynchronousStreamingBothWaysClient final grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> { public: SynchronousStreamingBothWaysClient(const ClientConfig& config) - : SynchronousStreamingClient(config) { - for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { - auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); - } - } + : SynchronousStreamingClient(config) {} ~SynchronousStreamingBothWaysClient() { std::vector<std::thread> cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { @@ -332,7 +347,11 @@ class SynchronousStreamingBothWaysClient final } } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); + } + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { // TODO (vjpai): Do this return true; } |