aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--BUILD7
-rw-r--r--build.yaml1
-rw-r--r--gRPC-C++.podspec2
-rw-r--r--gRPC-Core.podspec2
-rw-r--r--grpc.gemspec1
-rw-r--r--include/grpc/impl/codegen/port_platform.h9
-rw-r--r--package.xml1
-rw-r--r--src/core/lib/iomgr/call_combiner.cc52
-rw-r--r--src/core/lib/iomgr/call_combiner.h31
-rw-r--r--src/core/lib/iomgr/dynamic_annotations.h67
-rw-r--r--tools/doxygen/Doxyfile.c++.internal1
-rw-r--r--tools/doxygen/Doxyfile.core.internal1
-rw-r--r--tools/run_tests/generated/sources_and_headers.json2
13 files changed, 170 insertions, 7 deletions
diff --git a/BUILD b/BUILD
index 9a2c16c601..9e3e594038 100644
--- a/BUILD
+++ b/BUILD
@@ -856,6 +856,7 @@ grpc_cc_library(
"src/core/lib/iomgr/call_combiner.h",
"src/core/lib/iomgr/closure.h",
"src/core/lib/iomgr/combiner.h",
+ "src/core/lib/iomgr/dynamic_annotations.h",
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
"src/core/lib/iomgr/error.h",
@@ -1088,11 +1089,11 @@ grpc_cc_library(
"grpc_base",
"grpc_client_authority_filter",
"grpc_deadline_filter",
+ "health_proto",
"inlined_vector",
"orphanable",
"ref_counted",
"ref_counted_ptr",
- "health_proto",
],
)
@@ -1590,8 +1591,8 @@ grpc_cc_library(
"src/core/lib/security/security_connector/load_system_roots_linux.cc",
"src/core/lib/security/security_connector/local/local_security_connector.cc",
"src/core/lib/security/security_connector/security_connector.cc",
- "src/core/lib/security/security_connector/ssl_utils.cc",
"src/core/lib/security/security_connector/ssl/ssl_security_connector.cc",
+ "src/core/lib/security/security_connector/ssl_utils.cc",
"src/core/lib/security/transport/client_auth_filter.cc",
"src/core/lib/security/transport/secure_endpoint.cc",
"src/core/lib/security/transport/security_handshaker.cc",
@@ -1624,8 +1625,8 @@ grpc_cc_library(
"src/core/lib/security/security_connector/load_system_roots_linux.h",
"src/core/lib/security/security_connector/local/local_security_connector.h",
"src/core/lib/security/security_connector/security_connector.h",
- "src/core/lib/security/security_connector/ssl_utils.h",
"src/core/lib/security/security_connector/ssl/ssl_security_connector.h",
+ "src/core/lib/security/security_connector/ssl_utils.h",
"src/core/lib/security/transport/auth_filters.h",
"src/core/lib/security/transport/secure_endpoint.h",
"src/core/lib/security/transport/security_handshaker.h",
diff --git a/build.yaml b/build.yaml
index 41c63d133a..1e63933f55 100644
--- a/build.yaml
+++ b/build.yaml
@@ -440,6 +440,7 @@ filegroups:
- src/core/lib/iomgr/call_combiner.h
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/combiner.h
+ - src/core/lib/iomgr/dynamic_annotations.h
- src/core/lib/iomgr/endpoint.h
- src/core/lib/iomgr/endpoint_pair.h
- src/core/lib/iomgr/error.h
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 6647201714..5e79803497 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -405,6 +405,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/call_combiner.h',
'src/core/lib/iomgr/closure.h',
'src/core/lib/iomgr/combiner.h',
+ 'src/core/lib/iomgr/dynamic_annotations.h',
'src/core/lib/iomgr/endpoint.h',
'src/core/lib/iomgr/endpoint_pair.h',
'src/core/lib/iomgr/error.h',
@@ -597,6 +598,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/call_combiner.h',
'src/core/lib/iomgr/closure.h',
'src/core/lib/iomgr/combiner.h',
+ 'src/core/lib/iomgr/dynamic_annotations.h',
'src/core/lib/iomgr/endpoint.h',
'src/core/lib/iomgr/endpoint_pair.h',
'src/core/lib/iomgr/error.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 3ec0852ad3..1d4e1ae35c 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -403,6 +403,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/call_combiner.h',
'src/core/lib/iomgr/closure.h',
'src/core/lib/iomgr/combiner.h',
+ 'src/core/lib/iomgr/dynamic_annotations.h',
'src/core/lib/iomgr/endpoint.h',
'src/core/lib/iomgr/endpoint_pair.h',
'src/core/lib/iomgr/error.h',
@@ -1022,6 +1023,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/call_combiner.h',
'src/core/lib/iomgr/closure.h',
'src/core/lib/iomgr/combiner.h',
+ 'src/core/lib/iomgr/dynamic_annotations.h',
'src/core/lib/iomgr/endpoint.h',
'src/core/lib/iomgr/endpoint_pair.h',
'src/core/lib/iomgr/error.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index b5a1ef43fd..92b1e0be68 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -339,6 +339,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/call_combiner.h )
s.files += %w( src/core/lib/iomgr/closure.h )
s.files += %w( src/core/lib/iomgr/combiner.h )
+ s.files += %w( src/core/lib/iomgr/dynamic_annotations.h )
s.files += %w( src/core/lib/iomgr/endpoint.h )
s.files += %w( src/core/lib/iomgr/endpoint_pair.h )
s.files += %w( src/core/lib/iomgr/error.h )
diff --git a/include/grpc/impl/codegen/port_platform.h b/include/grpc/impl/codegen/port_platform.h
index b2028a6305..031c0c36ae 100644
--- a/include/grpc/impl/codegen/port_platform.h
+++ b/include/grpc/impl/codegen/port_platform.h
@@ -526,6 +526,15 @@ typedef unsigned __int64 uint64_t;
#endif /* GPR_ATTRIBUTE_NO_TSAN (2) */
#endif /* GPR_ATTRIBUTE_NO_TSAN (1) */
+/* GRPC_TSAN_ENABLED will be defined, when compiled with thread sanitizer. */
+#if defined(__SANITIZE_THREAD__)
+#define GRPC_TSAN_ENABLED
+#elif defined(__has_feature)
+#if __has_feature(thread_sanitizer)
+#define GRPC_TSAN_ENABLED
+#endif
+#endif
+
/* GRPC_ALLOW_EXCEPTIONS should be 0 or 1 if exceptions are allowed or not */
#ifndef GRPC_ALLOW_EXCEPTIONS
/* If not already set, set to 1 on Windows (style guide standard) but to
diff --git a/package.xml b/package.xml
index a354ad5fa7..bdcb12bfc5 100644
--- a/package.xml
+++ b/package.xml
@@ -344,6 +344,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/call_combiner.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/closure.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/combiner.h" role="src" />
+ <file baseinstalldir="/" name="src/core/lib/iomgr/dynamic_annotations.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_pair.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/error.h" role="src" />
diff --git a/src/core/lib/iomgr/call_combiner.cc b/src/core/lib/iomgr/call_combiner.cc
index 90dda45ba3..6b5759a036 100644
--- a/src/core/lib/iomgr/call_combiner.cc
+++ b/src/core/lib/iomgr/call_combiner.cc
@@ -39,10 +39,57 @@ static gpr_atm encode_cancel_state_error(grpc_error* error) {
return static_cast<gpr_atm>(1) | (gpr_atm)error;
}
+#ifdef GRPC_TSAN_ENABLED
+static void tsan_closure(void* user_data, grpc_error* error) {
+ grpc_call_combiner* call_combiner =
+ static_cast<grpc_call_combiner*>(user_data);
+ // We ref-count the lock, and check if it's already taken.
+ // If it was taken, we should do nothing. Otherwise, we will mark it as
+ // locked. Note that if two different threads try to do this, only one of
+ // them will be able to mark the lock as acquired, while they both run their
+ // callbacks. In such cases (which should never happen for call_combiner),
+ // TSAN will correctly produce an error.
+ //
+ // TODO(soheil): This only covers the callbacks scheduled by
+ // grpc_call_combiner_(start|finish). If in the future, a
+ // callback gets scheduled using other mechanisms, we will need
+ // to add APIs to externally lock call combiners.
+ grpc_core::RefCountedPtr<grpc_call_combiner::TsanLock> lock =
+ call_combiner->tsan_lock;
+ bool prev = false;
+ if (lock->taken.compare_exchange_strong(prev, true)) {
+ TSAN_ANNOTATE_RWLOCK_ACQUIRED(&lock->taken, true);
+ } else {
+ lock.reset();
+ }
+ GRPC_CLOSURE_RUN(call_combiner->original_closure, GRPC_ERROR_REF(error));
+ if (lock != nullptr) {
+ TSAN_ANNOTATE_RWLOCK_RELEASED(&lock->taken, true);
+ bool prev = true;
+ GPR_ASSERT(lock->taken.compare_exchange_strong(prev, false));
+ }
+}
+#endif
+
+static void call_combiner_sched_closure(grpc_call_combiner* call_combiner,
+ grpc_closure* closure,
+ grpc_error* error) {
+#ifdef GRPC_TSAN_ENABLED
+ call_combiner->original_closure = closure;
+ GRPC_CLOSURE_SCHED(&call_combiner->tsan_closure, error);
+#else
+ GRPC_CLOSURE_SCHED(closure, error);
+#endif
+}
+
void grpc_call_combiner_init(grpc_call_combiner* call_combiner) {
gpr_atm_no_barrier_store(&call_combiner->cancel_state, 0);
gpr_atm_no_barrier_store(&call_combiner->size, 0);
gpr_mpscq_init(&call_combiner->queue);
+#ifdef GRPC_TSAN_ENABLED
+ GRPC_CLOSURE_INIT(&call_combiner->tsan_closure, tsan_closure, call_combiner,
+ grpc_schedule_on_exec_ctx);
+#endif
}
void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) {
@@ -87,7 +134,7 @@ void grpc_call_combiner_start(grpc_call_combiner* call_combiner,
gpr_log(GPR_INFO, " EXECUTING IMMEDIATELY");
}
// Queue was empty, so execute this closure immediately.
- GRPC_CLOSURE_SCHED(closure, error);
+ call_combiner_sched_closure(call_combiner, closure, error);
} else {
if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_INFO, " QUEUING");
@@ -134,7 +181,8 @@ void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS,
gpr_log(GPR_INFO, " EXECUTING FROM QUEUE: closure=%p error=%s",
closure, grpc_error_string(closure->error_data.error));
}
- GRPC_CLOSURE_SCHED(closure, closure->error_data.error);
+ call_combiner_sched_closure(call_combiner, closure,
+ closure->error_data.error);
break;
}
} else if (grpc_call_combiner_trace.enabled()) {
diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h
index c943fb1557..4ec0044f05 100644
--- a/src/core/lib/iomgr/call_combiner.h
+++ b/src/core/lib/iomgr/call_combiner.h
@@ -27,7 +27,10 @@
#include "src/core/lib/gpr/mpscq.h"
#include "src/core/lib/gprpp/inlined_vector.h"
+#include "src/core/lib/gprpp/ref_counted.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/dynamic_annotations.h"
// A simple, lock-free mechanism for serializing activity related to a
// single call. This is similar to a combiner but is more lightweight.
@@ -40,14 +43,38 @@
extern grpc_core::TraceFlag grpc_call_combiner_trace;
-typedef struct {
+struct grpc_call_combiner {
gpr_atm size = 0; // size_t, num closures in queue or currently executing
gpr_mpscq queue;
// Either 0 (if not cancelled and no cancellation closure set),
// a grpc_closure* (if the lowest bit is 0),
// or a grpc_error* (if the lowest bit is 1).
gpr_atm cancel_state = 0;
-} grpc_call_combiner;
+#ifdef GRPC_TSAN_ENABLED
+ // A fake ref-counted lock that is kept alive after the destruction of
+ // grpc_call_combiner, when we are running the original closure.
+ //
+ // Ideally we want to lock and unlock the call combiner as a pointer, when the
+ // callback is called. However, original_closure is free to trigger
+ // anything on the call combiner (including destruction of grpc_call).
+ // Thus, we need a ref-counted structure that can outlive the call combiner.
+ struct TsanLock
+ : public grpc_core::RefCounted<TsanLock,
+ grpc_core::NonPolymorphicRefCount> {
+ TsanLock() { TSAN_ANNOTATE_RWLOCK_CREATE(&taken); }
+ ~TsanLock() { TSAN_ANNOTATE_RWLOCK_DESTROY(&taken); }
+
+ // To avoid double-locking by the same thread, we should acquire/release
+ // the lock only when taken is false. On each acquire taken must be set to
+ // true.
+ std::atomic<bool> taken{false};
+ };
+ grpc_core::RefCountedPtr<TsanLock> tsan_lock =
+ grpc_core::MakeRefCounted<TsanLock>();
+ grpc_closure tsan_closure;
+ grpc_closure* original_closure;
+#endif
+};
// Assumes memory was initialized to zero.
void grpc_call_combiner_init(grpc_call_combiner* call_combiner);
diff --git a/src/core/lib/iomgr/dynamic_annotations.h b/src/core/lib/iomgr/dynamic_annotations.h
new file mode 100644
index 0000000000..713928023a
--- /dev/null
+++ b/src/core/lib/iomgr/dynamic_annotations.h
@@ -0,0 +1,67 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_DYNAMIC_ANNOTATIONS_H
+#define GRPC_CORE_LIB_IOMGR_DYNAMIC_ANNOTATIONS_H
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GRPC_TSAN_ENABLED
+
+#define TSAN_ANNOTATE_HAPPENS_BEFORE(addr) \
+ AnnotateHappensBefore(__FILE__, __LINE__, (void*)(addr))
+#define TSAN_ANNOTATE_HAPPENS_AFTER(addr) \
+ AnnotateHappensAfter(__FILE__, __LINE__, (void*)(addr))
+#define TSAN_ANNOTATE_RWLOCK_CREATE(addr) \
+ AnnotateRWLockCreate(__FILE__, __LINE__, (void*)(addr))
+#define TSAN_ANNOTATE_RWLOCK_DESTROY(addr) \
+ AnnotateRWLockDestroy(__FILE__, __LINE__, (void*)(addr))
+#define TSAN_ANNOTATE_RWLOCK_ACQUIRED(addr, is_w) \
+ AnnotateRWLockAcquired(__FILE__, __LINE__, (void*)(addr), (is_w))
+#define TSAN_ANNOTATE_RWLOCK_RELEASED(addr, is_w) \
+ AnnotateRWLockReleased(__FILE__, __LINE__, (void*)(addr), (is_w))
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+void AnnotateHappensBefore(const char* file, int line, const volatile void* cv);
+void AnnotateHappensAfter(const char* file, int line, const volatile void* cv);
+void AnnotateRWLockCreate(const char* file, int line,
+ const volatile void* lock);
+void AnnotateRWLockDestroy(const char* file, int line,
+ const volatile void* lock);
+void AnnotateRWLockAcquired(const char* file, int line,
+ const volatile void* lock, long is_w);
+void AnnotateRWLockReleased(const char* file, int line,
+ const volatile void* lock, long is_w);
+#ifdef __cplusplus
+}
+#endif
+
+#else /* GRPC_TSAN_ENABLED */
+
+#define TSAN_ANNOTATE_HAPPENS_BEFORE(addr)
+#define TSAN_ANNOTATE_HAPPENS_AFTER(addr)
+#define TSAN_ANNOTATE_RWLOCK_CREATE(addr)
+#define TSAN_ANNOTATE_RWLOCK_DESTROY(addr)
+#define TSAN_ANNOTATE_RWLOCK_ACQUIRED(addr, is_w)
+#define TSAN_ANNOTATE_RWLOCK_RELEASED(addr, is_w)
+
+#endif /* GRPC_TSAN_ENABLED */
+
+#endif /* GRPC_CORE_LIB_IOMGR_DYNAMIC_ANNOTATIONS_H */
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index 72b247c48d..0e65bf6daa 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -1084,6 +1084,7 @@ src/core/lib/iomgr/buffer_list.h \
src/core/lib/iomgr/call_combiner.h \
src/core/lib/iomgr/closure.h \
src/core/lib/iomgr/combiner.h \
+src/core/lib/iomgr/dynamic_annotations.h \
src/core/lib/iomgr/endpoint.h \
src/core/lib/iomgr/endpoint_pair.h \
src/core/lib/iomgr/error.h \
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 0f1943de25..dd5bead58c 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -1182,6 +1182,7 @@ src/core/lib/iomgr/call_combiner.h \
src/core/lib/iomgr/closure.h \
src/core/lib/iomgr/combiner.cc \
src/core/lib/iomgr/combiner.h \
+src/core/lib/iomgr/dynamic_annotations.h \
src/core/lib/iomgr/endpoint.cc \
src/core/lib/iomgr/endpoint.h \
src/core/lib/iomgr/endpoint_pair.h \
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index d4d5d14f07..a7231554e3 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -9738,6 +9738,7 @@
"src/core/lib/iomgr/call_combiner.h",
"src/core/lib/iomgr/closure.h",
"src/core/lib/iomgr/combiner.h",
+ "src/core/lib/iomgr/dynamic_annotations.h",
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
"src/core/lib/iomgr/error.h",
@@ -9890,6 +9891,7 @@
"src/core/lib/iomgr/call_combiner.h",
"src/core/lib/iomgr/closure.h",
"src/core/lib/iomgr/combiner.h",
+ "src/core/lib/iomgr/dynamic_annotations.h",
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
"src/core/lib/iomgr/error.h",