diff options
author | Soheil Hassas Yeganeh <soheil@google.com> | 2018-12-01 22:17:48 -0500 |
---|---|---|
committer | Soheil Hassas Yeganeh <soheil@google.com> | 2018-12-05 12:31:12 -0500 |
commit | be8ef52ea8a0bb91a83c41a8028bab69f42daac7 (patch) | |
tree | e07df5bfca5d224fde803117b44339080fc7ba82 /src/core/lib/iomgr/call_combiner.cc | |
parent | 2f55f4f85af9afe9d3f2e55b51ec949c87ed9b4d (diff) |
Add TSAN anntations for grpc_call_combiner.
Since GRPC_CLOSUSE_SCHEDULE can schedule callback asynchronously we have
to schedule our own wrapper instead. Also, we cannot use ACQUIRE and
RELEASE directly on the call_combiner, because callbacks are free to even
destroy the call_combiner. Thus, we use a ref-counted structure that
acts as a fake lock for Tsan annotations.
Diffstat (limited to 'src/core/lib/iomgr/call_combiner.cc')
-rw-r--r-- | src/core/lib/iomgr/call_combiner.cc | 52 |
1 files changed, 50 insertions, 2 deletions
diff --git a/src/core/lib/iomgr/call_combiner.cc b/src/core/lib/iomgr/call_combiner.cc index 90dda45ba3..6b5759a036 100644 --- a/src/core/lib/iomgr/call_combiner.cc +++ b/src/core/lib/iomgr/call_combiner.cc @@ -39,10 +39,57 @@ static gpr_atm encode_cancel_state_error(grpc_error* error) { return static_cast<gpr_atm>(1) | (gpr_atm)error; } +#ifdef GRPC_TSAN_ENABLED +static void tsan_closure(void* user_data, grpc_error* error) { + grpc_call_combiner* call_combiner = + static_cast<grpc_call_combiner*>(user_data); + // We ref-count the lock, and check if it's already taken. + // If it was taken, we should do nothing. Otherwise, we will mark it as + // locked. Note that if two different threads try to do this, only one of + // them will be able to mark the lock as acquired, while they both run their + // callbacks. In such cases (which should never happen for call_combiner), + // TSAN will correctly produce an error. + // + // TODO(soheil): This only covers the callbacks scheduled by + // grpc_call_combiner_(start|finish). If in the future, a + // callback gets scheduled using other mechanisms, we will need + // to add APIs to externally lock call combiners. + grpc_core::RefCountedPtr<grpc_call_combiner::TsanLock> lock = + call_combiner->tsan_lock; + bool prev = false; + if (lock->taken.compare_exchange_strong(prev, true)) { + TSAN_ANNOTATE_RWLOCK_ACQUIRED(&lock->taken, true); + } else { + lock.reset(); + } + GRPC_CLOSURE_RUN(call_combiner->original_closure, GRPC_ERROR_REF(error)); + if (lock != nullptr) { + TSAN_ANNOTATE_RWLOCK_RELEASED(&lock->taken, true); + bool prev = true; + GPR_ASSERT(lock->taken.compare_exchange_strong(prev, false)); + } +} +#endif + +static void call_combiner_sched_closure(grpc_call_combiner* call_combiner, + grpc_closure* closure, + grpc_error* error) { +#ifdef GRPC_TSAN_ENABLED + call_combiner->original_closure = closure; + GRPC_CLOSURE_SCHED(&call_combiner->tsan_closure, error); +#else + GRPC_CLOSURE_SCHED(closure, error); +#endif +} + void grpc_call_combiner_init(grpc_call_combiner* call_combiner) { gpr_atm_no_barrier_store(&call_combiner->cancel_state, 0); gpr_atm_no_barrier_store(&call_combiner->size, 0); gpr_mpscq_init(&call_combiner->queue); +#ifdef GRPC_TSAN_ENABLED + GRPC_CLOSURE_INIT(&call_combiner->tsan_closure, tsan_closure, call_combiner, + grpc_schedule_on_exec_ctx); +#endif } void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) { @@ -87,7 +134,7 @@ void grpc_call_combiner_start(grpc_call_combiner* call_combiner, gpr_log(GPR_INFO, " EXECUTING IMMEDIATELY"); } // Queue was empty, so execute this closure immediately. - GRPC_CLOSURE_SCHED(closure, error); + call_combiner_sched_closure(call_combiner, closure, error); } else { if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_INFO, " QUEUING"); @@ -134,7 +181,8 @@ void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS, gpr_log(GPR_INFO, " EXECUTING FROM QUEUE: closure=%p error=%s", closure, grpc_error_string(closure->error_data.error)); } - GRPC_CLOSURE_SCHED(closure, closure->error_data.error); + call_combiner_sched_closure(call_combiner, closure, + closure->error_data.error); break; } } else if (grpc_call_combiner_trace.enabled()) { |