aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--test/cpp/qps/client_async.cc43
-rw-r--r--test/cpp/qps/qps_worker.cc2
2 files changed, 32 insertions, 13 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 963a1e1cd0..057e5a0d6b 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -189,14 +189,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
}
virtual ~AsyncClient() {
- for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
- (*cq)->Shutdown();
- void* got_tag;
- bool ok;
- while ((*cq)->Next(&got_tag, &ok)) {
- delete ClientRpcContext::detag(got_tag);
- }
- }
+ FinalShutdownCQs();
}
bool ThreadFunc(HistogramEntry* entry,
@@ -216,14 +209,29 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
delete ctx;
}
return true;
- } else { // queue is shutting down
- return false;
+ } else { // queue is shutting down, so we must be done
+ return true;
}
}
protected:
const int num_async_threads_;
+ void ShutdownCQs() {
+ for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
+ (*cq)->Shutdown();
+ }
+ }
+ void FinalShutdownCQs() {
+ for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
+ void* got_tag;
+ bool ok;
+ while ((*cq)->Next(&got_tag, &ok)) {
+ delete ClientRpcContext::detag(got_tag);
+ }
+ }
+ }
+
private:
int NumThreads(const ClientConfig& config) {
int num_threads = config.async_client_threads();
@@ -251,7 +259,10 @@ class AsyncUnaryClient GRPC_FINAL
config, SetupCtx, BenchmarkStubCreator) {
StartThreads(num_async_threads_);
}
- ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
+ ~AsyncUnaryClient() GRPC_OVERRIDE {
+ ShutdownCQs();
+ EndThreads();
+ }
private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -380,7 +391,10 @@ class AsyncStreamingClient GRPC_FINAL
StartThreads(num_async_threads_);
}
- ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
+ ~AsyncStreamingClient() GRPC_OVERRIDE {
+ ShutdownCQs();
+ EndThreads();
+ }
private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -516,7 +530,10 @@ class GenericAsyncStreamingClient GRPC_FINAL
StartThreads(num_async_threads_);
}
- ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
+ ~GenericAsyncStreamingClient() GRPC_OVERRIDE {
+ ShutdownCQs();
+ EndThreads();
+ }
private:
static void CheckDone(grpc::Status s, ByteBuffer* response) {}
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index 8456fde0ed..49ef52895c 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -128,6 +128,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
ScopedProfile profile("qps_client.prof", false);
Status ret = RunClientBody(ctx, stream);
+ gpr_log(GPR_INFO, "RunClient: Returning");
return ret;
}
@@ -141,6 +142,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
ScopedProfile profile("qps_server.prof", false);
Status ret = RunServerBody(ctx, stream);
+ gpr_log(GPR_INFO, "RunServer: Returning");
return ret;
}