diff options
author | Vijay Pai <vpai@google.com> | 2018-03-01 11:36:47 -0800 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2018-03-01 11:47:36 -0800 |
commit | 2fe87b09055cd256cdce038c4c70d92b955c991b (patch) | |
tree | a7652e9285797130e4d4ece7a6f271530be4b14f /src/core/lib/gprpp | |
parent | 8ffa1ae93310646cdf1b15c3a8c2655268d1a47f (diff) |
Move assignment for Thread, make destructor optional, loop cv waits
Diffstat (limited to 'src/core/lib/gprpp')
-rw-r--r-- | src/core/lib/gprpp/thd.h | 95 | ||||
-rw-r--r-- | src/core/lib/gprpp/thd_posix.cc | 221 | ||||
-rw-r--r-- | src/core/lib/gprpp/thd_windows.cc | 156 |
3 files changed, 285 insertions, 187 deletions
diff --git a/src/core/lib/gprpp/thd.h b/src/core/lib/gprpp/thd.h index f45e78e7f6..2f86893519 100644 --- a/src/core/lib/gprpp/thd.h +++ b/src/core/lib/gprpp/thd.h @@ -28,34 +28,105 @@ #include <grpc/support/thd_id.h> #include <grpc/support/time.h> +#include "src/core/lib/gprpp/abstract.h" +#include "src/core/lib/gprpp/memory.h" + namespace grpc_core { +namespace internal { + +/// Base class for platform-specific thread-state +class ThreadInternalsInterface { + public: + virtual ~ThreadInternalsInterface() {} + virtual void Start() GRPC_ABSTRACT; + virtual void Join() GRPC_ABSTRACT; + GRPC_ABSTRACT_BASE_CLASS +}; +} // namespace internal class Thread { public: /// Default constructor only to allow use in structs that lack constructors /// Does not produce a validly-constructed thread; must later /// use placement new to construct a real thread. Does not init mu_ and cv_ - Thread() : real_(false), alive_(false), started_(false), joined_(false) {} + Thread() : state_(FAKE), impl_(nullptr) {} + /// Normal constructor to create a thread with name \a thd_name, + /// which will execute a thread based on function \a thd_body + /// with argument \a arg once it is started. + /// The optional \a success argument indicates whether the thread + /// is successfully created. Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, bool* success = nullptr); - ~Thread(); - void Start(); - void Join(); + /// Move constructor for thread. After this is called, the other thread + /// no longer represents a living thread object + Thread(Thread&& other) : state_(other.state_), impl_(other.impl_) { + other.state_ = MOVED; + other.impl_ = nullptr; + } + + /// Move assignment operator for thread. After this is called, the other + /// thread no longer represents a living thread object. Not allowed if this + /// thread actually exists + Thread& operator=(Thread&& other) { + if (this != &other) { + // TODO(vjpai): if we can be sure that all Thread's are actually + // constructed, then we should assert GPR_ASSERT(impl_ == nullptr) here. + // However, as long as threads come in structures that are + // allocated via gpr_malloc, this will not be the case, so we cannot + // assert it for the time being. + state_ = other.state_; + impl_ = other.impl_; + other.state_ = MOVED; + other.impl_ = nullptr; + } + return *this; + } + + /// The destructor is strictly optional; either the thread never came to life + /// and the constructor itself killed it or it has already been joined and + /// the Join function kills it. The destructor shouldn't have to do anything. + ~Thread() { GPR_ASSERT(impl_ == nullptr); } + + void Start() { + if (impl_ != nullptr) { + GPR_ASSERT(state_ == ALIVE); + state_ = STARTED; + impl_->Start(); + } else { + GPR_ASSERT(state_ == FAILED); + } + }; + void Join() { + if (impl_ != nullptr) { + GPR_ASSERT(state_ == STARTED); + impl_->Join(); + grpc_core::Delete(impl_); + state_ = DONE; + impl_ = nullptr; + } else { + GPR_ASSERT(state_ == FAILED); + } + }; static void Init(); static bool AwaitAll(gpr_timespec deadline); private: - gpr_mu mu_; - gpr_cv ready_; - - gpr_thd_id id_; - bool real_; - bool alive_; - bool started_; - bool joined_; + Thread(const Thread&) = delete; + Thread& operator=(const Thread&) = delete; + + /// The thread states are as follows: + /// FAKE -- just a dummy placeholder Thread created by the default constructor + /// ALIVE -- an actual thread of control exists associated with this thread + /// STARTED -- the thread of control has been started + /// DONE -- the thread of control has completed and been joined + /// FAILED -- the thread of control never came alive + /// MOVED -- contents were moved out and we're no longer tracking them + enum ThreadState { FAKE, ALIVE, STARTED, DONE, FAILED, MOVED }; + ThreadState state_; + internal::ThreadInternalsInterface* impl_; }; } // namespace grpc_core diff --git a/src/core/lib/gprpp/thd_posix.cc b/src/core/lib/gprpp/thd_posix.cc index 4ded4d3fd5..28e47f1aa9 100644 --- a/src/core/lib/gprpp/thd_posix.cc +++ b/src/core/lib/gprpp/thd_posix.cc @@ -34,6 +34,7 @@ #include "src/core/lib/gpr/fork.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/memory.h" namespace grpc_core { namespace { @@ -42,131 +43,142 @@ gpr_cv g_cv; int g_thread_count; int g_awaiting_threads; +class ThreadInternalsPosix; struct thd_arg { - Thread* thread; + ThreadInternalsPosix* thread; void (*body)(void* arg); /* body of a thread */ void* arg; /* argument to a thread */ const char* name; /* name of thread. Can be nullptr. */ }; -/***************************************** - * Only used when fork support is enabled - */ +class ThreadInternalsPosix + : public grpc_core::internal::ThreadInternalsInterface { + public: + ThreadInternalsPosix(const char* thd_name, void (*thd_body)(void* arg), + void* arg, bool* success) + : started_(false) { + gpr_mu_init(&mu_); + gpr_cv_init(&ready_); + pthread_attr_t attr; + /* don't use gpr_malloc as we may cause an infinite recursion with + * the profiling code */ + thd_arg* info = static_cast<thd_arg*>(malloc(sizeof(*info))); + GPR_ASSERT(info != nullptr); + info->thread = this; + info->body = thd_body; + info->arg = arg; + info->name = thd_name; + inc_thd_count(); + + GPR_ASSERT(pthread_attr_init(&attr) == 0); + GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == + 0); + + *success = + (pthread_create(&pthread_id_, &attr, + [](void* v) -> void* { + thd_arg arg = *static_cast<thd_arg*>(v); + free(v); + if (arg.name != nullptr) { +#if GPR_APPLE_PTHREAD_NAME + /* Apple supports 64 characters, and will + * truncate if it's longer. */ + pthread_setname_np(arg.name); +#elif GPR_LINUX_PTHREAD_NAME + /* Linux supports 16 characters max, and will + * error if it's longer. */ + char buf[16]; + size_t buf_len = GPR_ARRAY_SIZE(buf) - 1; + strncpy(buf, arg.name, buf_len); + buf[buf_len] = '\0'; + pthread_setname_np(pthread_self(), buf); +#endif // GPR_APPLE_PTHREAD_NAME + } + + gpr_mu_lock(&arg.thread->mu_); + while (!arg.thread->started_) { + gpr_cv_wait(&arg.thread->ready_, &arg.thread->mu_, + gpr_inf_future(GPR_CLOCK_MONOTONIC)); + } + gpr_mu_unlock(&arg.thread->mu_); + + (*arg.body)(arg.arg); + dec_thd_count(); + return nullptr; + }, + info) == 0); + + GPR_ASSERT(pthread_attr_destroy(&attr) == 0); + + if (!success) { + /* don't use gpr_free, as this was allocated using malloc (see above) */ + free(info); + dec_thd_count(); + } + }; -void inc_thd_count() { - if (grpc_fork_support_enabled()) { - gpr_mu_lock(&g_mu); - g_thread_count++; - gpr_mu_unlock(&g_mu); + ~ThreadInternalsPosix() override { + gpr_mu_destroy(&mu_); + gpr_cv_destroy(&ready_); } -} -void dec_thd_count() { - if (grpc_fork_support_enabled()) { - gpr_mu_lock(&g_mu); - g_thread_count--; - if (g_awaiting_threads && g_thread_count == 0) { - gpr_cv_signal(&g_cv); - } - gpr_mu_unlock(&g_mu); + void Start() override { + gpr_mu_lock(&mu_); + started_ = true; + gpr_cv_signal(&ready_); + gpr_mu_unlock(&mu_); } -} -} // namespace + void Join() override { pthread_join(pthread_id_, nullptr); } -Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, - bool* success) - : real_(true), alive_(false), started_(false), joined_(false) { - gpr_mu_init(&mu_); - gpr_cv_init(&ready_); - pthread_attr_t attr; - /* don't use gpr_malloc as we may cause an infinite recursion with - * the profiling code */ - thd_arg* a = static_cast<thd_arg*>(malloc(sizeof(*a))); - GPR_ASSERT(a != nullptr); - a->thread = this; - a->body = thd_body; - a->arg = arg; - a->name = thd_name; - inc_thd_count(); - - GPR_ASSERT(pthread_attr_init(&attr) == 0); - GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == 0); - - pthread_t p; - alive_ = (pthread_create(&p, &attr, - [](void* v) -> void* { - thd_arg a = *static_cast<thd_arg*>(v); - free(v); - if (a.name != nullptr) { -#if GPR_APPLE_PTHREAD_NAME - /* Apple supports 64 characters, and will - * truncate if it's longer. */ - pthread_setname_np(a.name); -#elif GPR_LINUX_PTHREAD_NAME - /* Linux supports 16 characters max, and will - * error if it's longer. */ - char buf[16]; - size_t buf_len = GPR_ARRAY_SIZE(buf) - 1; - strncpy(buf, a.name, buf_len); - buf[buf_len] = '\0'; - pthread_setname_np(pthread_self(), buf); -#endif // GPR_APPLE_PTHREAD_NAME - } + private: + /***************************************** + * Only used when fork support is enabled + */ - gpr_mu_lock(&a.thread->mu_); - if (!a.thread->started_) { - gpr_cv_wait(&a.thread->ready_, &a.thread->mu_, - gpr_inf_future(GPR_CLOCK_MONOTONIC)); - } - gpr_mu_unlock(&a.thread->mu_); - - (*a.body)(a.arg); - dec_thd_count(); - return nullptr; - }, - a) == 0); + static void inc_thd_count() { + if (grpc_fork_support_enabled()) { + gpr_mu_lock(&g_mu); + g_thread_count++; + gpr_mu_unlock(&g_mu); + } + } - if (success != nullptr) { - *success = alive_; + static void dec_thd_count() { + if (grpc_fork_support_enabled()) { + gpr_mu_lock(&g_mu); + g_thread_count--; + if (g_awaiting_threads && g_thread_count == 0) { + gpr_cv_signal(&g_cv); + } + gpr_mu_unlock(&g_mu); + } } - id_ = gpr_thd_id(p); - GPR_ASSERT(pthread_attr_destroy(&attr) == 0); + gpr_mu mu_; + gpr_cv ready_; + bool started_; + pthread_t pthread_id_; +}; - if (!alive_) { - /* don't use gpr_free, as this was allocated using malloc (see above) */ - free(a); - dec_thd_count(); - } -} +} // namespace -Thread::~Thread() { - if (!alive_) { - // This thread never existed, so nothing to do +Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, + bool* success) { + bool outcome; + impl_ = + grpc_core::New<ThreadInternalsPosix>(thd_name, thd_body, arg, &outcome); + if (outcome) { + state_ = ALIVE; } else { - GPR_ASSERT(joined_); - } - if (real_) { - gpr_mu_destroy(&mu_); - gpr_cv_destroy(&ready_); + state_ = FAILED; + grpc_core::Delete(impl_); + impl_ = nullptr; } -} -void Thread::Start() { - gpr_mu_lock(&mu_); - if (alive_) { - started_ = true; - gpr_cv_signal(&ready_); - } - gpr_mu_unlock(&mu_); -} - -void Thread::Join() { - if (alive_) { - pthread_join(pthread_t(id_), nullptr); + if (success != nullptr) { + *success = outcome; } - joined_ = true; } void Thread::Init() { @@ -180,7 +192,8 @@ bool Thread::AwaitAll(gpr_timespec deadline) { gpr_mu_lock(&g_mu); g_awaiting_threads = 1; int res = 0; - if (g_thread_count > 0) { + while ((g_thread_count > 0) && + (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0)) { res = gpr_cv_wait(&g_cv, &g_mu, deadline); } g_awaiting_threads = 0; diff --git a/src/core/lib/gprpp/thd_windows.cc b/src/core/lib/gprpp/thd_windows.cc index efbed30ac6..e13c2f63d1 100644 --- a/src/core/lib/gprpp/thd_windows.cc +++ b/src/core/lib/gprpp/thd_windows.cc @@ -29,6 +29,8 @@ #include <grpc/support/thd_id.h> #include <string.h> +#include "src/core/lib/gprpp/memory.h" + #if defined(_MSC_VER) #define thread_local __declspec(thread) #define WIN_LAMBDA @@ -40,8 +42,9 @@ #endif namespace { +class ThreadInternalsWindows; struct thd_info { - grpc_core::Thread* thread; + ThreadInternalsWindows* thread; void (*body)(void* arg); /* body of a thread */ void* arg; /* argument to a thread */ HANDLE join_event; /* the join event */ @@ -54,6 +57,77 @@ void destroy_thread(struct thd_info* t) { CloseHandle(t->join_event); gpr_free(t); } + +class ThreadInternalsWindows + : public grpc_core::internal::ThreadInternalsInterface { + public: + ThreadInternalsWindows(void (*thd_body)(void* arg), void* arg, + bool* success) { + gpr_mu_init(&mu_); + gpr_cv_init(&ready_); + + HANDLE handle; + info_ = (struct thd_info*)gpr_malloc(sizeof(*info_)); + info->thread = this; + info->body = thd_body; + info->arg = arg; + + info->join_event = CreateEvent(nullptr, FALSE, FALSE, nullptr); + if (info->join_event == nullptr) { + gpr_free(info_); + *success = false; + } else { + handle = CreateThread( + nullptr, 64 * 1024, + [](void* v) WIN_LAMBDA -> DWORD { + g_thd_info = static_cast<thd_info*>(v); + gpr_mu_lock(&g_thd_info->thread->mu_); + while (!g_thd_info->thread->started_) { + gpr_cv_wait(&g_thd_info->thread->ready_, &g_thd_info->thread->mu_, + gpr_inf_future(GPR_CLOCK_MONOTONIC)); + } + gpr_mu_unlock(&g_thd_info->thread->mu_); + g_thd_info->body(g_thd_info->arg); + BOOL ret = SetEvent(g_thd_info->join_event); + GPR_ASSERT(ret); + return 0; + }, + info, 0, nullptr); + if (handle == nullptr) { + destroy_thread(info_); + *success_ = false; + } else { + CloseHandle(handle); + *success = true; + } + } + } + + ~ThreadInternalsWindows() override { + gpr_mu_destroy(&mu_); + gpr_cv_destroy(&ready_); + } + + void Start() override { + gpr_mu_lock(&mu_); + started_ = true; + gpr_cv_signal(&ready_); + gpr_mu_unlock(&mu_); + } + + void Join() override { + DWORD ret = WaitForSingleObject(info_->join_event, INFINITE); + GPR_ASSERT(ret == WAIT_OBJECT_0); + destroy_thread(info_); + } + + private: + gpr_mu mu_; + gpr_cv ready_; + bool started_; + thd_info* info_; +}; + } // namespace namespace grpc_core { @@ -66,82 +140,22 @@ bool Thread::AwaitAll(gpr_timespec deadline) { } Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, - bool* success) - : real_(true), alive_(false), started_(false), joined_(false) { - gpr_mu_init(&mu_); - gpr_cv_init(&ready_); - - HANDLE handle; - struct thd_info* info = (struct thd_info*)gpr_malloc(sizeof(*info)); - info->thread = this; - info->body = thd_body; - info->arg = arg; - - info->join_event = CreateEvent(nullptr, FALSE, FALSE, nullptr); - if (info->join_event == nullptr) { - gpr_free(info); - alive_ = false; + bool* success) { + bool outcome; + impl_ = grpc_core::New<ThreadInternalsWindows>(thd_body, arg, &outcome); + if (outcome) { + state_ = ALIVE; } else { - handle = CreateThread(nullptr, 64 * 1024, - [](void* v) WIN_LAMBDA -> DWORD { - g_thd_info = static_cast<thd_info*>(v); - gpr_mu_lock(&g_thd_info->thread->mu_); - if (!g_thd_info->thread->started_) { - gpr_cv_wait(&g_thd_info->thread->ready_, - &g_thd_info->thread->mu_, - gpr_inf_future(GPR_CLOCK_MONOTONIC)); - } - gpr_mu_unlock(&g_thd_info->thread->mu_); - g_thd_info->body(g_thd_info->arg); - BOOL ret = SetEvent(g_thd_info->join_event); - GPR_ASSERT(ret); - return 0; - }, - info, 0, nullptr); - if (handle == nullptr) { - destroy_thread(info); - alive_ = false; - } else { - id_ = (gpr_thd_id)info; - CloseHandle(handle); - alive_ = true; - } - } - if (success != nullptr) { - *success = alive_; - } -} - -Thread::~Thread() { - if (!alive_) { - // This thread never existed, so nothing to do - } else { - GPR_ASSERT(joined_); - } - if (real_) { - gpr_mu_destroy(&mu_); - gpr_cv_destroy(&ready_); + state_ = FAILED; + grpc_core::Delete(impl_); + impl_ = nullptr; } -} -void Thread::Start() { - gpr_mu_lock(&mu_); - if (alive_) { - started_ = true; - gpr_cv_signal(&ready_); + if (success != nullptr) { + *success = outcome; } - gpr_mu_unlock(&mu_); } -void Thread::Join() { - if (alive_) { - thd_info* info = (thd_info*)id_; - DWORD ret = WaitForSingleObject(info->join_event, INFINITE); - GPR_ASSERT(ret == WAIT_OBJECT_0); - destroy_thread(info); - } - joined_ = true; -} } // namespace grpc_core gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)g_thd_info; } |