diff options
author | kpayson64 <kpayson@google.com> | 2017-11-17 11:35:05 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-17 11:35:05 -0800 |
commit | 510fcb84b63d1749c5c2b3bb9c1184302ee81272 (patch) | |
tree | b9a983745667041aeab2ab1008a708b30e4a75ef /test | |
parent | 7ce4e1771048210a1342b7969801c8ecdd6a4259 (diff) | |
parent | 7cf8d72c25eab148b549b651396dcc9f12a01491 (diff) |
Merge pull request #13409 from kpayson64/fix_json_run_localhost
Fix TSAN json_run_localhost flake
Diffstat (limited to 'test')
-rw-r--r-- | test/cpp/qps/client_async.cc | 36 |
1 files changed, 23 insertions, 13 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index f7cdfc2bd7..07888214e7 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -236,6 +236,22 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { this->EndThreads(); // this needed for resolution } + ClientRpcContext* ProcessTag(size_t thread_idx, void* tag) { + ClientRpcContext* ctx = ClientRpcContext::detag(tag); + if (shutdown_state_[thread_idx]->shutdown) { + ctx->TryCancel(); + delete ctx; + bool ok; + while (cli_cqs_[cq_[thread_idx]]->Next(&tag, &ok)) { + ctx = ClientRpcContext::detag(tag); + ctx->TryCancel(); + delete ctx; + } + return nullptr; + } + return ctx; + } + void ThreadFunc(size_t thread_idx, Client::Thread* t) override final { void* got_tag; bool ok; @@ -245,9 +261,13 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { return; } - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex; shutdown_mu->lock(); + ClientRpcContext* ctx = ProcessTag(thread_idx, got_tag); + if (ctx == nullptr) { + shutdown_mu->unlock(); + return; + } while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext( [&, ctx, ok, entry_ptr, shutdown_mu]() { if (!ctx->RunNextState(ok, entry_ptr)) { @@ -260,19 +280,9 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { }, &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) { t->UpdateHistogram(entry_ptr); - // Got a regular event, so process it - ctx = ClientRpcContext::detag(got_tag); - // Proceed while holding a lock to make sure that - // this thread isn't supposed to shut down shutdown_mu->lock(); - if (shutdown_state_[thread_idx]->shutdown) { - ctx->TryCancel(); - delete ctx; - while (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { - ctx = ClientRpcContext::detag(got_tag); - ctx->TryCancel(); - delete ctx; - } + ctx = ProcessTag(thread_idx, got_tag); + if (ctx == nullptr) { shutdown_mu->unlock(); return; } |