diff options
-rw-r--r-- | test/cpp/qps/client_async.cc | 33 |
1 files changed, 20 insertions, 13 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 1507d1e3d6..24bc0eb5f4 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -206,21 +206,28 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { void* got_tag; bool ok; - if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) { - // Got a regular event, so process it - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); - if (!ctx->RunNextState(ok, histogram)) { - // The RPC and callback are done, so clone the ctx - // and kickstart the new one - auto clone = ctx->StartNewClone(); - clone->Start(cli_cqs_[thread_idx].get()); - // delete the old version - delete ctx; + switch (cli_cqs_[thread_idx]->AsyncNext( + &got_tag, &ok, + std::chrono::system_clock::now() + std::chrono::milliseconds(10))) { + case CompletionQueue::SHUTDOWN: + return false; + case CompletionQueue::GOT_EVENT: { + // Got a regular event, so process it + ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + if (!ctx->RunNextState(ok, histogram)) { + // The RPC and callback are done, so clone the ctx + // and kickstart the new one + auto clone = ctx->StartNewClone(); + clone->Start(cli_cqs_[thread_idx].get()); + // delete the old version + delete ctx; + } + return true; } - return true; - } else { // queue is shutting down - return false; + case CompletionQueue::TIMEOUT: + return true; } + GPR_UNREACHABLE_CODE(return false); } protected: |