aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/client_async.cc
diff options
context:
space:
mode:
authorGravatar Ken Payson <kpayson@google.com>2017-10-20 10:32:30 -0700
committerGravatar Ken Payson <kpayson@google.com>2017-10-25 09:13:30 -0700
commit42bd87e376913939850bfa78a3c7f96ce83af11e (patch)
treebe63963cc76a3f293c1bcb8ca1b57b56d21a8e4e /test/cpp/qps/client_async.cc
parent0d1150855d5c812d649111a4675ad0c444dafdc4 (diff)
Adds gRPC Experimental CQ DoThenAsyncNext lambda API
Diffstat (limited to 'test/cpp/qps/client_async.cc')
-rw-r--r--test/cpp/qps/client_async.cc48
1 files changed, 31 insertions, 17 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 9ed4e0b355..b5c7208664 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -236,33 +236,47 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
this->EndThreads(); // this needed for resolution
}
- void InitThreadFunc(size_t thread_idx) override final {}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override final {
+ void ThreadFunc(size_t thread_idx, Client::Thread* t) override final {
void* got_tag;
bool ok;
- if (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+ HistogramEntry entry;
+ HistogramEntry* entry_ptr = &entry;
+ if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+ return;
+ }
+ ClientRpcContext* ctx;
+ std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex;
+ do {
+ t->UpdateHistogram(entry_ptr);
// Got a regular event, so process it
- ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
+ ctx = ClientRpcContext::detag(got_tag);
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
- std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ shutdown_mu->lock();
if (shutdown_state_[thread_idx]->shutdown) {
ctx->TryCancel();
delete ctx;
- return true;
- }
- if (!ctx->RunNextState(ok, entry)) {
- // The RPC and callback are done, so clone the ctx
- // and kickstart the new one
- ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
- delete ctx;
+ while (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+ ctx = ClientRpcContext::detag(got_tag);
+ ctx->TryCancel();
+ delete ctx;
+ }
+ shutdown_mu->unlock();
+ return;
}
- return true;
- } else {
- // queue is shutting down, so we must be done
- return true;
- }
+ } while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
+ [&, ctx, ok, entry_ptr, shutdown_mu]() {
+ bool next_ok = ok;
+ if (!ctx->RunNextState(next_ok, entry_ptr)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
+ delete ctx;
+ }
+ shutdown_mu->unlock();
+ },
+ &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;