aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/cpp/qps/client.h32
-rw-r--r--test/cpp/qps/client_async.cc8
-rw-r--r--test/cpp/qps/client_sync.cc7
3 files changed, 29 insertions, 18 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index cae7f44537..6e7f63e444 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -104,7 +104,7 @@ class Client {
void EndThreads() { threads_.clear(); }
- virtual void ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
+ virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
private:
class Thread {
@@ -113,20 +113,24 @@ class Client {
: done_(false),
new_(nullptr),
impl_([this, idx, client]() {
- for (;;) {
- // run the loop body
- client->ThreadFunc(&histogram_, idx);
- // lock, see if we're done
- std::lock_guard<std::mutex> g(mu_);
- if (done_) {return;}
- // check if we're marking, swap out the histogram if so
- if (new_) {
- new_->Swap(&histogram_);
- new_ = nullptr;
- cv_.notify_one();
+ for (;;) {
+ // run the loop body
+ bool thread_still_ok = client->ThreadFunc(&histogram_, idx);
+ // lock, see if we're done
+ std::lock_guard<std::mutex> g(mu_);
+ if (!thread_still_ok) {
+ gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
+ done_ = true;
+ }
+ if (done_) {return;}
+ // check if we're marking, swap out the histogram if so
+ if (new_) {
+ new_->Swap(&histogram_);
+ new_ = nullptr;
+ cv_.notify_one();
+ }
}
- }
- }) {}
+ }) {}
~Thread() {
{
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index b07620140e..fecff5c8ce 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -173,7 +173,7 @@ class AsyncUnaryClient GRPC_FINAL : public Client {
}
}
- void ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
+ bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
void* got_tag;
bool ok;
cli_cqs_[thread_idx]->Next(&got_tag, &ok);
@@ -185,6 +185,8 @@ class AsyncUnaryClient GRPC_FINAL : public Client {
ctx->StartNewClone();
delete ctx;
}
+
+ return true;
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
@@ -301,7 +303,7 @@ class AsyncStreamingClient GRPC_FINAL : public Client {
}
}
- void ThreadFunc(Histogram *histogram, size_t thread_idx) GRPC_OVERRIDE {
+ bool ThreadFunc(Histogram *histogram, size_t thread_idx) GRPC_OVERRIDE {
void *got_tag;
bool ok;
cli_cqs_[thread_idx]->Next(&got_tag, &ok);
@@ -313,6 +315,8 @@ class AsyncStreamingClient GRPC_FINAL : public Client {
ctx->StartNewClone();
delete ctx;
}
+
+ return true;
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 46eb3b8d74..0c8b4f090f 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -83,13 +83,14 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
SynchronousClient(config) {StartThreads(num_threads_);}
~SynchronousUnaryClient() {}
- void ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
+ bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
double start = Timer::Now();
grpc::ClientContext context;
grpc::Status s =
stub->UnaryCall(&context, request_, &responses_[thread_idx]);
histogram->Add((Timer::Now() - start) * 1e9);
+ return s.IsOk();
}
};
@@ -111,11 +112,13 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
}
}
- void ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
+ bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
double start = Timer::Now();
if (stream_->Write(request_) && stream_->Read(&responses_[thread_idx])) {
histogram->Add((Timer::Now() - start) * 1e9);
+ return true;
}
+ return false;
}
private:
grpc::ClientContext context_;