diff options
Diffstat (limited to 'test/cpp/qps/client_async.cc')
-rw-r--r-- | test/cpp/qps/client_async.cc | 45 |
1 files changed, 30 insertions, 15 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index f5807da81e..a541f94fa5 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -236,31 +236,46 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { this->EndThreads(); // this needed for resolution } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override final { + void ThreadFunc(size_t thread_idx, Client::Thread* t) override final { void* got_tag; bool ok; - if (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + HistogramEntry entry; + HistogramEntry* entry_ptr = &entry; + if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + return; + } + ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex; + shutdown_mu->lock(); + while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext( + [&, ctx, ok, entry_ptr, shutdown_mu]() { + if (!ctx->RunNextState(ok, entry_ptr)) { + // The RPC and callback are done, so clone the ctx + // and kickstart the new one + ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); + delete ctx; + } + shutdown_mu->unlock(); + }, + &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) { + t->UpdateHistogram(entry_ptr); // Got a regular event, so process it - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + ctx = ClientRpcContext::detag(got_tag); // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down - std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); + shutdown_mu->lock(); if (shutdown_state_[thread_idx]->shutdown) { ctx->TryCancel(); delete ctx; - return true; - } - if (!ctx->RunNextState(ok, entry)) { - // The RPC and callback are done, so clone the ctx - // and kickstart the new one - ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); - delete ctx; + while (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + ctx = ClientRpcContext::detag(got_tag); + ctx->TryCancel(); + delete ctx; + } + shutdown_mu->unlock(); + return; } - return true; - } else { - // queue is shutting down, so we must be done - return true; } } |