aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar kpayson64 <kpayson@google.com>2017-11-17 11:35:05 -0800
committerGravatar GitHub <noreply@github.com>2017-11-17 11:35:05 -0800
commit510fcb84b63d1749c5c2b3bb9c1184302ee81272 (patch)
treeb9a983745667041aeab2ab1008a708b30e4a75ef
parent7ce4e1771048210a1342b7969801c8ecdd6a4259 (diff)
parent7cf8d72c25eab148b549b651396dcc9f12a01491 (diff)
Merge pull request #13409 from kpayson64/fix_json_run_localhost
Fix TSAN json_run_localhost flake
-rw-r--r--test/cpp/qps/client_async.cc36
1 files changed, 23 insertions, 13 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index f7cdfc2bd7..07888214e7 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -236,6 +236,22 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
this->EndThreads(); // this needed for resolution
}
+ ClientRpcContext* ProcessTag(size_t thread_idx, void* tag) {
+ ClientRpcContext* ctx = ClientRpcContext::detag(tag);
+ if (shutdown_state_[thread_idx]->shutdown) {
+ ctx->TryCancel();
+ delete ctx;
+ bool ok;
+ while (cli_cqs_[cq_[thread_idx]]->Next(&tag, &ok)) {
+ ctx = ClientRpcContext::detag(tag);
+ ctx->TryCancel();
+ delete ctx;
+ }
+ return nullptr;
+ }
+ return ctx;
+ }
+
void ThreadFunc(size_t thread_idx, Client::Thread* t) override final {
void* got_tag;
bool ok;
@@ -245,9 +261,13 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
return;
}
- ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex;
shutdown_mu->lock();
+ ClientRpcContext* ctx = ProcessTag(thread_idx, got_tag);
+ if (ctx == nullptr) {
+ shutdown_mu->unlock();
+ return;
+ }
while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
[&, ctx, ok, entry_ptr, shutdown_mu]() {
if (!ctx->RunNextState(ok, entry_ptr)) {
@@ -260,19 +280,9 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
},
&got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) {
t->UpdateHistogram(entry_ptr);
- // Got a regular event, so process it
- ctx = ClientRpcContext::detag(got_tag);
- // Proceed while holding a lock to make sure that
- // this thread isn't supposed to shut down
shutdown_mu->lock();
- if (shutdown_state_[thread_idx]->shutdown) {
- ctx->TryCancel();
- delete ctx;
- while (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
- ctx = ClientRpcContext::detag(got_tag);
- ctx->TryCancel();
- delete ctx;
- }
+ ctx = ProcessTag(thread_idx, got_tag);
+ if (ctx == nullptr) {
shutdown_mu->unlock();
return;
}