diff options
Diffstat (limited to 'src/core/lib/gprpp')
-rw-r--r-- | src/core/lib/gprpp/manual_constructor.h | 2 | ||||
-rw-r--r-- | src/core/lib/gprpp/memory.h | 6 | ||||
-rw-r--r-- | src/core/lib/gprpp/orphanable.h | 1 | ||||
-rw-r--r-- | src/core/lib/gprpp/ref_counted.h | 2 | ||||
-rw-r--r-- | src/core/lib/gprpp/ref_counted_ptr.h | 1 | ||||
-rw-r--r-- | src/core/lib/gprpp/thd.h | 135 | ||||
-rw-r--r-- | src/core/lib/gprpp/thd_posix.cc | 209 | ||||
-rw-r--r-- | src/core/lib/gprpp/thd_windows.cc | 162 |
8 files changed, 514 insertions, 4 deletions
diff --git a/src/core/lib/gprpp/manual_constructor.h b/src/core/lib/gprpp/manual_constructor.h index a177048605..7f827ca8b7 100644 --- a/src/core/lib/gprpp/manual_constructor.h +++ b/src/core/lib/gprpp/manual_constructor.h @@ -156,7 +156,7 @@ class PolymorphicManualConstructor { static_assert( manual_ctor_impl::is_one_of<DerivedType, DerivedTypes...>::value, "DerivedType must be one of the predeclared DerivedTypes"); - GPR_ASSERT(reinterpret_cast<BaseType*>(static_cast<DerivedType*>(p)) == p); + GPR_ASSERT(static_cast<BaseType*>(p) == p); } typename std::aligned_storage< diff --git a/src/core/lib/gprpp/memory.h b/src/core/lib/gprpp/memory.h index f84e20eeea..ba2f546675 100644 --- a/src/core/lib/gprpp/memory.h +++ b/src/core/lib/gprpp/memory.h @@ -30,12 +30,12 @@ namespace grpc_core { // The alignment of memory returned by gpr_malloc(). -constexpr size_t kAllignmentForDefaultAllocationInBytes = 8; +constexpr size_t kAlignmentForDefaultAllocationInBytes = 8; // Alternative to new, since we cannot use it (for fear of libstdc++) template <typename T, typename... Args> inline T* New(Args&&... args) { - void* p = alignof(T) > kAllignmentForDefaultAllocationInBytes + void* p = alignof(T) > kAlignmentForDefaultAllocationInBytes ? gpr_malloc_aligned(sizeof(T), alignof(T)) : gpr_malloc(sizeof(T)); return new (p) T(std::forward<Args>(args)...); @@ -45,7 +45,7 @@ inline T* New(Args&&... args) { template <typename T> inline void Delete(T* p) { p->~T(); - if (alignof(T) > kAllignmentForDefaultAllocationInBytes) { + if (alignof(T) > kAlignmentForDefaultAllocationInBytes) { gpr_free_aligned(p); } else { gpr_free(p); diff --git a/src/core/lib/gprpp/orphanable.h b/src/core/lib/gprpp/orphanable.h index 9e9e7f015f..a5bc8d8efc 100644 --- a/src/core/lib/gprpp/orphanable.h +++ b/src/core/lib/gprpp/orphanable.h @@ -24,6 +24,7 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> +#include <cinttypes> #include <memory> #include "src/core/lib/debug/trace.h" diff --git a/src/core/lib/gprpp/ref_counted.h b/src/core/lib/gprpp/ref_counted.h index 02b115a40e..46bfaf7fb8 100644 --- a/src/core/lib/gprpp/ref_counted.h +++ b/src/core/lib/gprpp/ref_counted.h @@ -24,6 +24,8 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> +#include <cinttypes> + #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/abstract.h" #include "src/core/lib/gprpp/debug_location.h" diff --git a/src/core/lib/gprpp/ref_counted_ptr.h b/src/core/lib/gprpp/ref_counted_ptr.h index 72088e76ef..388e2ec410 100644 --- a/src/core/lib/gprpp/ref_counted_ptr.h +++ b/src/core/lib/gprpp/ref_counted_ptr.h @@ -33,6 +33,7 @@ template <typename T> class RefCountedPtr { public: RefCountedPtr() {} + RefCountedPtr(std::nullptr_t) {} // If value is non-null, we take ownership of a ref to it. explicit RefCountedPtr(T* value) { value_ = value; } diff --git a/src/core/lib/gprpp/thd.h b/src/core/lib/gprpp/thd.h new file mode 100644 index 0000000000..05c7ded45f --- /dev/null +++ b/src/core/lib/gprpp/thd.h @@ -0,0 +1,135 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_GPRPP_THD_H +#define GRPC_CORE_LIB_GPRPP_THD_H + +/** Internal thread interface. */ + +#include <grpc/support/port_platform.h> + +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd_id.h> +#include <grpc/support/time.h> + +#include "src/core/lib/gprpp/abstract.h" +#include "src/core/lib/gprpp/memory.h" + +namespace grpc_core { +namespace internal { + +/// Base class for platform-specific thread-state +class ThreadInternalsInterface { + public: + virtual ~ThreadInternalsInterface() {} + virtual void Start() GRPC_ABSTRACT; + virtual void Join() GRPC_ABSTRACT; + GRPC_ABSTRACT_BASE_CLASS +}; + +} // namespace internal + +class Thread { + public: + /// Default constructor only to allow use in structs that lack constructors + /// Does not produce a validly-constructed thread; must later + /// use placement new to construct a real thread. Does not init mu_ and cv_ + Thread() : state_(FAKE), impl_(nullptr) {} + + /// Normal constructor to create a thread with name \a thd_name, + /// which will execute a thread based on function \a thd_body + /// with argument \a arg once it is started. + /// The optional \a success argument indicates whether the thread + /// is successfully created. + Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, + bool* success = nullptr); + + /// Move constructor for thread. After this is called, the other thread + /// no longer represents a living thread object + Thread(Thread&& other) : state_(other.state_), impl_(other.impl_) { + other.state_ = MOVED; + other.impl_ = nullptr; + } + + /// Move assignment operator for thread. After this is called, the other + /// thread no longer represents a living thread object. Not allowed if this + /// thread actually exists + Thread& operator=(Thread&& other) { + if (this != &other) { + // TODO(vjpai): if we can be sure that all Thread's are actually + // constructed, then we should assert GPR_ASSERT(impl_ == nullptr) here. + // However, as long as threads come in structures that are + // allocated via gpr_malloc, this will not be the case, so we cannot + // assert it for the time being. + state_ = other.state_; + impl_ = other.impl_; + other.state_ = MOVED; + other.impl_ = nullptr; + } + return *this; + } + + /// The destructor is strictly optional; either the thread never came to life + /// and the constructor itself killed it or it has already been joined and + /// the Join function kills it. The destructor shouldn't have to do anything. + ~Thread() { GPR_ASSERT(impl_ == nullptr); } + + void Start() { + if (impl_ != nullptr) { + GPR_ASSERT(state_ == ALIVE); + state_ = STARTED; + impl_->Start(); + } else { + GPR_ASSERT(state_ == FAILED); + } + }; + + void Join() { + if (impl_ != nullptr) { + impl_->Join(); + grpc_core::Delete(impl_); + state_ = DONE; + impl_ = nullptr; + } else { + GPR_ASSERT(state_ == FAILED); + } + }; + + static void Init(); + static bool AwaitAll(gpr_timespec deadline); + + private: + Thread(const Thread&) = delete; + Thread& operator=(const Thread&) = delete; + + /// The thread states are as follows: + /// FAKE -- just a dummy placeholder Thread created by the default constructor + /// ALIVE -- an actual thread of control exists associated with this thread + /// STARTED -- the thread of control has been started + /// DONE -- the thread of control has completed and been joined + /// FAILED -- the thread of control never came alive + /// MOVED -- contents were moved out and we're no longer tracking them + enum ThreadState { FAKE, ALIVE, STARTED, DONE, FAILED, MOVED }; + ThreadState state_; + internal::ThreadInternalsInterface* impl_; +}; + +} // namespace grpc_core + +#endif /* GRPC_CORE_LIB_GPRPP_THD_H */ diff --git a/src/core/lib/gprpp/thd_posix.cc b/src/core/lib/gprpp/thd_posix.cc new file mode 100644 index 0000000000..2f6c2edcae --- /dev/null +++ b/src/core/lib/gprpp/thd_posix.cc @@ -0,0 +1,209 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* Posix implementation for gpr threads. */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_POSIX_SYNC + +#include "src/core/lib/gprpp/thd.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd_id.h> +#include <pthread.h> +#include <stdlib.h> +#include <string.h> + +#include "src/core/lib/gpr/fork.h" +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/memory.h" + +namespace grpc_core { +namespace { +gpr_mu g_mu; +gpr_cv g_cv; +int g_thread_count; +int g_awaiting_threads; + +class ThreadInternalsPosix; +struct thd_arg { + ThreadInternalsPosix* thread; + void (*body)(void* arg); /* body of a thread */ + void* arg; /* argument to a thread */ + const char* name; /* name of thread. Can be nullptr. */ +}; + +class ThreadInternalsPosix + : public grpc_core::internal::ThreadInternalsInterface { + public: + ThreadInternalsPosix(const char* thd_name, void (*thd_body)(void* arg), + void* arg, bool* success) + : started_(false) { + gpr_mu_init(&mu_); + gpr_cv_init(&ready_); + pthread_attr_t attr; + /* don't use gpr_malloc as we may cause an infinite recursion with + * the profiling code */ + thd_arg* info = static_cast<thd_arg*>(malloc(sizeof(*info))); + GPR_ASSERT(info != nullptr); + info->thread = this; + info->body = thd_body; + info->arg = arg; + info->name = thd_name; + inc_thd_count(); + + GPR_ASSERT(pthread_attr_init(&attr) == 0); + GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == + 0); + + *success = + (pthread_create(&pthread_id_, &attr, + [](void* v) -> void* { + thd_arg arg = *static_cast<thd_arg*>(v); + free(v); + if (arg.name != nullptr) { +#if GPR_APPLE_PTHREAD_NAME + /* Apple supports 64 characters, and will + * truncate if it's longer. */ + pthread_setname_np(arg.name); +#elif GPR_LINUX_PTHREAD_NAME + /* Linux supports 16 characters max, and will + * error if it's longer. */ + char buf[16]; + size_t buf_len = GPR_ARRAY_SIZE(buf) - 1; + strncpy(buf, arg.name, buf_len); + buf[buf_len] = '\0'; + pthread_setname_np(pthread_self(), buf); +#endif // GPR_APPLE_PTHREAD_NAME + } + + gpr_mu_lock(&arg.thread->mu_); + while (!arg.thread->started_) { + gpr_cv_wait(&arg.thread->ready_, &arg.thread->mu_, + gpr_inf_future(GPR_CLOCK_MONOTONIC)); + } + gpr_mu_unlock(&arg.thread->mu_); + + (*arg.body)(arg.arg); + dec_thd_count(); + return nullptr; + }, + info) == 0); + + GPR_ASSERT(pthread_attr_destroy(&attr) == 0); + + if (!success) { + /* don't use gpr_free, as this was allocated using malloc (see above) */ + free(info); + dec_thd_count(); + } + }; + + ~ThreadInternalsPosix() override { + gpr_mu_destroy(&mu_); + gpr_cv_destroy(&ready_); + } + + void Start() override { + gpr_mu_lock(&mu_); + started_ = true; + gpr_cv_signal(&ready_); + gpr_mu_unlock(&mu_); + } + + void Join() override { pthread_join(pthread_id_, nullptr); } + + private: + /***************************************** + * Only used when fork support is enabled + */ + + static void inc_thd_count() { + if (grpc_fork_support_enabled()) { + gpr_mu_lock(&g_mu); + g_thread_count++; + gpr_mu_unlock(&g_mu); + } + } + + static void dec_thd_count() { + if (grpc_fork_support_enabled()) { + gpr_mu_lock(&g_mu); + g_thread_count--; + if (g_awaiting_threads && g_thread_count == 0) { + gpr_cv_signal(&g_cv); + } + gpr_mu_unlock(&g_mu); + } + } + + gpr_mu mu_; + gpr_cv ready_; + bool started_; + pthread_t pthread_id_; +}; + +} // namespace + +Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, + bool* success) { + bool outcome = false; + impl_ = + grpc_core::New<ThreadInternalsPosix>(thd_name, thd_body, arg, &outcome); + if (outcome) { + state_ = ALIVE; + } else { + state_ = FAILED; + grpc_core::Delete(impl_); + impl_ = nullptr; + } + + if (success != nullptr) { + *success = outcome; + } +} + +void Thread::Init() { + gpr_mu_init(&g_mu); + gpr_cv_init(&g_cv); + g_thread_count = 0; + g_awaiting_threads = 0; +} + +bool Thread::AwaitAll(gpr_timespec deadline) { + gpr_mu_lock(&g_mu); + g_awaiting_threads = 1; + int res = 0; + while ((g_thread_count > 0) && + (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0)) { + res = gpr_cv_wait(&g_cv, &g_mu, deadline); + } + g_awaiting_threads = 0; + gpr_mu_unlock(&g_mu); + return res == 0; +} + +} // namespace grpc_core + +// The following is in the external namespace as it is exposed as C89 API +gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)pthread_self(); } + +#endif /* GPR_POSIX_SYNC */ diff --git a/src/core/lib/gprpp/thd_windows.cc b/src/core/lib/gprpp/thd_windows.cc new file mode 100644 index 0000000000..59ea02f3d2 --- /dev/null +++ b/src/core/lib/gprpp/thd_windows.cc @@ -0,0 +1,162 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* Windows implementation for gpr threads. */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_WINDOWS + +#include "src/core/lib/gprpp/thd.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/thd_id.h> +#include <string.h> + +#include "src/core/lib/gprpp/memory.h" + +#if defined(_MSC_VER) +#define thread_local __declspec(thread) +#define WIN_LAMBDA +#elif defined(__GNUC__) +#define thread_local __thread +#define WIN_LAMBDA WINAPI +#else +#error "Unknown compiler - please file a bug report" +#endif + +namespace { +class ThreadInternalsWindows; +struct thd_info { + ThreadInternalsWindows* thread; + void (*body)(void* arg); /* body of a thread */ + void* arg; /* argument to a thread */ + HANDLE join_event; /* the join event */ +}; + +thread_local struct thd_info* g_thd_info; + +class ThreadInternalsWindows + : public grpc_core::internal::ThreadInternalsInterface { + public: + ThreadInternalsWindows(void (*thd_body)(void* arg), void* arg, bool* success) + : started_(false) { + gpr_mu_init(&mu_); + gpr_cv_init(&ready_); + + HANDLE handle; + info_ = (struct thd_info*)gpr_malloc(sizeof(*info_)); + info_->thread = this; + info_->body = thd_body; + info_->arg = arg; + + info_->join_event = CreateEvent(nullptr, FALSE, FALSE, nullptr); + if (info_->join_event == nullptr) { + gpr_free(info_); + *success = false; + } else { + handle = CreateThread( + nullptr, 64 * 1024, + [](void* v) WIN_LAMBDA -> DWORD { + g_thd_info = static_cast<thd_info*>(v); + gpr_mu_lock(&g_thd_info->thread->mu_); + while (!g_thd_info->thread->started_) { + gpr_cv_wait(&g_thd_info->thread->ready_, &g_thd_info->thread->mu_, + gpr_inf_future(GPR_CLOCK_MONOTONIC)); + } + gpr_mu_unlock(&g_thd_info->thread->mu_); + g_thd_info->body(g_thd_info->arg); + BOOL ret = SetEvent(g_thd_info->join_event); + GPR_ASSERT(ret); + return 0; + }, + info_, 0, nullptr); + if (handle == nullptr) { + destroy_thread(); + *success = false; + } else { + CloseHandle(handle); + *success = true; + } + } + } + + ~ThreadInternalsWindows() override { + gpr_mu_destroy(&mu_); + gpr_cv_destroy(&ready_); + } + + void Start() override { + gpr_mu_lock(&mu_); + started_ = true; + gpr_cv_signal(&ready_); + gpr_mu_unlock(&mu_); + } + + void Join() override { + DWORD ret = WaitForSingleObject(info_->join_event, INFINITE); + GPR_ASSERT(ret == WAIT_OBJECT_0); + destroy_thread(); + } + + private: + void destroy_thread() { + CloseHandle(info_->join_event); + gpr_free(info_); + } + + gpr_mu mu_; + gpr_cv ready_; + bool started_; + thd_info* info_; +}; + +} // namespace + +namespace grpc_core { + +void Thread::Init() {} + +bool Thread::AwaitAll(gpr_timespec deadline) { + // TODO: Consider adding this if needed + return false; +} + +Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, + bool* success) { + bool outcome = false; + impl_ = grpc_core::New<ThreadInternalsWindows>(thd_body, arg, &outcome); + if (outcome) { + state_ = ALIVE; + } else { + state_ = FAILED; + grpc_core::Delete(impl_); + impl_ = nullptr; + } + + if (success != nullptr) { + *success = outcome; + } +} + +} // namespace grpc_core + +gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)g_thd_info; } + +#endif /* GPR_WINDOWS */ |