aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--test/cpp/qps/client_async.cc33
1 files changed, 20 insertions, 13 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 1507d1e3d6..24bc0eb5f4 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -206,21 +206,28 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
void* got_tag;
bool ok;
- if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
- // Got a regular event, so process it
- ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
- if (!ctx->RunNextState(ok, histogram)) {
- // The RPC and callback are done, so clone the ctx
- // and kickstart the new one
- auto clone = ctx->StartNewClone();
- clone->Start(cli_cqs_[thread_idx].get());
- // delete the old version
- delete ctx;
+ switch (cli_cqs_[thread_idx]->AsyncNext(
+ &got_tag, &ok,
+ std::chrono::system_clock::now() + std::chrono::milliseconds(10))) {
+ case CompletionQueue::SHUTDOWN:
+ return false;
+ case CompletionQueue::GOT_EVENT: {
+ // Got a regular event, so process it
+ ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
+ if (!ctx->RunNextState(ok, histogram)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ auto clone = ctx->StartNewClone();
+ clone->Start(cli_cqs_[thread_idx].get());
+ // delete the old version
+ delete ctx;
+ }
+ return true;
}
- return true;
- } else { // queue is shutting down
- return false;
+ case CompletionQueue::TIMEOUT:
+ return true;
}
+ GPR_UNREACHABLE_CODE(return false);
}
protected: