diff options
author | 2017-10-20 10:32:30 -0700 | |
---|---|---|
committer | 2017-10-25 09:13:30 -0700 | |
commit | 42bd87e376913939850bfa78a3c7f96ce83af11e (patch) | |
tree | be63963cc76a3f293c1bcb8ca1b57b56d21a8e4e /test/cpp/qps/server_async.cc | |
parent | 0d1150855d5c812d649111a4675ad0c444dafdc4 (diff) |
Adds gRPC Experimental CQ DoThenAsyncNext lambda API
Diffstat (limited to 'test/cpp/qps/server_async.cc')
-rw-r--r-- | test/cpp/qps/server_async.cc | 31 |
1 files changed, 20 insertions, 11 deletions
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 4a82f98199..c1097cb8ee 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -194,23 +194,32 @@ class AsyncQpsServerTest final : public grpc::testing::Server { // Wait until work is available or we are shutting down bool ok; void *got_tag; - while (srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { - ServerRpcContext *ctx = detag(got_tag); + if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + return; + } + ServerRpcContext *ctx; + std::mutex *mu_ptr; + do { + ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down - std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); + mu_ptr = &shutdown_state_[thread_idx]->mutex; + mu_ptr->lock(); if (shutdown_state_[thread_idx]->shutdown) { + mu_ptr->unlock(); return; } - std::lock_guard<ServerRpcContext> l2(*ctx); - const bool still_going = ctx->RunNextState(ok); - // if this RPC context is done, refresh it - if (!still_going) { - ctx->Reset(); - } - } - return; + } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext( + [&, ctx, ok, mu_ptr]() { + ctx->lock(); + if (!ctx->RunNextState(ok)) { + ctx->Reset(); + } + ctx->unlock(); + mu_ptr->unlock(); + }, + &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))); } class ServerRpcContext { |