aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/gprpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/gprpp')
-rw-r--r--src/core/lib/gprpp/manual_constructor.h2
-rw-r--r--src/core/lib/gprpp/memory.h6
-rw-r--r--src/core/lib/gprpp/orphanable.h1
-rw-r--r--src/core/lib/gprpp/ref_counted.h2
-rw-r--r--src/core/lib/gprpp/ref_counted_ptr.h1
-rw-r--r--src/core/lib/gprpp/thd.h135
-rw-r--r--src/core/lib/gprpp/thd_posix.cc209
-rw-r--r--src/core/lib/gprpp/thd_windows.cc162
8 files changed, 514 insertions, 4 deletions
diff --git a/src/core/lib/gprpp/manual_constructor.h b/src/core/lib/gprpp/manual_constructor.h
index a177048605..7f827ca8b7 100644
--- a/src/core/lib/gprpp/manual_constructor.h
+++ b/src/core/lib/gprpp/manual_constructor.h
@@ -156,7 +156,7 @@ class PolymorphicManualConstructor {
static_assert(
manual_ctor_impl::is_one_of<DerivedType, DerivedTypes...>::value,
"DerivedType must be one of the predeclared DerivedTypes");
- GPR_ASSERT(reinterpret_cast<BaseType*>(static_cast<DerivedType*>(p)) == p);
+ GPR_ASSERT(static_cast<BaseType*>(p) == p);
}
typename std::aligned_storage<
diff --git a/src/core/lib/gprpp/memory.h b/src/core/lib/gprpp/memory.h
index f84e20eeea..ba2f546675 100644
--- a/src/core/lib/gprpp/memory.h
+++ b/src/core/lib/gprpp/memory.h
@@ -30,12 +30,12 @@
namespace grpc_core {
// The alignment of memory returned by gpr_malloc().
-constexpr size_t kAllignmentForDefaultAllocationInBytes = 8;
+constexpr size_t kAlignmentForDefaultAllocationInBytes = 8;
// Alternative to new, since we cannot use it (for fear of libstdc++)
template <typename T, typename... Args>
inline T* New(Args&&... args) {
- void* p = alignof(T) > kAllignmentForDefaultAllocationInBytes
+ void* p = alignof(T) > kAlignmentForDefaultAllocationInBytes
? gpr_malloc_aligned(sizeof(T), alignof(T))
: gpr_malloc(sizeof(T));
return new (p) T(std::forward<Args>(args)...);
@@ -45,7 +45,7 @@ inline T* New(Args&&... args) {
template <typename T>
inline void Delete(T* p) {
p->~T();
- if (alignof(T) > kAllignmentForDefaultAllocationInBytes) {
+ if (alignof(T) > kAlignmentForDefaultAllocationInBytes) {
gpr_free_aligned(p);
} else {
gpr_free(p);
diff --git a/src/core/lib/gprpp/orphanable.h b/src/core/lib/gprpp/orphanable.h
index 9e9e7f015f..a5bc8d8efc 100644
--- a/src/core/lib/gprpp/orphanable.h
+++ b/src/core/lib/gprpp/orphanable.h
@@ -24,6 +24,7 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
+#include <cinttypes>
#include <memory>
#include "src/core/lib/debug/trace.h"
diff --git a/src/core/lib/gprpp/ref_counted.h b/src/core/lib/gprpp/ref_counted.h
index 02b115a40e..46bfaf7fb8 100644
--- a/src/core/lib/gprpp/ref_counted.h
+++ b/src/core/lib/gprpp/ref_counted.h
@@ -24,6 +24,8 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
+#include <cinttypes>
+
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/gprpp/debug_location.h"
diff --git a/src/core/lib/gprpp/ref_counted_ptr.h b/src/core/lib/gprpp/ref_counted_ptr.h
index 72088e76ef..388e2ec410 100644
--- a/src/core/lib/gprpp/ref_counted_ptr.h
+++ b/src/core/lib/gprpp/ref_counted_ptr.h
@@ -33,6 +33,7 @@ template <typename T>
class RefCountedPtr {
public:
RefCountedPtr() {}
+ RefCountedPtr(std::nullptr_t) {}
// If value is non-null, we take ownership of a ref to it.
explicit RefCountedPtr(T* value) { value_ = value; }
diff --git a/src/core/lib/gprpp/thd.h b/src/core/lib/gprpp/thd.h
new file mode 100644
index 0000000000..05c7ded45f
--- /dev/null
+++ b/src/core/lib/gprpp/thd.h
@@ -0,0 +1,135 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_GPRPP_THD_H
+#define GRPC_CORE_LIB_GPRPP_THD_H
+
+/** Internal thread interface. */
+
+#include <grpc/support/port_platform.h>
+
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd_id.h>
+#include <grpc/support/time.h>
+
+#include "src/core/lib/gprpp/abstract.h"
+#include "src/core/lib/gprpp/memory.h"
+
+namespace grpc_core {
+namespace internal {
+
+/// Base class for platform-specific thread-state
+class ThreadInternalsInterface {
+ public:
+ virtual ~ThreadInternalsInterface() {}
+ virtual void Start() GRPC_ABSTRACT;
+ virtual void Join() GRPC_ABSTRACT;
+ GRPC_ABSTRACT_BASE_CLASS
+};
+
+} // namespace internal
+
+class Thread {
+ public:
+ /// Default constructor only to allow use in structs that lack constructors
+ /// Does not produce a validly-constructed thread; must later
+ /// use placement new to construct a real thread. Does not init mu_ and cv_
+ Thread() : state_(FAKE), impl_(nullptr) {}
+
+ /// Normal constructor to create a thread with name \a thd_name,
+ /// which will execute a thread based on function \a thd_body
+ /// with argument \a arg once it is started.
+ /// The optional \a success argument indicates whether the thread
+ /// is successfully created.
+ Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
+ bool* success = nullptr);
+
+ /// Move constructor for thread. After this is called, the other thread
+ /// no longer represents a living thread object
+ Thread(Thread&& other) : state_(other.state_), impl_(other.impl_) {
+ other.state_ = MOVED;
+ other.impl_ = nullptr;
+ }
+
+ /// Move assignment operator for thread. After this is called, the other
+ /// thread no longer represents a living thread object. Not allowed if this
+ /// thread actually exists
+ Thread& operator=(Thread&& other) {
+ if (this != &other) {
+ // TODO(vjpai): if we can be sure that all Thread's are actually
+ // constructed, then we should assert GPR_ASSERT(impl_ == nullptr) here.
+ // However, as long as threads come in structures that are
+ // allocated via gpr_malloc, this will not be the case, so we cannot
+ // assert it for the time being.
+ state_ = other.state_;
+ impl_ = other.impl_;
+ other.state_ = MOVED;
+ other.impl_ = nullptr;
+ }
+ return *this;
+ }
+
+ /// The destructor is strictly optional; either the thread never came to life
+ /// and the constructor itself killed it or it has already been joined and
+ /// the Join function kills it. The destructor shouldn't have to do anything.
+ ~Thread() { GPR_ASSERT(impl_ == nullptr); }
+
+ void Start() {
+ if (impl_ != nullptr) {
+ GPR_ASSERT(state_ == ALIVE);
+ state_ = STARTED;
+ impl_->Start();
+ } else {
+ GPR_ASSERT(state_ == FAILED);
+ }
+ };
+
+ void Join() {
+ if (impl_ != nullptr) {
+ impl_->Join();
+ grpc_core::Delete(impl_);
+ state_ = DONE;
+ impl_ = nullptr;
+ } else {
+ GPR_ASSERT(state_ == FAILED);
+ }
+ };
+
+ static void Init();
+ static bool AwaitAll(gpr_timespec deadline);
+
+ private:
+ Thread(const Thread&) = delete;
+ Thread& operator=(const Thread&) = delete;
+
+ /// The thread states are as follows:
+ /// FAKE -- just a dummy placeholder Thread created by the default constructor
+ /// ALIVE -- an actual thread of control exists associated with this thread
+ /// STARTED -- the thread of control has been started
+ /// DONE -- the thread of control has completed and been joined
+ /// FAILED -- the thread of control never came alive
+ /// MOVED -- contents were moved out and we're no longer tracking them
+ enum ThreadState { FAKE, ALIVE, STARTED, DONE, FAILED, MOVED };
+ ThreadState state_;
+ internal::ThreadInternalsInterface* impl_;
+};
+
+} // namespace grpc_core
+
+#endif /* GRPC_CORE_LIB_GPRPP_THD_H */
diff --git a/src/core/lib/gprpp/thd_posix.cc b/src/core/lib/gprpp/thd_posix.cc
new file mode 100644
index 0000000000..2f6c2edcae
--- /dev/null
+++ b/src/core/lib/gprpp/thd_posix.cc
@@ -0,0 +1,209 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/* Posix implementation for gpr threads. */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_POSIX_SYNC
+
+#include "src/core/lib/gprpp/thd.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd_id.h>
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "src/core/lib/gpr/fork.h"
+#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/memory.h"
+
+namespace grpc_core {
+namespace {
+gpr_mu g_mu;
+gpr_cv g_cv;
+int g_thread_count;
+int g_awaiting_threads;
+
+class ThreadInternalsPosix;
+struct thd_arg {
+ ThreadInternalsPosix* thread;
+ void (*body)(void* arg); /* body of a thread */
+ void* arg; /* argument to a thread */
+ const char* name; /* name of thread. Can be nullptr. */
+};
+
+class ThreadInternalsPosix
+ : public grpc_core::internal::ThreadInternalsInterface {
+ public:
+ ThreadInternalsPosix(const char* thd_name, void (*thd_body)(void* arg),
+ void* arg, bool* success)
+ : started_(false) {
+ gpr_mu_init(&mu_);
+ gpr_cv_init(&ready_);
+ pthread_attr_t attr;
+ /* don't use gpr_malloc as we may cause an infinite recursion with
+ * the profiling code */
+ thd_arg* info = static_cast<thd_arg*>(malloc(sizeof(*info)));
+ GPR_ASSERT(info != nullptr);
+ info->thread = this;
+ info->body = thd_body;
+ info->arg = arg;
+ info->name = thd_name;
+ inc_thd_count();
+
+ GPR_ASSERT(pthread_attr_init(&attr) == 0);
+ GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) ==
+ 0);
+
+ *success =
+ (pthread_create(&pthread_id_, &attr,
+ [](void* v) -> void* {
+ thd_arg arg = *static_cast<thd_arg*>(v);
+ free(v);
+ if (arg.name != nullptr) {
+#if GPR_APPLE_PTHREAD_NAME
+ /* Apple supports 64 characters, and will
+ * truncate if it's longer. */
+ pthread_setname_np(arg.name);
+#elif GPR_LINUX_PTHREAD_NAME
+ /* Linux supports 16 characters max, and will
+ * error if it's longer. */
+ char buf[16];
+ size_t buf_len = GPR_ARRAY_SIZE(buf) - 1;
+ strncpy(buf, arg.name, buf_len);
+ buf[buf_len] = '\0';
+ pthread_setname_np(pthread_self(), buf);
+#endif // GPR_APPLE_PTHREAD_NAME
+ }
+
+ gpr_mu_lock(&arg.thread->mu_);
+ while (!arg.thread->started_) {
+ gpr_cv_wait(&arg.thread->ready_, &arg.thread->mu_,
+ gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ }
+ gpr_mu_unlock(&arg.thread->mu_);
+
+ (*arg.body)(arg.arg);
+ dec_thd_count();
+ return nullptr;
+ },
+ info) == 0);
+
+ GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
+
+ if (!success) {
+ /* don't use gpr_free, as this was allocated using malloc (see above) */
+ free(info);
+ dec_thd_count();
+ }
+ };
+
+ ~ThreadInternalsPosix() override {
+ gpr_mu_destroy(&mu_);
+ gpr_cv_destroy(&ready_);
+ }
+
+ void Start() override {
+ gpr_mu_lock(&mu_);
+ started_ = true;
+ gpr_cv_signal(&ready_);
+ gpr_mu_unlock(&mu_);
+ }
+
+ void Join() override { pthread_join(pthread_id_, nullptr); }
+
+ private:
+ /*****************************************
+ * Only used when fork support is enabled
+ */
+
+ static void inc_thd_count() {
+ if (grpc_fork_support_enabled()) {
+ gpr_mu_lock(&g_mu);
+ g_thread_count++;
+ gpr_mu_unlock(&g_mu);
+ }
+ }
+
+ static void dec_thd_count() {
+ if (grpc_fork_support_enabled()) {
+ gpr_mu_lock(&g_mu);
+ g_thread_count--;
+ if (g_awaiting_threads && g_thread_count == 0) {
+ gpr_cv_signal(&g_cv);
+ }
+ gpr_mu_unlock(&g_mu);
+ }
+ }
+
+ gpr_mu mu_;
+ gpr_cv ready_;
+ bool started_;
+ pthread_t pthread_id_;
+};
+
+} // namespace
+
+Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
+ bool* success) {
+ bool outcome = false;
+ impl_ =
+ grpc_core::New<ThreadInternalsPosix>(thd_name, thd_body, arg, &outcome);
+ if (outcome) {
+ state_ = ALIVE;
+ } else {
+ state_ = FAILED;
+ grpc_core::Delete(impl_);
+ impl_ = nullptr;
+ }
+
+ if (success != nullptr) {
+ *success = outcome;
+ }
+}
+
+void Thread::Init() {
+ gpr_mu_init(&g_mu);
+ gpr_cv_init(&g_cv);
+ g_thread_count = 0;
+ g_awaiting_threads = 0;
+}
+
+bool Thread::AwaitAll(gpr_timespec deadline) {
+ gpr_mu_lock(&g_mu);
+ g_awaiting_threads = 1;
+ int res = 0;
+ while ((g_thread_count > 0) &&
+ (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0)) {
+ res = gpr_cv_wait(&g_cv, &g_mu, deadline);
+ }
+ g_awaiting_threads = 0;
+ gpr_mu_unlock(&g_mu);
+ return res == 0;
+}
+
+} // namespace grpc_core
+
+// The following is in the external namespace as it is exposed as C89 API
+gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)pthread_self(); }
+
+#endif /* GPR_POSIX_SYNC */
diff --git a/src/core/lib/gprpp/thd_windows.cc b/src/core/lib/gprpp/thd_windows.cc
new file mode 100644
index 0000000000..59ea02f3d2
--- /dev/null
+++ b/src/core/lib/gprpp/thd_windows.cc
@@ -0,0 +1,162 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/* Windows implementation for gpr threads. */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_WINDOWS
+
+#include "src/core/lib/gprpp/thd.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/thd_id.h>
+#include <string.h>
+
+#include "src/core/lib/gprpp/memory.h"
+
+#if defined(_MSC_VER)
+#define thread_local __declspec(thread)
+#define WIN_LAMBDA
+#elif defined(__GNUC__)
+#define thread_local __thread
+#define WIN_LAMBDA WINAPI
+#else
+#error "Unknown compiler - please file a bug report"
+#endif
+
+namespace {
+class ThreadInternalsWindows;
+struct thd_info {
+ ThreadInternalsWindows* thread;
+ void (*body)(void* arg); /* body of a thread */
+ void* arg; /* argument to a thread */
+ HANDLE join_event; /* the join event */
+};
+
+thread_local struct thd_info* g_thd_info;
+
+class ThreadInternalsWindows
+ : public grpc_core::internal::ThreadInternalsInterface {
+ public:
+ ThreadInternalsWindows(void (*thd_body)(void* arg), void* arg, bool* success)
+ : started_(false) {
+ gpr_mu_init(&mu_);
+ gpr_cv_init(&ready_);
+
+ HANDLE handle;
+ info_ = (struct thd_info*)gpr_malloc(sizeof(*info_));
+ info_->thread = this;
+ info_->body = thd_body;
+ info_->arg = arg;
+
+ info_->join_event = CreateEvent(nullptr, FALSE, FALSE, nullptr);
+ if (info_->join_event == nullptr) {
+ gpr_free(info_);
+ *success = false;
+ } else {
+ handle = CreateThread(
+ nullptr, 64 * 1024,
+ [](void* v) WIN_LAMBDA -> DWORD {
+ g_thd_info = static_cast<thd_info*>(v);
+ gpr_mu_lock(&g_thd_info->thread->mu_);
+ while (!g_thd_info->thread->started_) {
+ gpr_cv_wait(&g_thd_info->thread->ready_, &g_thd_info->thread->mu_,
+ gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ }
+ gpr_mu_unlock(&g_thd_info->thread->mu_);
+ g_thd_info->body(g_thd_info->arg);
+ BOOL ret = SetEvent(g_thd_info->join_event);
+ GPR_ASSERT(ret);
+ return 0;
+ },
+ info_, 0, nullptr);
+ if (handle == nullptr) {
+ destroy_thread();
+ *success = false;
+ } else {
+ CloseHandle(handle);
+ *success = true;
+ }
+ }
+ }
+
+ ~ThreadInternalsWindows() override {
+ gpr_mu_destroy(&mu_);
+ gpr_cv_destroy(&ready_);
+ }
+
+ void Start() override {
+ gpr_mu_lock(&mu_);
+ started_ = true;
+ gpr_cv_signal(&ready_);
+ gpr_mu_unlock(&mu_);
+ }
+
+ void Join() override {
+ DWORD ret = WaitForSingleObject(info_->join_event, INFINITE);
+ GPR_ASSERT(ret == WAIT_OBJECT_0);
+ destroy_thread();
+ }
+
+ private:
+ void destroy_thread() {
+ CloseHandle(info_->join_event);
+ gpr_free(info_);
+ }
+
+ gpr_mu mu_;
+ gpr_cv ready_;
+ bool started_;
+ thd_info* info_;
+};
+
+} // namespace
+
+namespace grpc_core {
+
+void Thread::Init() {}
+
+bool Thread::AwaitAll(gpr_timespec deadline) {
+ // TODO: Consider adding this if needed
+ return false;
+}
+
+Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
+ bool* success) {
+ bool outcome = false;
+ impl_ = grpc_core::New<ThreadInternalsWindows>(thd_body, arg, &outcome);
+ if (outcome) {
+ state_ = ALIVE;
+ } else {
+ state_ = FAILED;
+ grpc_core::Delete(impl_);
+ impl_ = nullptr;
+ }
+
+ if (success != nullptr) {
+ *success = outcome;
+ }
+}
+
+} // namespace grpc_core
+
+gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)g_thd_info; }
+
+#endif /* GPR_WINDOWS */