aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar kpayson64 <kpayson@google.com>2017-10-27 14:38:13 -0700
committerGravatar GitHub <noreply@github.com>2017-10-27 14:38:13 -0700
commitc4c535db59405b14328484eb346b1f61139d01dd (patch)
tree712f8271f899aee2e162124965bd5ac44cfc67ca
parent3f68e03c5a863721786c4fce7782c7e409d4722d (diff)
parent918ce7a686fee457d43820db980832e94b46d4a9 (diff)
Merge pull request #13177 from kpayson64/qps_test_fix
Fix QPS Async Client Next loop
-rw-r--r--test/cpp/qps/client_async.cc28
-rw-r--r--test/cpp/qps/server_async.cc3
2 files changed, 15 insertions, 16 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index b5c7208664..a541f94fa5 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -245,9 +245,20 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
return;
}
- ClientRpcContext* ctx;
+ ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex;
- do {
+ shutdown_mu->lock();
+ while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
+ [&, ctx, ok, entry_ptr, shutdown_mu]() {
+ if (!ctx->RunNextState(ok, entry_ptr)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
+ delete ctx;
+ }
+ shutdown_mu->unlock();
+ },
+ &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) {
t->UpdateHistogram(entry_ptr);
// Got a regular event, so process it
ctx = ClientRpcContext::detag(got_tag);
@@ -265,18 +276,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
shutdown_mu->unlock();
return;
}
- } while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
- [&, ctx, ok, entry_ptr, shutdown_mu]() {
- bool next_ok = ok;
- if (!ctx->RunNextState(next_ok, entry_ptr)) {
- // The RPC and callback are done, so clone the ctx
- // and kickstart the new one
- ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
- delete ctx;
- }
- shutdown_mu->unlock();
- },
- &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
+ }
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 4cf80e9e3d..1c1a5636a9 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -206,13 +206,12 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
return;
}
ServerRpcContext *ctx;
- std::mutex *mu_ptr;
+ std::mutex *mu_ptr = &shutdown_state_[thread_idx]->mutex;
do {
ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
- mu_ptr = &shutdown_state_[thread_idx]->mutex;
mu_ptr->lock();
if (shutdown_state_[thread_idx]->shutdown) {
mu_ptr->unlock();