diff options
Diffstat (limited to 'src/core/lib/gprpp')
-rw-r--r-- | src/core/lib/gprpp/fork.cc | 260 | ||||
-rw-r--r-- | src/core/lib/gprpp/fork.h | 79 | ||||
-rw-r--r-- | src/core/lib/gprpp/memory.h | 11 | ||||
-rw-r--r-- | src/core/lib/gprpp/orphanable.h | 8 | ||||
-rw-r--r-- | src/core/lib/gprpp/ref_counted.h | 8 | ||||
-rw-r--r-- | src/core/lib/gprpp/thd.h | 3 | ||||
-rw-r--r-- | src/core/lib/gprpp/thd_posix.cc | 57 | ||||
-rw-r--r-- | src/core/lib/gprpp/thd_windows.cc | 7 |
8 files changed, 358 insertions, 75 deletions
diff --git a/src/core/lib/gprpp/fork.cc b/src/core/lib/gprpp/fork.cc new file mode 100644 index 0000000000..f6d9a87d2c --- /dev/null +++ b/src/core/lib/gprpp/fork.cc @@ -0,0 +1,260 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/gprpp/fork.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> + +#include "src/core/lib/gpr/env.h" +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/memory.h" + +/* + * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK + * AROUND VERY SPECIFIC USE CASES. + */ + +namespace grpc_core { +namespace internal { +// The exec_ctx_count has 2 modes, blocked and unblocked. +// When unblocked, the count is 2-indexed; exec_ctx_count=2 indicates +// 0 active ExecCtxs, exex_ctx_count=3 indicates 1 active ExecCtxs... + +// When blocked, the exec_ctx_count is 0-indexed. Note that ExecCtx +// creation can only be blocked if there is exactly 1 outstanding ExecCtx, +// meaning that BLOCKED and UNBLOCKED counts partition the integers +#define UNBLOCKED(n) (n + 2) +#define BLOCKED(n) (n) + +class ExecCtxState { + public: + ExecCtxState() : fork_complete_(true) { + gpr_mu_init(&mu_); + gpr_cv_init(&cv_); + gpr_atm_no_barrier_store(&count_, UNBLOCKED(0)); + } + + void IncExecCtxCount() { + gpr_atm count = gpr_atm_no_barrier_load(&count_); + while (true) { + if (count <= BLOCKED(1)) { + // This only occurs if we are trying to fork. Wait until the fork() + // operation completes before allowing new ExecCtxs. + gpr_mu_lock(&mu_); + if (gpr_atm_no_barrier_load(&count_) <= BLOCKED(1)) { + while (!fork_complete_) { + gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + } + gpr_mu_unlock(&mu_); + } else if (gpr_atm_no_barrier_cas(&count_, count, count + 1)) { + break; + } + count = gpr_atm_no_barrier_load(&count_); + } + } + + void DecExecCtxCount() { gpr_atm_no_barrier_fetch_add(&count_, -1); } + + bool BlockExecCtx() { + // Assumes there is an active ExecCtx when this function is called + if (gpr_atm_no_barrier_cas(&count_, UNBLOCKED(1), BLOCKED(1))) { + gpr_mu_lock(&mu_); + fork_complete_ = false; + gpr_mu_unlock(&mu_); + return true; + } + return false; + } + + void AllowExecCtx() { + gpr_mu_lock(&mu_); + gpr_atm_no_barrier_store(&count_, UNBLOCKED(0)); + fork_complete_ = true; + gpr_cv_broadcast(&cv_); + gpr_mu_unlock(&mu_); + } + + ~ExecCtxState() { + gpr_mu_destroy(&mu_); + gpr_cv_destroy(&cv_); + } + + private: + bool fork_complete_; + gpr_mu mu_; + gpr_cv cv_; + gpr_atm count_; +}; + +class ThreadState { + public: + ThreadState() : awaiting_threads_(false), threads_done_(false), count_(0) { + gpr_mu_init(&mu_); + gpr_cv_init(&cv_); + } + + void IncThreadCount() { + gpr_mu_lock(&mu_); + count_++; + gpr_mu_unlock(&mu_); + } + + void DecThreadCount() { + gpr_mu_lock(&mu_); + count_--; + if (awaiting_threads_ && count_ == 0) { + threads_done_ = true; + gpr_cv_signal(&cv_); + } + gpr_mu_unlock(&mu_); + } + void AwaitThreads() { + gpr_mu_lock(&mu_); + awaiting_threads_ = true; + threads_done_ = (count_ == 0); + while (!threads_done_) { + gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + awaiting_threads_ = true; + gpr_mu_unlock(&mu_); + } + + ~ThreadState() { + gpr_mu_destroy(&mu_); + gpr_cv_destroy(&cv_); + } + + private: + bool awaiting_threads_; + bool threads_done_; + gpr_mu mu_; + gpr_cv cv_; + int count_; +}; + +} // namespace + +void Fork::GlobalInit() { + if (!overrideEnabled_) { +#ifdef GRPC_ENABLE_FORK_SUPPORT + supportEnabled_ = true; +#else + supportEnabled_ = false; +#endif + bool env_var_set = false; + char* env = gpr_getenv("GRPC_ENABLE_FORK_SUPPORT"); + if (env != nullptr) { + static const char* truthy[] = {"yes", "Yes", "YES", "true", + "True", "TRUE", "1"}; + static const char* falsey[] = {"no", "No", "NO", "false", + "False", "FALSE", "0"}; + for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { + if (0 == strcmp(env, truthy[i])) { + supportEnabled_ = true; + env_var_set = true; + break; + } + } + if (!env_var_set) { + for (size_t i = 0; i < GPR_ARRAY_SIZE(falsey); i++) { + if (0 == strcmp(env, falsey[i])) { + supportEnabled_ = false; + env_var_set = true; + break; + } + } + } + gpr_free(env); + } + } + if (supportEnabled_) { + execCtxState_ = grpc_core::New<internal::ExecCtxState>(); + threadState_ = grpc_core::New<internal::ThreadState>(); + } +} + +void Fork::GlobalShutdown() { + if (supportEnabled_) { + grpc_core::Delete(execCtxState_); + grpc_core::Delete(threadState_); + } +} + +bool Fork::Enabled() { return supportEnabled_; } + +// Testing Only +void Fork::Enable(bool enable) { + overrideEnabled_ = true; + supportEnabled_ = enable; +} + +void Fork::IncExecCtxCount() { + if (supportEnabled_) { + execCtxState_->IncExecCtxCount(); + } +} + +void Fork::DecExecCtxCount() { + if (supportEnabled_) { + execCtxState_->DecExecCtxCount(); + } +} + +bool Fork::BlockExecCtx() { + if (supportEnabled_) { + return execCtxState_->BlockExecCtx(); + } + return false; +} + +void Fork::AllowExecCtx() { + if (supportEnabled_) { + execCtxState_->AllowExecCtx(); + } +} + +void Fork::IncThreadCount() { + if (supportEnabled_) { + threadState_->IncThreadCount(); + } +} + +void Fork::DecThreadCount() { + if (supportEnabled_) { + threadState_->DecThreadCount(); + } +} +void Fork::AwaitThreads() { + if (supportEnabled_) { + threadState_->AwaitThreads(); + } +} + +internal::ExecCtxState* Fork::execCtxState_ = nullptr; +internal::ThreadState* Fork::threadState_ = nullptr; +bool Fork::supportEnabled_ = false; +bool Fork::overrideEnabled_ = false; + +} // namespace grpc_core diff --git a/src/core/lib/gprpp/fork.h b/src/core/lib/gprpp/fork.h new file mode 100644 index 0000000000..123e22c4c6 --- /dev/null +++ b/src/core/lib/gprpp/fork.h @@ -0,0 +1,79 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_GPRPP_FORK_H +#define GRPC_CORE_LIB_GPRPP_FORK_H + +/* + * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK + * AROUND VERY SPECIFIC USE CASES. + */ + +namespace grpc_core { + +namespace internal { +class ExecCtxState; +class ThreadState; +} // namespace internal + +class Fork { + public: + static void GlobalInit(); + static void GlobalShutdown(); + + // Returns true if fork suppport is enabled, false otherwise + static bool Enabled(); + + // Increment the count of active ExecCtxs. + // Will block until a pending fork is complete if one is in progress. + static void IncExecCtxCount(); + + // Decrement the count of active ExecCtxs + static void DecExecCtxCount(); + + // Check if there is a single active ExecCtx + // (the one used to invoke this function). If there are more, + // return false. Otherwise, return true and block creation of + // more ExecCtx s until AlloWExecCtx() is called + // + static bool BlockExecCtx(); + static void AllowExecCtx(); + + // Increment the count of active threads. + static void IncThreadCount(); + + // Decrement the count of active threads. + static void DecThreadCount(); + + // Await all core threads to be joined. + static void AwaitThreads(); + + // Test only: overrides environment variables/compile flags + // Must be called before grpc_init() + static void Enable(bool enable); + + private: + static internal::ExecCtxState* execCtxState_; + static internal::ThreadState* threadState_; + static bool supportEnabled_; + static bool overrideEnabled_; +}; + +} // namespace grpc_core + +#endif /* GRPC_CORE_LIB_GPRPP_FORK_H */ diff --git a/src/core/lib/gprpp/memory.h b/src/core/lib/gprpp/memory.h index ba2f546675..1354109bf3 100644 --- a/src/core/lib/gprpp/memory.h +++ b/src/core/lib/gprpp/memory.h @@ -27,6 +27,17 @@ #include <memory> #include <utility> +// Add this to a class that want to use Delete(), but has a private or +// protected destructor. +#define GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE \ + template <typename T> \ + friend void Delete(T*); +// Add this to a class that want to use New(), but has a private or +// protected constructor. +#define GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW \ + template <typename T, typename... Args> \ + friend T* New(Args&&...); + namespace grpc_core { // The alignment of memory returned by gpr_malloc(). diff --git a/src/core/lib/gprpp/orphanable.h b/src/core/lib/gprpp/orphanable.h index 73a73995c7..d0ec9b6461 100644 --- a/src/core/lib/gprpp/orphanable.h +++ b/src/core/lib/gprpp/orphanable.h @@ -83,9 +83,7 @@ class InternallyRefCounted : public Orphanable { GRPC_ABSTRACT_BASE_CLASS protected: - // Allow Delete() to access destructor. - template <typename T> - friend void Delete(T*); + GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE // Allow RefCountedPtr<> to access Unref() and IncrementRefCount(). friend class RefCountedPtr<Child>; @@ -128,9 +126,7 @@ class InternallyRefCountedWithTracing : public Orphanable { GRPC_ABSTRACT_BASE_CLASS protected: - // Allow Delete() to access destructor. - template <typename T> - friend void Delete(T*); + GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE // Allow RefCountedPtr<> to access Unref() and IncrementRefCount(). friend class RefCountedPtr<Child>; diff --git a/src/core/lib/gprpp/ref_counted.h b/src/core/lib/gprpp/ref_counted.h index c67e3f315c..ddac5bd475 100644 --- a/src/core/lib/gprpp/ref_counted.h +++ b/src/core/lib/gprpp/ref_counted.h @@ -65,9 +65,7 @@ class RefCounted { GRPC_ABSTRACT_BASE_CLASS protected: - // Allow Delete() to access destructor. - template <typename T> - friend void Delete(T*); + GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE RefCounted() { gpr_ref_init(&refs_, 1); } @@ -135,9 +133,7 @@ class RefCountedWithTracing { GRPC_ABSTRACT_BASE_CLASS protected: - // Allow Delete() to access destructor. - template <typename T> - friend void Delete(T*); + GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE RefCountedWithTracing() : RefCountedWithTracing(static_cast<TraceFlag*>(nullptr)) {} diff --git a/src/core/lib/gprpp/thd.h b/src/core/lib/gprpp/thd.h index 05c7ded45f..caf0652c1a 100644 --- a/src/core/lib/gprpp/thd.h +++ b/src/core/lib/gprpp/thd.h @@ -111,9 +111,6 @@ class Thread { } }; - static void Init(); - static bool AwaitAll(gpr_timespec deadline); - private: Thread(const Thread&) = delete; Thread& operator=(const Thread&) = delete; diff --git a/src/core/lib/gprpp/thd_posix.cc b/src/core/lib/gprpp/thd_posix.cc index 2f6c2edcae..533c07e7d8 100644 --- a/src/core/lib/gprpp/thd_posix.cc +++ b/src/core/lib/gprpp/thd_posix.cc @@ -32,17 +32,12 @@ #include <stdlib.h> #include <string.h> -#include "src/core/lib/gpr/fork.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/fork.h" #include "src/core/lib/gprpp/memory.h" namespace grpc_core { namespace { -gpr_mu g_mu; -gpr_cv g_cv; -int g_thread_count; -int g_awaiting_threads; - class ThreadInternalsPosix; struct thd_arg { ThreadInternalsPosix* thread; @@ -68,7 +63,7 @@ class ThreadInternalsPosix info->body = thd_body; info->arg = arg; info->name = thd_name; - inc_thd_count(); + grpc_core::Fork::IncThreadCount(); GPR_ASSERT(pthread_attr_init(&attr) == 0); GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == @@ -103,7 +98,7 @@ class ThreadInternalsPosix gpr_mu_unlock(&arg.thread->mu_); (*arg.body)(arg.arg); - dec_thd_count(); + grpc_core::Fork::DecThreadCount(); return nullptr; }, info) == 0); @@ -113,7 +108,7 @@ class ThreadInternalsPosix if (!success) { /* don't use gpr_free, as this was allocated using malloc (see above) */ free(info); - dec_thd_count(); + grpc_core::Fork::DecThreadCount(); } }; @@ -132,29 +127,6 @@ class ThreadInternalsPosix void Join() override { pthread_join(pthread_id_, nullptr); } private: - /***************************************** - * Only used when fork support is enabled - */ - - static void inc_thd_count() { - if (grpc_fork_support_enabled()) { - gpr_mu_lock(&g_mu); - g_thread_count++; - gpr_mu_unlock(&g_mu); - } - } - - static void dec_thd_count() { - if (grpc_fork_support_enabled()) { - gpr_mu_lock(&g_mu); - g_thread_count--; - if (g_awaiting_threads && g_thread_count == 0) { - gpr_cv_signal(&g_cv); - } - gpr_mu_unlock(&g_mu); - } - } - gpr_mu mu_; gpr_cv ready_; bool started_; @@ -180,27 +152,6 @@ Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, *success = outcome; } } - -void Thread::Init() { - gpr_mu_init(&g_mu); - gpr_cv_init(&g_cv); - g_thread_count = 0; - g_awaiting_threads = 0; -} - -bool Thread::AwaitAll(gpr_timespec deadline) { - gpr_mu_lock(&g_mu); - g_awaiting_threads = 1; - int res = 0; - while ((g_thread_count > 0) && - (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0)) { - res = gpr_cv_wait(&g_cv, &g_mu, deadline); - } - g_awaiting_threads = 0; - gpr_mu_unlock(&g_mu); - return res == 0; -} - } // namespace grpc_core // The following is in the external namespace as it is exposed as C89 API diff --git a/src/core/lib/gprpp/thd_windows.cc b/src/core/lib/gprpp/thd_windows.cc index 59ea02f3d2..71584fd358 100644 --- a/src/core/lib/gprpp/thd_windows.cc +++ b/src/core/lib/gprpp/thd_windows.cc @@ -131,13 +131,6 @@ class ThreadInternalsWindows namespace grpc_core { -void Thread::Init() {} - -bool Thread::AwaitAll(gpr_timespec deadline) { - // TODO: Consider adding this if needed - return false; -} - Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, bool* success) { bool outcome = false; |