aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/gprpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/gprpp')
-rw-r--r--src/core/lib/gprpp/fork.cc260
-rw-r--r--src/core/lib/gprpp/fork.h79
-rw-r--r--src/core/lib/gprpp/memory.h11
-rw-r--r--src/core/lib/gprpp/orphanable.h8
-rw-r--r--src/core/lib/gprpp/ref_counted.h8
-rw-r--r--src/core/lib/gprpp/thd.h3
-rw-r--r--src/core/lib/gprpp/thd_posix.cc57
-rw-r--r--src/core/lib/gprpp/thd_windows.cc7
8 files changed, 358 insertions, 75 deletions
diff --git a/src/core/lib/gprpp/fork.cc b/src/core/lib/gprpp/fork.cc
new file mode 100644
index 0000000000..f6d9a87d2c
--- /dev/null
+++ b/src/core/lib/gprpp/fork.cc
@@ -0,0 +1,260 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/gprpp/fork.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/time.h>
+
+#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/memory.h"
+
+/*
+ * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
+ * AROUND VERY SPECIFIC USE CASES.
+ */
+
+namespace grpc_core {
+namespace internal {
+// The exec_ctx_count has 2 modes, blocked and unblocked.
+// When unblocked, the count is 2-indexed; exec_ctx_count=2 indicates
+// 0 active ExecCtxs, exex_ctx_count=3 indicates 1 active ExecCtxs...
+
+// When blocked, the exec_ctx_count is 0-indexed. Note that ExecCtx
+// creation can only be blocked if there is exactly 1 outstanding ExecCtx,
+// meaning that BLOCKED and UNBLOCKED counts partition the integers
+#define UNBLOCKED(n) (n + 2)
+#define BLOCKED(n) (n)
+
+class ExecCtxState {
+ public:
+ ExecCtxState() : fork_complete_(true) {
+ gpr_mu_init(&mu_);
+ gpr_cv_init(&cv_);
+ gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
+ }
+
+ void IncExecCtxCount() {
+ gpr_atm count = gpr_atm_no_barrier_load(&count_);
+ while (true) {
+ if (count <= BLOCKED(1)) {
+ // This only occurs if we are trying to fork. Wait until the fork()
+ // operation completes before allowing new ExecCtxs.
+ gpr_mu_lock(&mu_);
+ if (gpr_atm_no_barrier_load(&count_) <= BLOCKED(1)) {
+ while (!fork_complete_) {
+ gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+ }
+ gpr_mu_unlock(&mu_);
+ } else if (gpr_atm_no_barrier_cas(&count_, count, count + 1)) {
+ break;
+ }
+ count = gpr_atm_no_barrier_load(&count_);
+ }
+ }
+
+ void DecExecCtxCount() { gpr_atm_no_barrier_fetch_add(&count_, -1); }
+
+ bool BlockExecCtx() {
+ // Assumes there is an active ExecCtx when this function is called
+ if (gpr_atm_no_barrier_cas(&count_, UNBLOCKED(1), BLOCKED(1))) {
+ gpr_mu_lock(&mu_);
+ fork_complete_ = false;
+ gpr_mu_unlock(&mu_);
+ return true;
+ }
+ return false;
+ }
+
+ void AllowExecCtx() {
+ gpr_mu_lock(&mu_);
+ gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
+ fork_complete_ = true;
+ gpr_cv_broadcast(&cv_);
+ gpr_mu_unlock(&mu_);
+ }
+
+ ~ExecCtxState() {
+ gpr_mu_destroy(&mu_);
+ gpr_cv_destroy(&cv_);
+ }
+
+ private:
+ bool fork_complete_;
+ gpr_mu mu_;
+ gpr_cv cv_;
+ gpr_atm count_;
+};
+
+class ThreadState {
+ public:
+ ThreadState() : awaiting_threads_(false), threads_done_(false), count_(0) {
+ gpr_mu_init(&mu_);
+ gpr_cv_init(&cv_);
+ }
+
+ void IncThreadCount() {
+ gpr_mu_lock(&mu_);
+ count_++;
+ gpr_mu_unlock(&mu_);
+ }
+
+ void DecThreadCount() {
+ gpr_mu_lock(&mu_);
+ count_--;
+ if (awaiting_threads_ && count_ == 0) {
+ threads_done_ = true;
+ gpr_cv_signal(&cv_);
+ }
+ gpr_mu_unlock(&mu_);
+ }
+ void AwaitThreads() {
+ gpr_mu_lock(&mu_);
+ awaiting_threads_ = true;
+ threads_done_ = (count_ == 0);
+ while (!threads_done_) {
+ gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+ awaiting_threads_ = true;
+ gpr_mu_unlock(&mu_);
+ }
+
+ ~ThreadState() {
+ gpr_mu_destroy(&mu_);
+ gpr_cv_destroy(&cv_);
+ }
+
+ private:
+ bool awaiting_threads_;
+ bool threads_done_;
+ gpr_mu mu_;
+ gpr_cv cv_;
+ int count_;
+};
+
+} // namespace
+
+void Fork::GlobalInit() {
+ if (!overrideEnabled_) {
+#ifdef GRPC_ENABLE_FORK_SUPPORT
+ supportEnabled_ = true;
+#else
+ supportEnabled_ = false;
+#endif
+ bool env_var_set = false;
+ char* env = gpr_getenv("GRPC_ENABLE_FORK_SUPPORT");
+ if (env != nullptr) {
+ static const char* truthy[] = {"yes", "Yes", "YES", "true",
+ "True", "TRUE", "1"};
+ static const char* falsey[] = {"no", "No", "NO", "false",
+ "False", "FALSE", "0"};
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
+ if (0 == strcmp(env, truthy[i])) {
+ supportEnabled_ = true;
+ env_var_set = true;
+ break;
+ }
+ }
+ if (!env_var_set) {
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(falsey); i++) {
+ if (0 == strcmp(env, falsey[i])) {
+ supportEnabled_ = false;
+ env_var_set = true;
+ break;
+ }
+ }
+ }
+ gpr_free(env);
+ }
+ }
+ if (supportEnabled_) {
+ execCtxState_ = grpc_core::New<internal::ExecCtxState>();
+ threadState_ = grpc_core::New<internal::ThreadState>();
+ }
+}
+
+void Fork::GlobalShutdown() {
+ if (supportEnabled_) {
+ grpc_core::Delete(execCtxState_);
+ grpc_core::Delete(threadState_);
+ }
+}
+
+bool Fork::Enabled() { return supportEnabled_; }
+
+// Testing Only
+void Fork::Enable(bool enable) {
+ overrideEnabled_ = true;
+ supportEnabled_ = enable;
+}
+
+void Fork::IncExecCtxCount() {
+ if (supportEnabled_) {
+ execCtxState_->IncExecCtxCount();
+ }
+}
+
+void Fork::DecExecCtxCount() {
+ if (supportEnabled_) {
+ execCtxState_->DecExecCtxCount();
+ }
+}
+
+bool Fork::BlockExecCtx() {
+ if (supportEnabled_) {
+ return execCtxState_->BlockExecCtx();
+ }
+ return false;
+}
+
+void Fork::AllowExecCtx() {
+ if (supportEnabled_) {
+ execCtxState_->AllowExecCtx();
+ }
+}
+
+void Fork::IncThreadCount() {
+ if (supportEnabled_) {
+ threadState_->IncThreadCount();
+ }
+}
+
+void Fork::DecThreadCount() {
+ if (supportEnabled_) {
+ threadState_->DecThreadCount();
+ }
+}
+void Fork::AwaitThreads() {
+ if (supportEnabled_) {
+ threadState_->AwaitThreads();
+ }
+}
+
+internal::ExecCtxState* Fork::execCtxState_ = nullptr;
+internal::ThreadState* Fork::threadState_ = nullptr;
+bool Fork::supportEnabled_ = false;
+bool Fork::overrideEnabled_ = false;
+
+} // namespace grpc_core
diff --git a/src/core/lib/gprpp/fork.h b/src/core/lib/gprpp/fork.h
new file mode 100644
index 0000000000..123e22c4c6
--- /dev/null
+++ b/src/core/lib/gprpp/fork.h
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_GPRPP_FORK_H
+#define GRPC_CORE_LIB_GPRPP_FORK_H
+
+/*
+ * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
+ * AROUND VERY SPECIFIC USE CASES.
+ */
+
+namespace grpc_core {
+
+namespace internal {
+class ExecCtxState;
+class ThreadState;
+} // namespace internal
+
+class Fork {
+ public:
+ static void GlobalInit();
+ static void GlobalShutdown();
+
+ // Returns true if fork suppport is enabled, false otherwise
+ static bool Enabled();
+
+ // Increment the count of active ExecCtxs.
+ // Will block until a pending fork is complete if one is in progress.
+ static void IncExecCtxCount();
+
+ // Decrement the count of active ExecCtxs
+ static void DecExecCtxCount();
+
+ // Check if there is a single active ExecCtx
+ // (the one used to invoke this function). If there are more,
+ // return false. Otherwise, return true and block creation of
+ // more ExecCtx s until AlloWExecCtx() is called
+ //
+ static bool BlockExecCtx();
+ static void AllowExecCtx();
+
+ // Increment the count of active threads.
+ static void IncThreadCount();
+
+ // Decrement the count of active threads.
+ static void DecThreadCount();
+
+ // Await all core threads to be joined.
+ static void AwaitThreads();
+
+ // Test only: overrides environment variables/compile flags
+ // Must be called before grpc_init()
+ static void Enable(bool enable);
+
+ private:
+ static internal::ExecCtxState* execCtxState_;
+ static internal::ThreadState* threadState_;
+ static bool supportEnabled_;
+ static bool overrideEnabled_;
+};
+
+} // namespace grpc_core
+
+#endif /* GRPC_CORE_LIB_GPRPP_FORK_H */
diff --git a/src/core/lib/gprpp/memory.h b/src/core/lib/gprpp/memory.h
index ba2f546675..1354109bf3 100644
--- a/src/core/lib/gprpp/memory.h
+++ b/src/core/lib/gprpp/memory.h
@@ -27,6 +27,17 @@
#include <memory>
#include <utility>
+// Add this to a class that want to use Delete(), but has a private or
+// protected destructor.
+#define GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE \
+ template <typename T> \
+ friend void Delete(T*);
+// Add this to a class that want to use New(), but has a private or
+// protected constructor.
+#define GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW \
+ template <typename T, typename... Args> \
+ friend T* New(Args&&...);
+
namespace grpc_core {
// The alignment of memory returned by gpr_malloc().
diff --git a/src/core/lib/gprpp/orphanable.h b/src/core/lib/gprpp/orphanable.h
index 73a73995c7..d0ec9b6461 100644
--- a/src/core/lib/gprpp/orphanable.h
+++ b/src/core/lib/gprpp/orphanable.h
@@ -83,9 +83,7 @@ class InternallyRefCounted : public Orphanable {
GRPC_ABSTRACT_BASE_CLASS
protected:
- // Allow Delete() to access destructor.
- template <typename T>
- friend void Delete(T*);
+ GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
// Allow RefCountedPtr<> to access Unref() and IncrementRefCount().
friend class RefCountedPtr<Child>;
@@ -128,9 +126,7 @@ class InternallyRefCountedWithTracing : public Orphanable {
GRPC_ABSTRACT_BASE_CLASS
protected:
- // Allow Delete() to access destructor.
- template <typename T>
- friend void Delete(T*);
+ GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
// Allow RefCountedPtr<> to access Unref() and IncrementRefCount().
friend class RefCountedPtr<Child>;
diff --git a/src/core/lib/gprpp/ref_counted.h b/src/core/lib/gprpp/ref_counted.h
index c67e3f315c..ddac5bd475 100644
--- a/src/core/lib/gprpp/ref_counted.h
+++ b/src/core/lib/gprpp/ref_counted.h
@@ -65,9 +65,7 @@ class RefCounted {
GRPC_ABSTRACT_BASE_CLASS
protected:
- // Allow Delete() to access destructor.
- template <typename T>
- friend void Delete(T*);
+ GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
RefCounted() { gpr_ref_init(&refs_, 1); }
@@ -135,9 +133,7 @@ class RefCountedWithTracing {
GRPC_ABSTRACT_BASE_CLASS
protected:
- // Allow Delete() to access destructor.
- template <typename T>
- friend void Delete(T*);
+ GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
RefCountedWithTracing()
: RefCountedWithTracing(static_cast<TraceFlag*>(nullptr)) {}
diff --git a/src/core/lib/gprpp/thd.h b/src/core/lib/gprpp/thd.h
index 05c7ded45f..caf0652c1a 100644
--- a/src/core/lib/gprpp/thd.h
+++ b/src/core/lib/gprpp/thd.h
@@ -111,9 +111,6 @@ class Thread {
}
};
- static void Init();
- static bool AwaitAll(gpr_timespec deadline);
-
private:
Thread(const Thread&) = delete;
Thread& operator=(const Thread&) = delete;
diff --git a/src/core/lib/gprpp/thd_posix.cc b/src/core/lib/gprpp/thd_posix.cc
index 2f6c2edcae..533c07e7d8 100644
--- a/src/core/lib/gprpp/thd_posix.cc
+++ b/src/core/lib/gprpp/thd_posix.cc
@@ -32,17 +32,12 @@
#include <stdlib.h>
#include <string.h>
-#include "src/core/lib/gpr/fork.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/memory.h"
namespace grpc_core {
namespace {
-gpr_mu g_mu;
-gpr_cv g_cv;
-int g_thread_count;
-int g_awaiting_threads;
-
class ThreadInternalsPosix;
struct thd_arg {
ThreadInternalsPosix* thread;
@@ -68,7 +63,7 @@ class ThreadInternalsPosix
info->body = thd_body;
info->arg = arg;
info->name = thd_name;
- inc_thd_count();
+ grpc_core::Fork::IncThreadCount();
GPR_ASSERT(pthread_attr_init(&attr) == 0);
GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) ==
@@ -103,7 +98,7 @@ class ThreadInternalsPosix
gpr_mu_unlock(&arg.thread->mu_);
(*arg.body)(arg.arg);
- dec_thd_count();
+ grpc_core::Fork::DecThreadCount();
return nullptr;
},
info) == 0);
@@ -113,7 +108,7 @@ class ThreadInternalsPosix
if (!success) {
/* don't use gpr_free, as this was allocated using malloc (see above) */
free(info);
- dec_thd_count();
+ grpc_core::Fork::DecThreadCount();
}
};
@@ -132,29 +127,6 @@ class ThreadInternalsPosix
void Join() override { pthread_join(pthread_id_, nullptr); }
private:
- /*****************************************
- * Only used when fork support is enabled
- */
-
- static void inc_thd_count() {
- if (grpc_fork_support_enabled()) {
- gpr_mu_lock(&g_mu);
- g_thread_count++;
- gpr_mu_unlock(&g_mu);
- }
- }
-
- static void dec_thd_count() {
- if (grpc_fork_support_enabled()) {
- gpr_mu_lock(&g_mu);
- g_thread_count--;
- if (g_awaiting_threads && g_thread_count == 0) {
- gpr_cv_signal(&g_cv);
- }
- gpr_mu_unlock(&g_mu);
- }
- }
-
gpr_mu mu_;
gpr_cv ready_;
bool started_;
@@ -180,27 +152,6 @@ Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
*success = outcome;
}
}
-
-void Thread::Init() {
- gpr_mu_init(&g_mu);
- gpr_cv_init(&g_cv);
- g_thread_count = 0;
- g_awaiting_threads = 0;
-}
-
-bool Thread::AwaitAll(gpr_timespec deadline) {
- gpr_mu_lock(&g_mu);
- g_awaiting_threads = 1;
- int res = 0;
- while ((g_thread_count > 0) &&
- (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0)) {
- res = gpr_cv_wait(&g_cv, &g_mu, deadline);
- }
- g_awaiting_threads = 0;
- gpr_mu_unlock(&g_mu);
- return res == 0;
-}
-
} // namespace grpc_core
// The following is in the external namespace as it is exposed as C89 API
diff --git a/src/core/lib/gprpp/thd_windows.cc b/src/core/lib/gprpp/thd_windows.cc
index 59ea02f3d2..71584fd358 100644
--- a/src/core/lib/gprpp/thd_windows.cc
+++ b/src/core/lib/gprpp/thd_windows.cc
@@ -131,13 +131,6 @@ class ThreadInternalsWindows
namespace grpc_core {
-void Thread::Init() {}
-
-bool Thread::AwaitAll(gpr_timespec deadline) {
- // TODO: Consider adding this if needed
- return false;
-}
-
Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
bool* success) {
bool outcome = false;