diff options
author | 2017-10-25 10:28:02 -0700 | |
---|---|---|
committer | 2017-10-25 10:28:02 -0700 | |
commit | 1bf7207852b4138c8a30e5a2f8f2c4bfffbba262 (patch) | |
tree | a49ce2246b55aa230802c909898a20d136f3561d /test/cpp/qps/server_async.cc | |
parent | 1bda5106421c5e6f449e6dbdd2214cdf31b26fcf (diff) | |
parent | 42bd87e376913939850bfa78a3c7f96ce83af11e (diff) |
Merge pull request #13084 from kpayson64/cq_lambda
CompletionQueue DoThenAsyncNext
Diffstat (limited to 'test/cpp/qps/server_async.cc')
-rw-r--r-- | test/cpp/qps/server_async.cc | 31 |
1 files changed, 20 insertions, 11 deletions
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 776371a2c6..4576be5bb3 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -202,23 +202,32 @@ class AsyncQpsServerTest final : public grpc::testing::Server { // Wait until work is available or we are shutting down bool ok; void *got_tag; - while (srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { - ServerRpcContext *ctx = detag(got_tag); + if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + return; + } + ServerRpcContext *ctx; + std::mutex *mu_ptr; + do { + ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down - std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); + mu_ptr = &shutdown_state_[thread_idx]->mutex; + mu_ptr->lock(); if (shutdown_state_[thread_idx]->shutdown) { + mu_ptr->unlock(); return; } - std::lock_guard<ServerRpcContext> l2(*ctx); - const bool still_going = ctx->RunNextState(ok); - // if this RPC context is done, refresh it - if (!still_going) { - ctx->Reset(); - } - } - return; + } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext( + [&, ctx, ok, mu_ptr]() { + ctx->lock(); + if (!ctx->RunNextState(ok)) { + ctx->Reset(); + } + ctx->unlock(); + mu_ptr->unlock(); + }, + &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))); } class ServerRpcContext { |