diff options
author | kpayson64 <kpayson@google.com> | 2017-10-27 14:38:13 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-27 14:38:13 -0700 |
commit | c4c535db59405b14328484eb346b1f61139d01dd (patch) | |
tree | 712f8271f899aee2e162124965bd5ac44cfc67ca | |
parent | 3f68e03c5a863721786c4fce7782c7e409d4722d (diff) | |
parent | 918ce7a686fee457d43820db980832e94b46d4a9 (diff) |
Merge pull request #13177 from kpayson64/qps_test_fix
Fix QPS Async Client Next loop
-rw-r--r-- | test/cpp/qps/client_async.cc | 28 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 3 |
2 files changed, 15 insertions, 16 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index b5c7208664..a541f94fa5 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -245,9 +245,20 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { return; } - ClientRpcContext* ctx; + ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex; - do { + shutdown_mu->lock(); + while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext( + [&, ctx, ok, entry_ptr, shutdown_mu]() { + if (!ctx->RunNextState(ok, entry_ptr)) { + // The RPC and callback are done, so clone the ctx + // and kickstart the new one + ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); + delete ctx; + } + shutdown_mu->unlock(); + }, + &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) { t->UpdateHistogram(entry_ptr); // Got a regular event, so process it ctx = ClientRpcContext::detag(got_tag); @@ -265,18 +276,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { shutdown_mu->unlock(); return; } - } while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext( - [&, ctx, ok, entry_ptr, shutdown_mu]() { - bool next_ok = ok; - if (!ctx->RunNextState(next_ok, entry_ptr)) { - // The RPC and callback are done, so clone the ctx - // and kickstart the new one - ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); - delete ctx; - } - shutdown_mu->unlock(); - }, - &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))); + } } std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 4cf80e9e3d..1c1a5636a9 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -206,13 +206,12 @@ class AsyncQpsServerTest final : public grpc::testing::Server { return; } ServerRpcContext *ctx; - std::mutex *mu_ptr; + std::mutex *mu_ptr = &shutdown_state_[thread_idx]->mutex; do { ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down - mu_ptr = &shutdown_state_[thread_idx]->mutex; mu_ptr->lock(); if (shutdown_state_[thread_idx]->shutdown) { mu_ptr->unlock(); |