aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/lib/gpr/thd.cc51
-rw-r--r--src/core/lib/gpr/thd.h72
-rw-r--r--src/core/lib/gpr/thd_posix.cc154
-rw-r--r--src/core/lib/gpr/thd_windows.cc107
-rw-r--r--src/core/lib/gprpp/thd.h135
-rw-r--r--src/core/lib/gprpp/thd_posix.cc209
-rw-r--r--src/core/lib/gprpp/thd_windows.cc162
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc66
-rw-r--r--src/core/lib/iomgr/exec_ctx.cc2
-rw-r--r--src/core/lib/iomgr/executor.cc22
-rw-r--r--src/core/lib/iomgr/fork_posix.cc4
-rw-r--r--src/core/lib/iomgr/iocp_windows.cc2
-rw-r--r--src/core/lib/iomgr/iomgr.cc2
-rw-r--r--src/core/lib/iomgr/pollset_windows.cc2
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.cc2
-rw-r--r--src/core/lib/iomgr/resolve_address_windows.cc2
-rw-r--r--src/core/lib/iomgr/timer_manager.cc29
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.cc2
-rw-r--r--src/core/lib/profiling/basic_timers.cc13
-rw-r--r--src/core/lib/surface/init.cc4
-rw-r--r--src/core/tsi/alts_transport_security.cc2
-rw-r--r--src/core/tsi/alts_transport_security.h4
-rw-r--r--src/cpp/client/channel_cc.cc2
-rw-r--r--src/cpp/server/dynamic_thread_pool.cc16
-rw-r--r--src/cpp/server/dynamic_thread_pool.h4
-rw-r--r--src/cpp/thread_manager/thread_manager.cc15
-rw-r--r--src/cpp/thread_manager/thread_manager.h10
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py5
28 files changed, 634 insertions, 466 deletions
diff --git a/src/core/lib/gpr/thd.cc b/src/core/lib/gpr/thd.cc
deleted file mode 100644
index b5341c41b4..0000000000
--- a/src/core/lib/gpr/thd.cc
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-/* Platform-independent features for gpr threads. */
-
-#include <grpc/support/port_platform.h>
-
-#include "src/core/lib/gpr/thd.h"
-
-#include <string.h>
-
-enum { GPR_THD_JOINABLE = 1 };
-
-gpr_thd_options gpr_thd_options_default(void) {
- gpr_thd_options options;
- memset(&options, 0, sizeof(options));
- return options;
-}
-
-void gpr_thd_options_set_detached(gpr_thd_options* options) {
- options->flags &= ~GPR_THD_JOINABLE;
-}
-
-void gpr_thd_options_set_joinable(gpr_thd_options* options) {
- options->flags |= GPR_THD_JOINABLE;
-}
-
-int gpr_thd_options_is_detached(const gpr_thd_options* options) {
- if (!options) return 1;
- return (options->flags & GPR_THD_JOINABLE) == 0;
-}
-
-int gpr_thd_options_is_joinable(const gpr_thd_options* options) {
- if (!options) return 0;
- return (options->flags & GPR_THD_JOINABLE) == GPR_THD_JOINABLE;
-}
diff --git a/src/core/lib/gpr/thd.h b/src/core/lib/gpr/thd.h
deleted file mode 100644
index 920b336708..0000000000
--- a/src/core/lib/gpr/thd.h
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_CORE_LIB_GPR_THD_H
-#define GRPC_CORE_LIB_GPR_THD_H
-/** Internal thread interface for GPR.
-
- Types
- gpr_thd_options options used when creating a thread
- */
-
-#include <grpc/support/port_platform.h>
-
-#include <grpc/support/thd_id.h>
-#include <grpc/support/time.h>
-
-/** Thread creation options. */
-typedef struct {
- int flags; /** Opaque field. Get and set with accessors below. */
-} gpr_thd_options;
-
-/** Create a new thread running (*thd_body)(arg) and place its thread identifier
- in *t, and return true. If there are insufficient resources, return false.
- thd_name is the name of the thread for identification purposes on platforms
- that support thread naming.
- If options==NULL, default options are used.
- The thread is immediately runnable, and exits when (*thd_body)() returns. */
-int gpr_thd_new(gpr_thd_id* t, const char* thd_name,
- void (*thd_body)(void* arg), void* arg,
- const gpr_thd_options* options);
-
-/** Return a gpr_thd_options struct with all fields set to defaults. */
-gpr_thd_options gpr_thd_options_default(void);
-
-/** Set the thread to become detached on startup - this is the default. */
-void gpr_thd_options_set_detached(gpr_thd_options* options);
-
-/** Set the thread to become joinable - mutually exclusive with detached. */
-void gpr_thd_options_set_joinable(gpr_thd_options* options);
-
-/** Returns non-zero if the option detached is set. */
-int gpr_thd_options_is_detached(const gpr_thd_options* options);
-
-/** Returns non-zero if the option joinable is set. */
-int gpr_thd_options_is_joinable(const gpr_thd_options* options);
-
-/** Blocks until the specified thread properly terminates.
- Calling this on a detached thread has unpredictable results. */
-void gpr_thd_join(gpr_thd_id t);
-
-/* Internal interfaces between modules within the gpr support library. */
-void gpr_thd_init();
-
-/* Wait for all outstanding threads to finish, up to deadline */
-int gpr_await_threads(gpr_timespec deadline);
-
-#endif /* GRPC_CORE_LIB_GPR_THD_H */
diff --git a/src/core/lib/gpr/thd_posix.cc b/src/core/lib/gpr/thd_posix.cc
deleted file mode 100644
index fcd174bfba..0000000000
--- a/src/core/lib/gpr/thd_posix.cc
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-/* Posix implementation for gpr threads. */
-
-#include <grpc/support/port_platform.h>
-
-#ifdef GPR_POSIX_SYNC
-
-#include "src/core/lib/gpr/thd.h"
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/sync.h>
-#include <grpc/support/thd_id.h>
-#include <pthread.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include "src/core/lib/gpr/fork.h"
-#include "src/core/lib/gpr/useful.h"
-
-static gpr_mu g_mu;
-static gpr_cv g_cv;
-static int g_thread_count;
-static int g_awaiting_threads;
-
-struct thd_arg {
- void (*body)(void* arg); /* body of a thread */
- void* arg; /* argument to a thread */
- const char* name; /* name of thread. Can be nullptr. */
-};
-
-static void inc_thd_count();
-static void dec_thd_count();
-
-/* Body of every thread started via gpr_thd_new. */
-static void* thread_body(void* v) {
- struct thd_arg a = *static_cast<struct thd_arg*>(v);
- free(v);
- if (a.name != nullptr) {
-#if GPR_APPLE_PTHREAD_NAME
- /* Apple supports 64 characters, and will truncate if it's longer. */
- pthread_setname_np(a.name);
-#elif GPR_LINUX_PTHREAD_NAME
- /* Linux supports 16 characters max, and will error if it's longer. */
- char buf[16];
- size_t buf_len = GPR_ARRAY_SIZE(buf) - 1;
- strncpy(buf, a.name, buf_len);
- buf[buf_len] = '\0';
- pthread_setname_np(pthread_self(), buf);
-#endif // GPR_APPLE_PTHREAD_NAME
- }
- (*a.body)(a.arg);
- dec_thd_count();
- return nullptr;
-}
-
-int gpr_thd_new(gpr_thd_id* t, const char* thd_name,
- void (*thd_body)(void* arg), void* arg,
- const gpr_thd_options* options) {
- int thread_started;
- pthread_attr_t attr;
- pthread_t p;
- /* don't use gpr_malloc as we may cause an infinite recursion with
- * the profiling code */
- struct thd_arg* a = static_cast<struct thd_arg*>(malloc(sizeof(*a)));
- GPR_ASSERT(a != nullptr);
- a->body = thd_body;
- a->arg = arg;
- a->name = thd_name;
- inc_thd_count();
-
- GPR_ASSERT(pthread_attr_init(&attr) == 0);
- if (gpr_thd_options_is_detached(options)) {
- GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) ==
- 0);
- } else {
- GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) ==
- 0);
- }
- thread_started = (pthread_create(&p, &attr, &thread_body, a) == 0);
- GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
- if (!thread_started) {
- /* don't use gpr_free, as this was allocated using malloc (see above) */
- free(a);
- dec_thd_count();
- }
- *t = (gpr_thd_id)p;
- return thread_started;
-}
-
-gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)pthread_self(); }
-
-void gpr_thd_join(gpr_thd_id t) { pthread_join((pthread_t)t, nullptr); }
-
-/*****************************************
- * Only used when fork support is enabled
- */
-
-static void inc_thd_count() {
- if (grpc_fork_support_enabled()) {
- gpr_mu_lock(&g_mu);
- g_thread_count++;
- gpr_mu_unlock(&g_mu);
- }
-}
-
-static void dec_thd_count() {
- if (grpc_fork_support_enabled()) {
- gpr_mu_lock(&g_mu);
- g_thread_count--;
- if (g_awaiting_threads && g_thread_count == 0) {
- gpr_cv_signal(&g_cv);
- }
- gpr_mu_unlock(&g_mu);
- }
-}
-
-void gpr_thd_init() {
- gpr_mu_init(&g_mu);
- gpr_cv_init(&g_cv);
- g_thread_count = 0;
- g_awaiting_threads = 0;
-}
-
-int gpr_await_threads(gpr_timespec deadline) {
- gpr_mu_lock(&g_mu);
- g_awaiting_threads = 1;
- int res = 0;
- if (g_thread_count > 0) {
- res = gpr_cv_wait(&g_cv, &g_mu, deadline);
- }
- g_awaiting_threads = 0;
- gpr_mu_unlock(&g_mu);
- return res == 0;
-}
-
-#endif /* GPR_POSIX_SYNC */
diff --git a/src/core/lib/gpr/thd_windows.cc b/src/core/lib/gpr/thd_windows.cc
deleted file mode 100644
index b467bd2662..0000000000
--- a/src/core/lib/gpr/thd_windows.cc
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-/* Windows implementation for gpr threads. */
-
-#include <grpc/support/port_platform.h>
-
-#ifdef GPR_WINDOWS
-
-#include "src/core/lib/gpr/thd.h"
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/thd_id.h>
-#include <string.h>
-
-#if defined(_MSC_VER)
-#define thread_local __declspec(thread)
-#elif defined(__GNUC__)
-#define thread_local __thread
-#else
-#error "Unknown compiler - please file a bug report"
-#endif
-
-struct thd_info {
- void (*body)(void* arg); /* body of a thread */
- void* arg; /* argument to a thread */
- HANDLE join_event; /* if joinable, the join event */
- int joinable; /* true if not detached */
-};
-
-static thread_local struct thd_info* g_thd_info;
-
-/* Destroys a thread info */
-static void destroy_thread(struct thd_info* t) {
- if (t->joinable) CloseHandle(t->join_event);
- gpr_free(t);
-}
-
-void gpr_thd_init(void) {}
-
-/* Body of every thread started via gpr_thd_new. */
-static DWORD WINAPI thread_body(void* v) {
- g_thd_info = (struct thd_info*)v;
- g_thd_info->body(g_thd_info->arg);
- if (g_thd_info->joinable) {
- BOOL ret = SetEvent(g_thd_info->join_event);
- GPR_ASSERT(ret);
- } else {
- destroy_thread(g_thd_info);
- }
- return 0;
-}
-
-int gpr_thd_new(gpr_thd_id* t, const char* thd_name,
- void (*thd_body)(void* arg), void* arg,
- const gpr_thd_options* options) {
- HANDLE handle;
- struct thd_info* info = (struct thd_info*)gpr_malloc(sizeof(*info));
- info->body = thd_body;
- info->arg = arg;
- *t = 0;
- if (gpr_thd_options_is_joinable(options)) {
- info->joinable = 1;
- info->join_event = CreateEvent(NULL, FALSE, FALSE, NULL);
- if (info->join_event == NULL) {
- gpr_free(info);
- return 0;
- }
- } else {
- info->joinable = 0;
- }
- handle = CreateThread(NULL, 64 * 1024, thread_body, info, 0, NULL);
- if (handle == NULL) {
- destroy_thread(info);
- } else {
- *t = (gpr_thd_id)info;
- CloseHandle(handle);
- }
- return handle != NULL;
-}
-
-gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)g_thd_info; }
-
-void gpr_thd_join(gpr_thd_id t) {
- struct thd_info* info = (struct thd_info*)t;
- DWORD ret = WaitForSingleObject(info->join_event, INFINITE);
- GPR_ASSERT(ret == WAIT_OBJECT_0);
- destroy_thread(info);
-}
-
-#endif /* GPR_WINDOWS */
diff --git a/src/core/lib/gprpp/thd.h b/src/core/lib/gprpp/thd.h
new file mode 100644
index 0000000000..05c7ded45f
--- /dev/null
+++ b/src/core/lib/gprpp/thd.h
@@ -0,0 +1,135 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_GPRPP_THD_H
+#define GRPC_CORE_LIB_GPRPP_THD_H
+
+/** Internal thread interface. */
+
+#include <grpc/support/port_platform.h>
+
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd_id.h>
+#include <grpc/support/time.h>
+
+#include "src/core/lib/gprpp/abstract.h"
+#include "src/core/lib/gprpp/memory.h"
+
+namespace grpc_core {
+namespace internal {
+
+/// Base class for platform-specific thread-state
+class ThreadInternalsInterface {
+ public:
+ virtual ~ThreadInternalsInterface() {}
+ virtual void Start() GRPC_ABSTRACT;
+ virtual void Join() GRPC_ABSTRACT;
+ GRPC_ABSTRACT_BASE_CLASS
+};
+
+} // namespace internal
+
+class Thread {
+ public:
+ /// Default constructor only to allow use in structs that lack constructors
+ /// Does not produce a validly-constructed thread; must later
+ /// use placement new to construct a real thread. Does not init mu_ and cv_
+ Thread() : state_(FAKE), impl_(nullptr) {}
+
+ /// Normal constructor to create a thread with name \a thd_name,
+ /// which will execute a thread based on function \a thd_body
+ /// with argument \a arg once it is started.
+ /// The optional \a success argument indicates whether the thread
+ /// is successfully created.
+ Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
+ bool* success = nullptr);
+
+ /// Move constructor for thread. After this is called, the other thread
+ /// no longer represents a living thread object
+ Thread(Thread&& other) : state_(other.state_), impl_(other.impl_) {
+ other.state_ = MOVED;
+ other.impl_ = nullptr;
+ }
+
+ /// Move assignment operator for thread. After this is called, the other
+ /// thread no longer represents a living thread object. Not allowed if this
+ /// thread actually exists
+ Thread& operator=(Thread&& other) {
+ if (this != &other) {
+ // TODO(vjpai): if we can be sure that all Thread's are actually
+ // constructed, then we should assert GPR_ASSERT(impl_ == nullptr) here.
+ // However, as long as threads come in structures that are
+ // allocated via gpr_malloc, this will not be the case, so we cannot
+ // assert it for the time being.
+ state_ = other.state_;
+ impl_ = other.impl_;
+ other.state_ = MOVED;
+ other.impl_ = nullptr;
+ }
+ return *this;
+ }
+
+ /// The destructor is strictly optional; either the thread never came to life
+ /// and the constructor itself killed it or it has already been joined and
+ /// the Join function kills it. The destructor shouldn't have to do anything.
+ ~Thread() { GPR_ASSERT(impl_ == nullptr); }
+
+ void Start() {
+ if (impl_ != nullptr) {
+ GPR_ASSERT(state_ == ALIVE);
+ state_ = STARTED;
+ impl_->Start();
+ } else {
+ GPR_ASSERT(state_ == FAILED);
+ }
+ };
+
+ void Join() {
+ if (impl_ != nullptr) {
+ impl_->Join();
+ grpc_core::Delete(impl_);
+ state_ = DONE;
+ impl_ = nullptr;
+ } else {
+ GPR_ASSERT(state_ == FAILED);
+ }
+ };
+
+ static void Init();
+ static bool AwaitAll(gpr_timespec deadline);
+
+ private:
+ Thread(const Thread&) = delete;
+ Thread& operator=(const Thread&) = delete;
+
+ /// The thread states are as follows:
+ /// FAKE -- just a dummy placeholder Thread created by the default constructor
+ /// ALIVE -- an actual thread of control exists associated with this thread
+ /// STARTED -- the thread of control has been started
+ /// DONE -- the thread of control has completed and been joined
+ /// FAILED -- the thread of control never came alive
+ /// MOVED -- contents were moved out and we're no longer tracking them
+ enum ThreadState { FAKE, ALIVE, STARTED, DONE, FAILED, MOVED };
+ ThreadState state_;
+ internal::ThreadInternalsInterface* impl_;
+};
+
+} // namespace grpc_core
+
+#endif /* GRPC_CORE_LIB_GPRPP_THD_H */
diff --git a/src/core/lib/gprpp/thd_posix.cc b/src/core/lib/gprpp/thd_posix.cc
new file mode 100644
index 0000000000..2f6c2edcae
--- /dev/null
+++ b/src/core/lib/gprpp/thd_posix.cc
@@ -0,0 +1,209 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/* Posix implementation for gpr threads. */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_POSIX_SYNC
+
+#include "src/core/lib/gprpp/thd.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd_id.h>
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "src/core/lib/gpr/fork.h"
+#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/memory.h"
+
+namespace grpc_core {
+namespace {
+gpr_mu g_mu;
+gpr_cv g_cv;
+int g_thread_count;
+int g_awaiting_threads;
+
+class ThreadInternalsPosix;
+struct thd_arg {
+ ThreadInternalsPosix* thread;
+ void (*body)(void* arg); /* body of a thread */
+ void* arg; /* argument to a thread */
+ const char* name; /* name of thread. Can be nullptr. */
+};
+
+class ThreadInternalsPosix
+ : public grpc_core::internal::ThreadInternalsInterface {
+ public:
+ ThreadInternalsPosix(const char* thd_name, void (*thd_body)(void* arg),
+ void* arg, bool* success)
+ : started_(false) {
+ gpr_mu_init(&mu_);
+ gpr_cv_init(&ready_);
+ pthread_attr_t attr;
+ /* don't use gpr_malloc as we may cause an infinite recursion with
+ * the profiling code */
+ thd_arg* info = static_cast<thd_arg*>(malloc(sizeof(*info)));
+ GPR_ASSERT(info != nullptr);
+ info->thread = this;
+ info->body = thd_body;
+ info->arg = arg;
+ info->name = thd_name;
+ inc_thd_count();
+
+ GPR_ASSERT(pthread_attr_init(&attr) == 0);
+ GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) ==
+ 0);
+
+ *success =
+ (pthread_create(&pthread_id_, &attr,
+ [](void* v) -> void* {
+ thd_arg arg = *static_cast<thd_arg*>(v);
+ free(v);
+ if (arg.name != nullptr) {
+#if GPR_APPLE_PTHREAD_NAME
+ /* Apple supports 64 characters, and will
+ * truncate if it's longer. */
+ pthread_setname_np(arg.name);
+#elif GPR_LINUX_PTHREAD_NAME
+ /* Linux supports 16 characters max, and will
+ * error if it's longer. */
+ char buf[16];
+ size_t buf_len = GPR_ARRAY_SIZE(buf) - 1;
+ strncpy(buf, arg.name, buf_len);
+ buf[buf_len] = '\0';
+ pthread_setname_np(pthread_self(), buf);
+#endif // GPR_APPLE_PTHREAD_NAME
+ }
+
+ gpr_mu_lock(&arg.thread->mu_);
+ while (!arg.thread->started_) {
+ gpr_cv_wait(&arg.thread->ready_, &arg.thread->mu_,
+ gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ }
+ gpr_mu_unlock(&arg.thread->mu_);
+
+ (*arg.body)(arg.arg);
+ dec_thd_count();
+ return nullptr;
+ },
+ info) == 0);
+
+ GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
+
+ if (!success) {
+ /* don't use gpr_free, as this was allocated using malloc (see above) */
+ free(info);
+ dec_thd_count();
+ }
+ };
+
+ ~ThreadInternalsPosix() override {
+ gpr_mu_destroy(&mu_);
+ gpr_cv_destroy(&ready_);
+ }
+
+ void Start() override {
+ gpr_mu_lock(&mu_);
+ started_ = true;
+ gpr_cv_signal(&ready_);
+ gpr_mu_unlock(&mu_);
+ }
+
+ void Join() override { pthread_join(pthread_id_, nullptr); }
+
+ private:
+ /*****************************************
+ * Only used when fork support is enabled
+ */
+
+ static void inc_thd_count() {
+ if (grpc_fork_support_enabled()) {
+ gpr_mu_lock(&g_mu);
+ g_thread_count++;
+ gpr_mu_unlock(&g_mu);
+ }
+ }
+
+ static void dec_thd_count() {
+ if (grpc_fork_support_enabled()) {
+ gpr_mu_lock(&g_mu);
+ g_thread_count--;
+ if (g_awaiting_threads && g_thread_count == 0) {
+ gpr_cv_signal(&g_cv);
+ }
+ gpr_mu_unlock(&g_mu);
+ }
+ }
+
+ gpr_mu mu_;
+ gpr_cv ready_;
+ bool started_;
+ pthread_t pthread_id_;
+};
+
+} // namespace
+
+Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
+ bool* success) {
+ bool outcome = false;
+ impl_ =
+ grpc_core::New<ThreadInternalsPosix>(thd_name, thd_body, arg, &outcome);
+ if (outcome) {
+ state_ = ALIVE;
+ } else {
+ state_ = FAILED;
+ grpc_core::Delete(impl_);
+ impl_ = nullptr;
+ }
+
+ if (success != nullptr) {
+ *success = outcome;
+ }
+}
+
+void Thread::Init() {
+ gpr_mu_init(&g_mu);
+ gpr_cv_init(&g_cv);
+ g_thread_count = 0;
+ g_awaiting_threads = 0;
+}
+
+bool Thread::AwaitAll(gpr_timespec deadline) {
+ gpr_mu_lock(&g_mu);
+ g_awaiting_threads = 1;
+ int res = 0;
+ while ((g_thread_count > 0) &&
+ (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0)) {
+ res = gpr_cv_wait(&g_cv, &g_mu, deadline);
+ }
+ g_awaiting_threads = 0;
+ gpr_mu_unlock(&g_mu);
+ return res == 0;
+}
+
+} // namespace grpc_core
+
+// The following is in the external namespace as it is exposed as C89 API
+gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)pthread_self(); }
+
+#endif /* GPR_POSIX_SYNC */
diff --git a/src/core/lib/gprpp/thd_windows.cc b/src/core/lib/gprpp/thd_windows.cc
new file mode 100644
index 0000000000..59ea02f3d2
--- /dev/null
+++ b/src/core/lib/gprpp/thd_windows.cc
@@ -0,0 +1,162 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/* Windows implementation for gpr threads. */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_WINDOWS
+
+#include "src/core/lib/gprpp/thd.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/thd_id.h>
+#include <string.h>
+
+#include "src/core/lib/gprpp/memory.h"
+
+#if defined(_MSC_VER)
+#define thread_local __declspec(thread)
+#define WIN_LAMBDA
+#elif defined(__GNUC__)
+#define thread_local __thread
+#define WIN_LAMBDA WINAPI
+#else
+#error "Unknown compiler - please file a bug report"
+#endif
+
+namespace {
+class ThreadInternalsWindows;
+struct thd_info {
+ ThreadInternalsWindows* thread;
+ void (*body)(void* arg); /* body of a thread */
+ void* arg; /* argument to a thread */
+ HANDLE join_event; /* the join event */
+};
+
+thread_local struct thd_info* g_thd_info;
+
+class ThreadInternalsWindows
+ : public grpc_core::internal::ThreadInternalsInterface {
+ public:
+ ThreadInternalsWindows(void (*thd_body)(void* arg), void* arg, bool* success)
+ : started_(false) {
+ gpr_mu_init(&mu_);
+ gpr_cv_init(&ready_);
+
+ HANDLE handle;
+ info_ = (struct thd_info*)gpr_malloc(sizeof(*info_));
+ info_->thread = this;
+ info_->body = thd_body;
+ info_->arg = arg;
+
+ info_->join_event = CreateEvent(nullptr, FALSE, FALSE, nullptr);
+ if (info_->join_event == nullptr) {
+ gpr_free(info_);
+ *success = false;
+ } else {
+ handle = CreateThread(
+ nullptr, 64 * 1024,
+ [](void* v) WIN_LAMBDA -> DWORD {
+ g_thd_info = static_cast<thd_info*>(v);
+ gpr_mu_lock(&g_thd_info->thread->mu_);
+ while (!g_thd_info->thread->started_) {
+ gpr_cv_wait(&g_thd_info->thread->ready_, &g_thd_info->thread->mu_,
+ gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ }
+ gpr_mu_unlock(&g_thd_info->thread->mu_);
+ g_thd_info->body(g_thd_info->arg);
+ BOOL ret = SetEvent(g_thd_info->join_event);
+ GPR_ASSERT(ret);
+ return 0;
+ },
+ info_, 0, nullptr);
+ if (handle == nullptr) {
+ destroy_thread();
+ *success = false;
+ } else {
+ CloseHandle(handle);
+ *success = true;
+ }
+ }
+ }
+
+ ~ThreadInternalsWindows() override {
+ gpr_mu_destroy(&mu_);
+ gpr_cv_destroy(&ready_);
+ }
+
+ void Start() override {
+ gpr_mu_lock(&mu_);
+ started_ = true;
+ gpr_cv_signal(&ready_);
+ gpr_mu_unlock(&mu_);
+ }
+
+ void Join() override {
+ DWORD ret = WaitForSingleObject(info_->join_event, INFINITE);
+ GPR_ASSERT(ret == WAIT_OBJECT_0);
+ destroy_thread();
+ }
+
+ private:
+ void destroy_thread() {
+ CloseHandle(info_->join_event);
+ gpr_free(info_);
+ }
+
+ gpr_mu mu_;
+ gpr_cv ready_;
+ bool started_;
+ thd_info* info_;
+};
+
+} // namespace
+
+namespace grpc_core {
+
+void Thread::Init() {}
+
+bool Thread::AwaitAll(gpr_timespec deadline) {
+ // TODO: Consider adding this if needed
+ return false;
+}
+
+Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
+ bool* success) {
+ bool outcome = false;
+ impl_ = grpc_core::New<ThreadInternalsWindows>(thd_body, arg, &outcome);
+ if (outcome) {
+ state_ = ALIVE;
+ } else {
+ state_ = FAILED;
+ grpc_core::Delete(impl_);
+ impl_ = nullptr;
+ }
+
+ if (success != nullptr) {
+ *success = outcome;
+ }
+}
+
+} // namespace grpc_core
+
+gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)g_thd_info; }
+
+#endif /* GPR_WINDOWS */
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index e979ff7eb5..6120f9f44b 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -38,9 +38,9 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/murmur_hash.h"
-#include "src/core/lib/gpr/thd.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_cv.h"
@@ -255,8 +255,13 @@ typedef struct poll_result {
} poll_result;
typedef struct poll_args {
+ grpc_core::Thread poller_thd;
gpr_cv trigger;
int trigger_set;
+ bool harvestable;
+ gpr_cv harvest;
+ bool joinable;
+ gpr_cv join;
struct pollfd* fds;
nfds_t nfds;
poll_result* result;
@@ -266,15 +271,17 @@ typedef struct poll_args {
// This is a 2-tiered cache, we mantain a hash table
// of active poll calls, so we can wait on the result
-// of that call. We also maintain a freelist of inactive
-// poll threads.
+// of that call. We also maintain freelists of inactive
+// poll args and of dead poller threads.
typedef struct poll_hash_table {
poll_args* free_pollers;
poll_args** active_pollers;
+ poll_args* dead_pollers;
unsigned int size;
unsigned int count;
} poll_hash_table;
+// TODO(kpayson64): Eliminate use of global non-POD variables
poll_hash_table poll_cache;
grpc_cv_fd_table g_cvfds;
@@ -1301,6 +1308,7 @@ static void pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
static void run_poll(void* args);
static void cache_poller_locked(poll_args* args);
+static void cache_harvest_locked();
static void cache_insert_locked(poll_args* args) {
uint32_t key = gpr_murmur_hash3(args->fds, args->nfds * sizeof(struct pollfd),
@@ -1363,6 +1371,10 @@ static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) {
poll_args* pargs =
static_cast<poll_args*>(gpr_malloc(sizeof(struct poll_args)));
gpr_cv_init(&pargs->trigger);
+ gpr_cv_init(&pargs->harvest);
+ gpr_cv_init(&pargs->join);
+ pargs->harvestable = false;
+ pargs->joinable = false;
pargs->fds = fds;
pargs->nfds = count;
pargs->next = nullptr;
@@ -1370,11 +1382,9 @@ static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) {
pargs->trigger_set = 0;
init_result(pargs);
cache_poller_locked(pargs);
- gpr_thd_id t_id;
- gpr_thd_options opt = gpr_thd_options_default();
gpr_ref(&g_cvfds.pollcount);
- gpr_thd_options_set_detached(&opt);
- GPR_ASSERT(gpr_thd_new(&t_id, "grpc_poller", &run_poll, pargs, &opt));
+ pargs->poller_thd = grpc_core::Thread("grpc_poller", &run_poll, pargs);
+ pargs->poller_thd.Start();
return pargs;
}
@@ -1439,7 +1449,33 @@ static void cache_destroy_locked(poll_args* args) {
poll_cache.free_pollers = args->next;
}
- gpr_free(args);
+ // Now move this args to the dead poller list for later join
+ if (poll_cache.dead_pollers != nullptr) {
+ poll_cache.dead_pollers->prev = args;
+ }
+ args->prev = nullptr;
+ args->next = poll_cache.dead_pollers;
+ poll_cache.dead_pollers = args;
+}
+
+static void cache_harvest_locked() {
+ while (poll_cache.dead_pollers) {
+ poll_args* args = poll_cache.dead_pollers;
+ poll_cache.dead_pollers = poll_cache.dead_pollers->next;
+ // Keep the list consistent in case new dead pollers get added when we
+ // release the lock below to wait on joining
+ if (poll_cache.dead_pollers) {
+ poll_cache.dead_pollers->prev = nullptr;
+ }
+ args->harvestable = true;
+ gpr_cv_signal(&args->harvest);
+ while (!args->joinable) {
+ gpr_cv_wait(&args->join, &g_cvfds.mu,
+ gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ }
+ args->poller_thd.Join();
+ gpr_free(args);
+ }
}
static void decref_poll_result(poll_result* res) {
@@ -1471,6 +1507,7 @@ static void run_poll(void* args) {
poll_result* result = pargs->result;
int retval = g_cvfds.poll(result->fds, result->nfds, CV_POLL_PERIOD_MS);
gpr_mu_lock(&g_cvfds.mu);
+ cache_harvest_locked();
if (retval != 0) {
result->completed = 1;
result->retval = retval;
@@ -1490,6 +1527,7 @@ static void run_poll(void* args) {
deadline = gpr_time_add(deadline, thread_grace);
pargs->trigger_set = 0;
gpr_cv_wait(&pargs->trigger, &g_cvfds.mu, deadline);
+ cache_harvest_locked();
if (!pargs->trigger_set) {
cache_destroy_locked(pargs);
break;
@@ -1498,10 +1536,15 @@ static void run_poll(void* args) {
gpr_mu_unlock(&g_cvfds.mu);
}
- // We still have the lock here
if (gpr_unref(&g_cvfds.pollcount)) {
gpr_cv_signal(&g_cvfds.shutdown_cv);
}
+ while (!pargs->harvestable) {
+ gpr_cv_wait(&pargs->harvest, &g_cvfds.mu,
+ gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ }
+ pargs->joinable = true;
+ gpr_cv_signal(&pargs->join);
gpr_mu_unlock(&g_cvfds.mu);
}
@@ -1514,6 +1557,7 @@ static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) {
nfds_t nsockfds = 0;
poll_result* result = nullptr;
gpr_mu_lock(&g_cvfds.mu);
+ cache_harvest_locked();
pollcv = static_cast<grpc_cv_node*>(gpr_malloc(sizeof(grpc_cv_node)));
pollcv->next = nullptr;
gpr_cv pollcv_cv;
@@ -1577,12 +1621,14 @@ static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) {
pargs->trigger_set = 1;
gpr_cv_signal(&pargs->trigger);
gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline);
+ cache_harvest_locked();
res = result->retval;
errno = result->err;
result->watchcount--;
remove_cvn(&result->watchers, pollcv);
} else if (!skip_poll) {
gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline);
+ cache_harvest_locked();
}
idx = 0;
@@ -1639,6 +1685,7 @@ static void global_cv_fd_table_init() {
for (unsigned int i = 0; i < poll_cache.size; i++) {
poll_cache.active_pollers[i] = nullptr;
}
+ poll_cache.dead_pollers = nullptr;
gpr_mu_unlock(&g_cvfds.mu);
}
@@ -1657,6 +1704,7 @@ static void global_cv_fd_table_shutdown() {
grpc_poll_function = g_cvfds.poll;
gpr_free(g_cvfds.cvfds);
+ cache_harvest_locked();
gpr_free(poll_cache.active_pollers);
gpr_mu_unlock(&g_cvfds.mu);
diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc
index 132fe87870..2f544b20ab 100644
--- a/src/core/lib/iomgr/exec_ctx.cc
+++ b/src/core/lib/iomgr/exec_ctx.cc
@@ -23,7 +23,7 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
-#include "src/core/lib/gpr/thd.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/profiling/timers.h"
diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index e7f412a562..b017db53f8 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -29,9 +29,9 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/spinlock.h"
-#include "src/core/lib/gpr/thd.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#define MAX_DEPTH 2
@@ -43,7 +43,7 @@ typedef struct {
size_t depth;
bool shutdown;
bool queued_long_job;
- gpr_thd_id id;
+ grpc_core::Thread thd;
} thread_state;
static thread_state* g_thread_state;
@@ -101,13 +101,13 @@ void grpc_executor_set_threading(bool threading) {
for (size_t i = 0; i < g_max_threads; i++) {
gpr_mu_init(&g_thread_state[i].mu);
gpr_cv_init(&g_thread_state[i].cv);
+ g_thread_state[i].thd = grpc_core::Thread();
g_thread_state[i].elems = GRPC_CLOSURE_LIST_INIT;
}
- gpr_thd_options opt = gpr_thd_options_default();
- gpr_thd_options_set_joinable(&opt);
- gpr_thd_new(&g_thread_state[0].id, "grpc_executor", executor_thread,
- &g_thread_state[0], &opt);
+ g_thread_state[0].thd =
+ grpc_core::Thread("grpc_executor", executor_thread, &g_thread_state[0]);
+ g_thread_state[0].thd.Start();
} else {
if (cur_threads == 0) return;
for (size_t i = 0; i < g_max_threads; i++) {
@@ -121,7 +121,7 @@ void grpc_executor_set_threading(bool threading) {
gpr_spinlock_lock(&g_adding_thread_lock);
gpr_spinlock_unlock(&g_adding_thread_lock);
for (gpr_atm i = 0; i < g_cur_threads; i++) {
- gpr_thd_join(g_thread_state[i].id);
+ g_thread_state[i].thd.Join();
}
gpr_atm_no_barrier_store(&g_cur_threads, 0);
for (size_t i = 0; i < g_max_threads; i++) {
@@ -264,10 +264,10 @@ static void executor_push(grpc_closure* closure, grpc_error* error,
if (cur_thread_count < g_max_threads) {
gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
- gpr_thd_options opt = gpr_thd_options_default();
- gpr_thd_options_set_joinable(&opt);
- gpr_thd_new(&g_thread_state[cur_thread_count].id, "gpr_executor",
- executor_thread, &g_thread_state[cur_thread_count], &opt);
+ g_thread_state[cur_thread_count].thd =
+ grpc_core::Thread("grpc_executor", executor_thread,
+ &g_thread_state[cur_thread_count]);
+ g_thread_state[cur_thread_count].thd.Start();
}
gpr_spinlock_unlock(&g_adding_thread_lock);
}
diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc
index d32fbc4588..f8645ab157 100644
--- a/src/core/lib/iomgr/fork_posix.cc
+++ b/src/core/lib/iomgr/fork_posix.cc
@@ -29,7 +29,7 @@
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/fork.h"
-#include "src/core/lib/gpr/thd.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/timer_manager.h"
@@ -53,7 +53,7 @@ void grpc_prefork() {
grpc_timer_manager_set_threading(false);
grpc_executor_set_threading(false);
grpc_core::ExecCtx::Get()->Flush();
- if (!gpr_await_threads(
+ if (!grpc_core::Thread::AwaitAll(
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(3, GPR_TIMESPAN)))) {
gpr_log(GPR_ERROR, "gRPC thread still active! Cannot fork!");
diff --git a/src/core/lib/iomgr/iocp_windows.cc b/src/core/lib/iomgr/iocp_windows.cc
index 5285734719..ce77231036 100644
--- a/src/core/lib/iomgr/iocp_windows.cc
+++ b/src/core/lib/iomgr/iocp_windows.cc
@@ -30,7 +30,7 @@
#include <grpc/support/log_windows.h>
#include "src/core/lib/debug/stats.h"
-#include "src/core/lib/gpr/thd.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/iocp_windows.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/socket_windows.h"
diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc
index 70a80e1998..3c2b83a549 100644
--- a/src/core/lib/iomgr/iomgr.cc
+++ b/src/core/lib/iomgr/iomgr.cc
@@ -31,8 +31,8 @@
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
-#include "src/core/lib/gpr/thd.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
diff --git a/src/core/lib/iomgr/pollset_windows.cc b/src/core/lib/iomgr/pollset_windows.cc
index 62ab760875..c1b83ddc14 100644
--- a/src/core/lib/iomgr/pollset_windows.cc
+++ b/src/core/lib/iomgr/pollset_windows.cc
@@ -24,7 +24,7 @@
#include <grpc/support/log.h>
-#include "src/core/lib/gpr/thd.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/iocp_windows.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/pollset.h"
diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc
index 9307f19bcf..2f68dbe214 100644
--- a/src/core/lib/iomgr/resolve_address_posix.cc
+++ b/src/core/lib/iomgr/resolve_address_posix.cc
@@ -35,8 +35,8 @@
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
-#include "src/core/lib/gpr/thd.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc
index 4e1dcaabaf..7a62c88720 100644
--- a/src/core/lib/iomgr/resolve_address_windows.cc
+++ b/src/core/lib/iomgr/resolve_address_windows.cc
@@ -37,7 +37,7 @@
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
-#include "src/core/lib/gpr/thd.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc
index 0210e70015..94f288af27 100644
--- a/src/core/lib/iomgr/timer_manager.cc
+++ b/src/core/lib/iomgr/timer_manager.cc
@@ -18,21 +18,20 @@
#include <grpc/support/port_platform.h>
-#include "src/core/lib/iomgr/timer_manager.h"
+#include <inttypes.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <inttypes.h>
-
#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/gpr/thd.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/iomgr/timer_manager.h"
-typedef struct completed_thread {
- gpr_thd_id t;
- struct completed_thread* next;
-} completed_thread;
+struct completed_thread {
+ grpc_core::Thread thd;
+ completed_thread* next;
+};
extern grpc_core::TraceFlag grpc_timer_check_trace;
@@ -68,7 +67,7 @@ static void gc_completed_threads(void) {
g_completed_threads = nullptr;
gpr_mu_unlock(&g_mu);
while (to_gc != nullptr) {
- gpr_thd_join(to_gc->t);
+ to_gc->thd.Join();
completed_thread* next = to_gc->next;
gpr_free(to_gc);
to_gc = next;
@@ -85,18 +84,10 @@ static void start_timer_thread_and_unlock(void) {
if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, "Spawn timer thread");
}
- gpr_thd_options opt = gpr_thd_options_default();
- gpr_thd_options_set_joinable(&opt);
completed_thread* ct =
static_cast<completed_thread*>(gpr_malloc(sizeof(*ct)));
- // The call to gpr_thd_new() has to be under the same lock used by
- // gc_completed_threads(), particularly due to ct->t, which is written here
- // (internally by gpr_thd_new) and read there. Otherwise it's possible for ct
- // to leak through g_completed_threads and be freed in gc_completed_threads()
- // before "&ct->t" is written to, causing a use-after-free.
- gpr_mu_lock(&g_mu);
- gpr_thd_new(&ct->t, "grpc_global_timer", timer_thread, ct, &opt);
- gpr_mu_unlock(&g_mu);
+ ct->thd = grpc_core::Thread("grpc_global_timer", timer_thread, ct);
+ ct->thd.Start();
}
void grpc_timer_manager_tick() {
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.cc b/src/core/lib/iomgr/wakeup_fd_cv.cc
index ee322105ae..74faa6379e 100644
--- a/src/core/lib/iomgr/wakeup_fd_cv.cc
+++ b/src/core/lib/iomgr/wakeup_fd_cv.cc
@@ -32,8 +32,8 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
-#include "src/core/lib/gpr/thd.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/thd.h"
#define MAX_TABLE_RESIZE 256
diff --git a/src/core/lib/profiling/basic_timers.cc b/src/core/lib/profiling/basic_timers.cc
index ca6705a6b3..43384fd0ca 100644
--- a/src/core/lib/profiling/basic_timers.cc
+++ b/src/core/lib/profiling/basic_timers.cc
@@ -30,7 +30,7 @@
#include <string.h>
#include "src/core/lib/gpr/env.h"
-#include "src/core/lib/gpr/thd.h"
+#include "src/core/lib/gprpp/thd.h"
typedef enum { BEGIN = '{', END = '}', MARK = '.' } marker_type;
@@ -68,7 +68,7 @@ static pthread_cond_t g_cv;
static gpr_timer_log_list g_in_progress_logs;
static gpr_timer_log_list g_done_logs;
static int g_shutdown;
-static gpr_thd_id g_writing_thread;
+static grpc_core::Thread* g_writing_thread;
static __thread int g_thread_id;
static int g_next_thread_id;
static int g_writing_enabled = 1;
@@ -182,7 +182,8 @@ static void finish_writing(void) {
g_shutdown = 1;
pthread_cond_signal(&g_cv);
pthread_mutex_unlock(&g_mu);
- gpr_thd_join(g_writing_thread);
+ g_writing_thread->Join();
+ grpc_core::Delete(g_writing_thread);
gpr_log(GPR_INFO, "flushing logs");
@@ -201,10 +202,8 @@ void gpr_timers_set_log_filename(const char* filename) {
}
static void init_output() {
- gpr_thd_options options = gpr_thd_options_default();
- gpr_thd_options_set_joinable(&options);
- GPR_ASSERT(gpr_thd_new(&g_writing_thread, "timer_output_thread",
- writing_thread, NULL, &options));
+ g_writing_thread = grpc_core::New<grpc_core::Thread>("timer_output_thread",
+ writing_thread, nullptr);
atexit(finish_writing);
}
diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc
index 7bc24a5049..ac9f9e6066 100644
--- a/src/core/lib/surface/init.cc
+++ b/src/core/lib/surface/init.cc
@@ -32,7 +32,7 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/fork.h"
-#include "src/core/lib/gpr/thd.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -123,7 +123,7 @@ void grpc_init(void) {
gpr_mu_lock(&g_init_mu);
if (++g_initializations == 1) {
gpr_time_init();
- gpr_thd_init();
+ grpc_core::Thread::Init();
grpc_stats_init();
grpc_slice_intern_init();
grpc_mdctx_global_init();
diff --git a/src/core/tsi/alts_transport_security.cc b/src/core/tsi/alts_transport_security.cc
index b45b4e0736..2fd408103b 100644
--- a/src/core/tsi/alts_transport_security.cc
+++ b/src/core/tsi/alts_transport_security.cc
@@ -56,7 +56,7 @@ void grpc_tsi_alts_shutdown() {
grpc_tsi_alts_wait_for_cq_drain();
grpc_completion_queue_destroy(g_alts_resource.cq);
grpc_channel_destroy(g_alts_resource.channel);
- gpr_thd_join(g_alts_resource.thread_id);
+ g_alts_resource.thread.Join();
}
gpr_cv_destroy(&g_alts_resource.cv);
gpr_mu_destroy(&g_alts_resource.mu);
diff --git a/src/core/tsi/alts_transport_security.h b/src/core/tsi/alts_transport_security.h
index 3ca064992b..d6b8e11137 100644
--- a/src/core/tsi/alts_transport_security.h
+++ b/src/core/tsi/alts_transport_security.h
@@ -24,10 +24,10 @@
#include <grpc/grpc.h>
#include <grpc/support/sync.h>
-#include "src/core/lib/gpr/thd.h"
+#include "src/core/lib/gprpp/thd.h"
typedef struct alts_shared_resource {
- gpr_thd_id thread_id;
+ grpc_core::Thread thread;
grpc_channel* channel;
grpc_completion_queue* cq;
gpr_mu mu;
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index cba5984f4b..867f31f025 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -42,7 +42,7 @@
#include <grpcpp/support/time.h>
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
-#include "src/core/lib/gpr/thd.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/profiling/timers.h"
namespace grpc {
diff --git a/src/cpp/server/dynamic_thread_pool.cc b/src/cpp/server/dynamic_thread_pool.cc
index 81c78fe739..fe887486d1 100644
--- a/src/cpp/server/dynamic_thread_pool.cc
+++ b/src/cpp/server/dynamic_thread_pool.cc
@@ -19,20 +19,24 @@
#include "src/cpp/server/dynamic_thread_pool.h"
#include <mutex>
-#include <thread>
#include <grpc/support/log.h>
+#include "src/core/lib/gprpp/thd.h"
+
namespace grpc {
DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool)
: pool_(pool),
- thd_(new std::thread(&DynamicThreadPool::DynamicThread::ThreadFunc,
- this)) {}
-DynamicThreadPool::DynamicThread::~DynamicThread() {
- thd_->join();
- thd_.reset();
+ thd_("dynamic thread pool thread",
+ [](void* th) {
+ reinterpret_cast<DynamicThreadPool::DynamicThread*>(th)
+ ->ThreadFunc();
+ },
+ this) {
+ thd_.Start();
}
+DynamicThreadPool::DynamicThread::~DynamicThread() { thd_.Join(); }
void DynamicThreadPool::DynamicThread::ThreadFunc() {
pool_->ThreadFunc();
diff --git a/src/cpp/server/dynamic_thread_pool.h b/src/cpp/server/dynamic_thread_pool.h
index 880a03d0f0..5df8cf2b04 100644
--- a/src/cpp/server/dynamic_thread_pool.h
+++ b/src/cpp/server/dynamic_thread_pool.h
@@ -24,10 +24,10 @@
#include <memory>
#include <mutex>
#include <queue>
-#include <thread>
#include <grpcpp/support/config.h>
+#include "src/core/lib/gprpp/thd.h"
#include "src/cpp/server/thread_pool_interface.h"
namespace grpc {
@@ -47,7 +47,7 @@ class DynamicThreadPool final : public ThreadPoolInterface {
private:
DynamicThreadPool* pool_;
- std::unique_ptr<std::thread> thd_;
+ grpc_core::Thread thd_;
void ThreadFunc();
};
std::mutex mu_;
diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc
index 23264f1b5b..21cc9bbb31 100644
--- a/src/cpp/thread_manager/thread_manager.cc
+++ b/src/cpp/thread_manager/thread_manager.cc
@@ -20,18 +20,24 @@
#include <climits>
#include <mutex>
-#include <thread>
#include <grpc/support/log.h>
+#include "src/core/lib/gprpp/thd.h"
+
namespace grpc {
ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr)
: thd_mgr_(thd_mgr) {
// Make thread creation exclusive with respect to its join happening in
// ~WorkerThread().
- std::lock_guard<std::mutex> lock(wt_mu_);
- thd_ = std::thread(&ThreadManager::WorkerThread::Run, this);
+ thd_ = grpc_core::Thread(
+ "sync server thread",
+ [](void* th) {
+ reinterpret_cast<ThreadManager::WorkerThread*>(th)->Run();
+ },
+ this);
+ thd_.Start();
}
void ThreadManager::WorkerThread::Run() {
@@ -41,8 +47,7 @@ void ThreadManager::WorkerThread::Run() {
ThreadManager::WorkerThread::~WorkerThread() {
// Don't join until the thread is fully constructed.
- std::lock_guard<std::mutex> lock(wt_mu_);
- thd_.join();
+ thd_.Join();
}
ThreadManager::ThreadManager(int min_pollers, int max_pollers)
diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h
index 1113031695..5a40f2de47 100644
--- a/src/cpp/thread_manager/thread_manager.h
+++ b/src/cpp/thread_manager/thread_manager.h
@@ -23,10 +23,11 @@
#include <list>
#include <memory>
#include <mutex>
-#include <thread>
#include <grpcpp/support/config.h>
+#include "src/core/lib/gprpp/thd.h"
+
namespace grpc {
class ThreadManager {
@@ -84,8 +85,8 @@ class ThreadManager {
virtual void Wait();
private:
- // Helper wrapper class around std::thread. This takes a ThreadManager object
- // and starts a new std::thread to calls the Run() function.
+ // Helper wrapper class around grpc_core::Thread. Takes a ThreadManager object
+ // and starts a new grpc_core::Thread to calls the Run() function.
//
// The Run() function calls ThreadManager::MainWorkLoop() function and once
// that completes, it marks the WorkerThread completed by calling
@@ -101,8 +102,7 @@ class ThreadManager {
void Run();
ThreadManager* const thd_mgr_;
- std::mutex wt_mu_;
- std::thread thd_;
+ grpc_core::Thread thd_;
};
// The main funtion in ThreadManager
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 8ba52ab3a0..75156793f1 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -41,9 +41,6 @@ CORE_SOURCE_FILES = [
'src/core/lib/gpr/sync.cc',
'src/core/lib/gpr/sync_posix.cc',
'src/core/lib/gpr/sync_windows.cc',
- 'src/core/lib/gpr/thd.cc',
- 'src/core/lib/gpr/thd_posix.cc',
- 'src/core/lib/gpr/thd_windows.cc',
'src/core/lib/gpr/time.cc',
'src/core/lib/gpr/time_posix.cc',
'src/core/lib/gpr/time_precise.cc',
@@ -53,6 +50,8 @@ CORE_SOURCE_FILES = [
'src/core/lib/gpr/tmpfile_posix.cc',
'src/core/lib/gpr/tmpfile_windows.cc',
'src/core/lib/gpr/wrap_memcpy.cc',
+ 'src/core/lib/gprpp/thd_posix.cc',
+ 'src/core/lib/gprpp/thd_windows.cc',
'src/core/lib/profiling/basic_timers.cc',
'src/core/lib/profiling/stap_timers.cc',
'src/core/lib/surface/init.cc',