diff options
-rw-r--r-- | test/cpp/qps/client_async.cc | 36 |
1 files changed, 23 insertions, 13 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index f7cdfc2bd7..07888214e7 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -236,6 +236,22 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { this->EndThreads(); // this needed for resolution } + ClientRpcContext* ProcessTag(size_t thread_idx, void* tag) { + ClientRpcContext* ctx = ClientRpcContext::detag(tag); + if (shutdown_state_[thread_idx]->shutdown) { + ctx->TryCancel(); + delete ctx; + bool ok; + while (cli_cqs_[cq_[thread_idx]]->Next(&tag, &ok)) { + ctx = ClientRpcContext::detag(tag); + ctx->TryCancel(); + delete ctx; + } + return nullptr; + } + return ctx; + } + void ThreadFunc(size_t thread_idx, Client::Thread* t) override final { void* got_tag; bool ok; @@ -245,9 +261,13 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { return; } - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex; shutdown_mu->lock(); + ClientRpcContext* ctx = ProcessTag(thread_idx, got_tag); + if (ctx == nullptr) { + shutdown_mu->unlock(); + return; + } while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext( [&, ctx, ok, entry_ptr, shutdown_mu]() { if (!ctx->RunNextState(ok, entry_ptr)) { @@ -260,19 +280,9 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { }, &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) { t->UpdateHistogram(entry_ptr); - // Got a regular event, so process it - ctx = ClientRpcContext::detag(got_tag); - // Proceed while holding a lock to make sure that - // this thread isn't supposed to shut down shutdown_mu->lock(); - if (shutdown_state_[thread_idx]->shutdown) { - ctx->TryCancel(); - delete ctx; - while (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { - ctx = ClientRpcContext::detag(got_tag); - ctx->TryCancel(); - delete ctx; - } + ctx = ProcessTag(thread_idx, got_tag); + if (ctx == nullptr) { shutdown_mu->unlock(); return; } |