diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/cpp/qps/client.h | 32 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 8 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 7 |
3 files changed, 29 insertions, 18 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index cae7f44537..6e7f63e444 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -104,7 +104,7 @@ class Client { void EndThreads() { threads_.clear(); } - virtual void ThreadFunc(Histogram* histogram, size_t thread_idx) = 0; + virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0; private: class Thread { @@ -113,20 +113,24 @@ class Client { : done_(false), new_(nullptr), impl_([this, idx, client]() { - for (;;) { - // run the loop body - client->ThreadFunc(&histogram_, idx); - // lock, see if we're done - std::lock_guard<std::mutex> g(mu_); - if (done_) {return;} - // check if we're marking, swap out the histogram if so - if (new_) { - new_->Swap(&histogram_); - new_ = nullptr; - cv_.notify_one(); + for (;;) { + // run the loop body + bool thread_still_ok = client->ThreadFunc(&histogram_, idx); + // lock, see if we're done + std::lock_guard<std::mutex> g(mu_); + if (!thread_still_ok) { + gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); + done_ = true; + } + if (done_) {return;} + // check if we're marking, swap out the histogram if so + if (new_) { + new_->Swap(&histogram_); + new_ = nullptr; + cv_.notify_one(); + } } - } - }) {} + }) {} ~Thread() { { diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index b07620140e..fecff5c8ce 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -173,7 +173,7 @@ class AsyncUnaryClient GRPC_FINAL : public Client { } } - void ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { + bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { void* got_tag; bool ok; cli_cqs_[thread_idx]->Next(&got_tag, &ok); @@ -185,6 +185,8 @@ class AsyncUnaryClient GRPC_FINAL : public Client { ctx->StartNewClone(); delete ctx; } + + return true; } std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; @@ -301,7 +303,7 @@ class AsyncStreamingClient GRPC_FINAL : public Client { } } - void ThreadFunc(Histogram *histogram, size_t thread_idx) GRPC_OVERRIDE { + bool ThreadFunc(Histogram *histogram, size_t thread_idx) GRPC_OVERRIDE { void *got_tag; bool ok; cli_cqs_[thread_idx]->Next(&got_tag, &ok); @@ -313,6 +315,8 @@ class AsyncStreamingClient GRPC_FINAL : public Client { ctx->StartNewClone(); delete ctx; } + + return true; } std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 46eb3b8d74..0c8b4f090f 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -83,13 +83,14 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { SynchronousClient(config) {StartThreads(num_threads_);} ~SynchronousUnaryClient() {} - void ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { + bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); double start = Timer::Now(); grpc::ClientContext context; grpc::Status s = stub->UnaryCall(&context, request_, &responses_[thread_idx]); histogram->Add((Timer::Now() - start) * 1e9); + return s.IsOk(); } }; @@ -111,11 +112,13 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { } } - void ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { + bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { double start = Timer::Now(); if (stream_->Write(request_) && stream_->Read(&responses_[thread_idx])) { histogram->Add((Timer::Now() - start) * 1e9); + return true; } + return false; } private: grpc::ClientContext context_; |