diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/lib/gpr/thd.cc | 49 | ||||
-rw-r--r-- | src/core/lib/gpr/thd.h | 71 | ||||
-rw-r--r-- | src/core/lib/gpr/thd_posix.cc | 154 | ||||
-rw-r--r-- | src/core/lib/gpr/thd_windows.cc | 107 | ||||
-rw-r--r-- | src/core/lib/gprpp/thd.h | 63 | ||||
-rw-r--r-- | src/core/lib/gprpp/thd_posix.cc | 196 | ||||
-rw-r--r-- | src/core/lib/gprpp/thd_windows.cc | 149 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.cc | 57 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/executor.cc | 24 | ||||
-rw-r--r-- | src/core/lib/iomgr/fork_posix.cc | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/iocp_windows.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/pollset_windows.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_posix.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_windows.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_manager.cc | 34 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_cv.cc | 2 | ||||
-rw-r--r-- | src/core/lib/profiling/basic_timers.cc | 13 | ||||
-rw-r--r-- | src/core/lib/surface/init.cc | 4 | ||||
-rw-r--r-- | src/core/tsi/alts_transport_security.cc | 3 | ||||
-rw-r--r-- | src/core/tsi/alts_transport_security.h | 4 |
22 files changed, 504 insertions, 442 deletions
diff --git a/src/core/lib/gpr/thd.cc b/src/core/lib/gpr/thd.cc deleted file mode 100644 index 11391418b1..0000000000 --- a/src/core/lib/gpr/thd.cc +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/* Platform-independent features for gpr threads. */ - -#include "src/core/lib/gpr/thd.h" - -#include <string.h> - -enum { GPR_THD_JOINABLE = 1 }; - -gpr_thd_options gpr_thd_options_default(void) { - gpr_thd_options options; - memset(&options, 0, sizeof(options)); - return options; -} - -void gpr_thd_options_set_detached(gpr_thd_options* options) { - options->flags &= ~GPR_THD_JOINABLE; -} - -void gpr_thd_options_set_joinable(gpr_thd_options* options) { - options->flags |= GPR_THD_JOINABLE; -} - -int gpr_thd_options_is_detached(const gpr_thd_options* options) { - if (!options) return 1; - return (options->flags & GPR_THD_JOINABLE) == 0; -} - -int gpr_thd_options_is_joinable(const gpr_thd_options* options) { - if (!options) return 0; - return (options->flags & GPR_THD_JOINABLE) == GPR_THD_JOINABLE; -} diff --git a/src/core/lib/gpr/thd.h b/src/core/lib/gpr/thd.h deleted file mode 100644 index 58ce0d0088..0000000000 --- a/src/core/lib/gpr/thd.h +++ /dev/null @@ -1,71 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_CORE_LIB_GPR_THD_H -#define GRPC_CORE_LIB_GPR_THD_H -/** Internal thread interface for GPR. - - Types - gpr_thd_options options used when creating a thread - */ - -#include <grpc/support/port_platform.h> -#include <grpc/support/thd_id.h> -#include <grpc/support/time.h> - -/** Thread creation options. */ -typedef struct { - int flags; /** Opaque field. Get and set with accessors below. */ -} gpr_thd_options; - -/** Create a new thread running (*thd_body)(arg) and place its thread identifier - in *t, and return true. If there are insufficient resources, return false. - thd_name is the name of the thread for identification purposes on platforms - that support thread naming. - If options==NULL, default options are used. - The thread is immediately runnable, and exits when (*thd_body)() returns. */ -int gpr_thd_new(gpr_thd_id* t, const char* thd_name, - void (*thd_body)(void* arg), void* arg, - const gpr_thd_options* options); - -/** Return a gpr_thd_options struct with all fields set to defaults. */ -gpr_thd_options gpr_thd_options_default(void); - -/** Set the thread to become detached on startup - this is the default. */ -void gpr_thd_options_set_detached(gpr_thd_options* options); - -/** Set the thread to become joinable - mutually exclusive with detached. */ -void gpr_thd_options_set_joinable(gpr_thd_options* options); - -/** Returns non-zero if the option detached is set. */ -int gpr_thd_options_is_detached(const gpr_thd_options* options); - -/** Returns non-zero if the option joinable is set. */ -int gpr_thd_options_is_joinable(const gpr_thd_options* options); - -/** Blocks until the specified thread properly terminates. - Calling this on a detached thread has unpredictable results. */ -void gpr_thd_join(gpr_thd_id t); - -/* Internal interfaces between modules within the gpr support library. */ -void gpr_thd_init(); - -/* Wait for all outstanding threads to finish, up to deadline */ -int gpr_await_threads(gpr_timespec deadline); - -#endif /* GRPC_CORE_LIB_GPR_THD_H */ diff --git a/src/core/lib/gpr/thd_posix.cc b/src/core/lib/gpr/thd_posix.cc deleted file mode 100644 index fcd174bfba..0000000000 --- a/src/core/lib/gpr/thd_posix.cc +++ /dev/null @@ -1,154 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/* Posix implementation for gpr threads. */ - -#include <grpc/support/port_platform.h> - -#ifdef GPR_POSIX_SYNC - -#include "src/core/lib/gpr/thd.h" - -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/sync.h> -#include <grpc/support/thd_id.h> -#include <pthread.h> -#include <stdlib.h> -#include <string.h> - -#include "src/core/lib/gpr/fork.h" -#include "src/core/lib/gpr/useful.h" - -static gpr_mu g_mu; -static gpr_cv g_cv; -static int g_thread_count; -static int g_awaiting_threads; - -struct thd_arg { - void (*body)(void* arg); /* body of a thread */ - void* arg; /* argument to a thread */ - const char* name; /* name of thread. Can be nullptr. */ -}; - -static void inc_thd_count(); -static void dec_thd_count(); - -/* Body of every thread started via gpr_thd_new. */ -static void* thread_body(void* v) { - struct thd_arg a = *static_cast<struct thd_arg*>(v); - free(v); - if (a.name != nullptr) { -#if GPR_APPLE_PTHREAD_NAME - /* Apple supports 64 characters, and will truncate if it's longer. */ - pthread_setname_np(a.name); -#elif GPR_LINUX_PTHREAD_NAME - /* Linux supports 16 characters max, and will error if it's longer. */ - char buf[16]; - size_t buf_len = GPR_ARRAY_SIZE(buf) - 1; - strncpy(buf, a.name, buf_len); - buf[buf_len] = '\0'; - pthread_setname_np(pthread_self(), buf); -#endif // GPR_APPLE_PTHREAD_NAME - } - (*a.body)(a.arg); - dec_thd_count(); - return nullptr; -} - -int gpr_thd_new(gpr_thd_id* t, const char* thd_name, - void (*thd_body)(void* arg), void* arg, - const gpr_thd_options* options) { - int thread_started; - pthread_attr_t attr; - pthread_t p; - /* don't use gpr_malloc as we may cause an infinite recursion with - * the profiling code */ - struct thd_arg* a = static_cast<struct thd_arg*>(malloc(sizeof(*a))); - GPR_ASSERT(a != nullptr); - a->body = thd_body; - a->arg = arg; - a->name = thd_name; - inc_thd_count(); - - GPR_ASSERT(pthread_attr_init(&attr) == 0); - if (gpr_thd_options_is_detached(options)) { - GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) == - 0); - } else { - GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == - 0); - } - thread_started = (pthread_create(&p, &attr, &thread_body, a) == 0); - GPR_ASSERT(pthread_attr_destroy(&attr) == 0); - if (!thread_started) { - /* don't use gpr_free, as this was allocated using malloc (see above) */ - free(a); - dec_thd_count(); - } - *t = (gpr_thd_id)p; - return thread_started; -} - -gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)pthread_self(); } - -void gpr_thd_join(gpr_thd_id t) { pthread_join((pthread_t)t, nullptr); } - -/***************************************** - * Only used when fork support is enabled - */ - -static void inc_thd_count() { - if (grpc_fork_support_enabled()) { - gpr_mu_lock(&g_mu); - g_thread_count++; - gpr_mu_unlock(&g_mu); - } -} - -static void dec_thd_count() { - if (grpc_fork_support_enabled()) { - gpr_mu_lock(&g_mu); - g_thread_count--; - if (g_awaiting_threads && g_thread_count == 0) { - gpr_cv_signal(&g_cv); - } - gpr_mu_unlock(&g_mu); - } -} - -void gpr_thd_init() { - gpr_mu_init(&g_mu); - gpr_cv_init(&g_cv); - g_thread_count = 0; - g_awaiting_threads = 0; -} - -int gpr_await_threads(gpr_timespec deadline) { - gpr_mu_lock(&g_mu); - g_awaiting_threads = 1; - int res = 0; - if (g_thread_count > 0) { - res = gpr_cv_wait(&g_cv, &g_mu, deadline); - } - g_awaiting_threads = 0; - gpr_mu_unlock(&g_mu); - return res == 0; -} - -#endif /* GPR_POSIX_SYNC */ diff --git a/src/core/lib/gpr/thd_windows.cc b/src/core/lib/gpr/thd_windows.cc deleted file mode 100644 index b467bd2662..0000000000 --- a/src/core/lib/gpr/thd_windows.cc +++ /dev/null @@ -1,107 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/* Windows implementation for gpr threads. */ - -#include <grpc/support/port_platform.h> - -#ifdef GPR_WINDOWS - -#include "src/core/lib/gpr/thd.h" - -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/thd_id.h> -#include <string.h> - -#if defined(_MSC_VER) -#define thread_local __declspec(thread) -#elif defined(__GNUC__) -#define thread_local __thread -#else -#error "Unknown compiler - please file a bug report" -#endif - -struct thd_info { - void (*body)(void* arg); /* body of a thread */ - void* arg; /* argument to a thread */ - HANDLE join_event; /* if joinable, the join event */ - int joinable; /* true if not detached */ -}; - -static thread_local struct thd_info* g_thd_info; - -/* Destroys a thread info */ -static void destroy_thread(struct thd_info* t) { - if (t->joinable) CloseHandle(t->join_event); - gpr_free(t); -} - -void gpr_thd_init(void) {} - -/* Body of every thread started via gpr_thd_new. */ -static DWORD WINAPI thread_body(void* v) { - g_thd_info = (struct thd_info*)v; - g_thd_info->body(g_thd_info->arg); - if (g_thd_info->joinable) { - BOOL ret = SetEvent(g_thd_info->join_event); - GPR_ASSERT(ret); - } else { - destroy_thread(g_thd_info); - } - return 0; -} - -int gpr_thd_new(gpr_thd_id* t, const char* thd_name, - void (*thd_body)(void* arg), void* arg, - const gpr_thd_options* options) { - HANDLE handle; - struct thd_info* info = (struct thd_info*)gpr_malloc(sizeof(*info)); - info->body = thd_body; - info->arg = arg; - *t = 0; - if (gpr_thd_options_is_joinable(options)) { - info->joinable = 1; - info->join_event = CreateEvent(NULL, FALSE, FALSE, NULL); - if (info->join_event == NULL) { - gpr_free(info); - return 0; - } - } else { - info->joinable = 0; - } - handle = CreateThread(NULL, 64 * 1024, thread_body, info, 0, NULL); - if (handle == NULL) { - destroy_thread(info); - } else { - *t = (gpr_thd_id)info; - CloseHandle(handle); - } - return handle != NULL; -} - -gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)g_thd_info; } - -void gpr_thd_join(gpr_thd_id t) { - struct thd_info* info = (struct thd_info*)t; - DWORD ret = WaitForSingleObject(info->join_event, INFINITE); - GPR_ASSERT(ret == WAIT_OBJECT_0); - destroy_thread(info); -} - -#endif /* GPR_WINDOWS */ diff --git a/src/core/lib/gprpp/thd.h b/src/core/lib/gprpp/thd.h new file mode 100644 index 0000000000..f45e78e7f6 --- /dev/null +++ b/src/core/lib/gprpp/thd.h @@ -0,0 +1,63 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_GPRPP_THD_H +#define GRPC_CORE_LIB_GPRPP_THD_H + +/** Internal thread interface. */ + +#include <grpc/support/port_platform.h> + +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd_id.h> +#include <grpc/support/time.h> + +namespace grpc_core { + +class Thread { + public: + /// Default constructor only to allow use in structs that lack constructors + /// Does not produce a validly-constructed thread; must later + /// use placement new to construct a real thread. Does not init mu_ and cv_ + Thread() : real_(false), alive_(false), started_(false), joined_(false) {} + + Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, + bool* success = nullptr); + ~Thread(); + + void Start(); + void Join(); + + static void Init(); + static bool AwaitAll(gpr_timespec deadline); + + private: + gpr_mu mu_; + gpr_cv ready_; + + gpr_thd_id id_; + bool real_; + bool alive_; + bool started_; + bool joined_; +}; + +} // namespace grpc_core + +#endif /* GRPC_CORE_LIB_GPRPP_THD_H */ diff --git a/src/core/lib/gprpp/thd_posix.cc b/src/core/lib/gprpp/thd_posix.cc new file mode 100644 index 0000000000..4ded4d3fd5 --- /dev/null +++ b/src/core/lib/gprpp/thd_posix.cc @@ -0,0 +1,196 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* Posix implementation for gpr threads. */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_POSIX_SYNC + +#include "src/core/lib/gprpp/thd.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd_id.h> +#include <pthread.h> +#include <stdlib.h> +#include <string.h> + +#include "src/core/lib/gpr/fork.h" +#include "src/core/lib/gpr/useful.h" + +namespace grpc_core { +namespace { +gpr_mu g_mu; +gpr_cv g_cv; +int g_thread_count; +int g_awaiting_threads; + +struct thd_arg { + Thread* thread; + void (*body)(void* arg); /* body of a thread */ + void* arg; /* argument to a thread */ + const char* name; /* name of thread. Can be nullptr. */ +}; + +/***************************************** + * Only used when fork support is enabled + */ + +void inc_thd_count() { + if (grpc_fork_support_enabled()) { + gpr_mu_lock(&g_mu); + g_thread_count++; + gpr_mu_unlock(&g_mu); + } +} + +void dec_thd_count() { + if (grpc_fork_support_enabled()) { + gpr_mu_lock(&g_mu); + g_thread_count--; + if (g_awaiting_threads && g_thread_count == 0) { + gpr_cv_signal(&g_cv); + } + gpr_mu_unlock(&g_mu); + } +} + +} // namespace + +Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, + bool* success) + : real_(true), alive_(false), started_(false), joined_(false) { + gpr_mu_init(&mu_); + gpr_cv_init(&ready_); + pthread_attr_t attr; + /* don't use gpr_malloc as we may cause an infinite recursion with + * the profiling code */ + thd_arg* a = static_cast<thd_arg*>(malloc(sizeof(*a))); + GPR_ASSERT(a != nullptr); + a->thread = this; + a->body = thd_body; + a->arg = arg; + a->name = thd_name; + inc_thd_count(); + + GPR_ASSERT(pthread_attr_init(&attr) == 0); + GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == 0); + + pthread_t p; + alive_ = (pthread_create(&p, &attr, + [](void* v) -> void* { + thd_arg a = *static_cast<thd_arg*>(v); + free(v); + if (a.name != nullptr) { +#if GPR_APPLE_PTHREAD_NAME + /* Apple supports 64 characters, and will + * truncate if it's longer. */ + pthread_setname_np(a.name); +#elif GPR_LINUX_PTHREAD_NAME + /* Linux supports 16 characters max, and will + * error if it's longer. */ + char buf[16]; + size_t buf_len = GPR_ARRAY_SIZE(buf) - 1; + strncpy(buf, a.name, buf_len); + buf[buf_len] = '\0'; + pthread_setname_np(pthread_self(), buf); +#endif // GPR_APPLE_PTHREAD_NAME + } + + gpr_mu_lock(&a.thread->mu_); + if (!a.thread->started_) { + gpr_cv_wait(&a.thread->ready_, &a.thread->mu_, + gpr_inf_future(GPR_CLOCK_MONOTONIC)); + } + gpr_mu_unlock(&a.thread->mu_); + + (*a.body)(a.arg); + dec_thd_count(); + return nullptr; + }, + a) == 0); + + if (success != nullptr) { + *success = alive_; + } + + id_ = gpr_thd_id(p); + GPR_ASSERT(pthread_attr_destroy(&attr) == 0); + + if (!alive_) { + /* don't use gpr_free, as this was allocated using malloc (see above) */ + free(a); + dec_thd_count(); + } +} + +Thread::~Thread() { + if (!alive_) { + // This thread never existed, so nothing to do + } else { + GPR_ASSERT(joined_); + } + if (real_) { + gpr_mu_destroy(&mu_); + gpr_cv_destroy(&ready_); + } +} + +void Thread::Start() { + gpr_mu_lock(&mu_); + if (alive_) { + started_ = true; + gpr_cv_signal(&ready_); + } + gpr_mu_unlock(&mu_); +} + +void Thread::Join() { + if (alive_) { + pthread_join(pthread_t(id_), nullptr); + } + joined_ = true; +} + +void Thread::Init() { + gpr_mu_init(&g_mu); + gpr_cv_init(&g_cv); + g_thread_count = 0; + g_awaiting_threads = 0; +} + +bool Thread::AwaitAll(gpr_timespec deadline) { + gpr_mu_lock(&g_mu); + g_awaiting_threads = 1; + int res = 0; + if (g_thread_count > 0) { + res = gpr_cv_wait(&g_cv, &g_mu, deadline); + } + g_awaiting_threads = 0; + gpr_mu_unlock(&g_mu); + return res == 0; +} + +} // namespace grpc_core + +// The following is in the external namespace as it is exposed as C89 API +gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)pthread_self(); } + +#endif /* GPR_POSIX_SYNC */ diff --git a/src/core/lib/gprpp/thd_windows.cc b/src/core/lib/gprpp/thd_windows.cc new file mode 100644 index 0000000000..efbed30ac6 --- /dev/null +++ b/src/core/lib/gprpp/thd_windows.cc @@ -0,0 +1,149 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* Windows implementation for gpr threads. */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_WINDOWS + +#include "src/core/lib/gprpp/thd.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/thd_id.h> +#include <string.h> + +#if defined(_MSC_VER) +#define thread_local __declspec(thread) +#define WIN_LAMBDA +#elif defined(__GNUC__) +#define thread_local __thread +#define WIN_LAMBDA WINAPI +#else +#error "Unknown compiler - please file a bug report" +#endif + +namespace { +struct thd_info { + grpc_core::Thread* thread; + void (*body)(void* arg); /* body of a thread */ + void* arg; /* argument to a thread */ + HANDLE join_event; /* the join event */ +}; + +thread_local struct thd_info* g_thd_info; + +/* Destroys a thread info */ +void destroy_thread(struct thd_info* t) { + CloseHandle(t->join_event); + gpr_free(t); +} +} // namespace + +namespace grpc_core { + +void Thread::Init() {} + +bool Thread::AwaitAll(gpr_timespec deadline) { + // TODO: Consider adding this if needed + return false; +} + +Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, + bool* success) + : real_(true), alive_(false), started_(false), joined_(false) { + gpr_mu_init(&mu_); + gpr_cv_init(&ready_); + + HANDLE handle; + struct thd_info* info = (struct thd_info*)gpr_malloc(sizeof(*info)); + info->thread = this; + info->body = thd_body; + info->arg = arg; + + info->join_event = CreateEvent(nullptr, FALSE, FALSE, nullptr); + if (info->join_event == nullptr) { + gpr_free(info); + alive_ = false; + } else { + handle = CreateThread(nullptr, 64 * 1024, + [](void* v) WIN_LAMBDA -> DWORD { + g_thd_info = static_cast<thd_info*>(v); + gpr_mu_lock(&g_thd_info->thread->mu_); + if (!g_thd_info->thread->started_) { + gpr_cv_wait(&g_thd_info->thread->ready_, + &g_thd_info->thread->mu_, + gpr_inf_future(GPR_CLOCK_MONOTONIC)); + } + gpr_mu_unlock(&g_thd_info->thread->mu_); + g_thd_info->body(g_thd_info->arg); + BOOL ret = SetEvent(g_thd_info->join_event); + GPR_ASSERT(ret); + return 0; + }, + info, 0, nullptr); + if (handle == nullptr) { + destroy_thread(info); + alive_ = false; + } else { + id_ = (gpr_thd_id)info; + CloseHandle(handle); + alive_ = true; + } + } + if (success != nullptr) { + *success = alive_; + } +} + +Thread::~Thread() { + if (!alive_) { + // This thread never existed, so nothing to do + } else { + GPR_ASSERT(joined_); + } + if (real_) { + gpr_mu_destroy(&mu_); + gpr_cv_destroy(&ready_); + } +} + +void Thread::Start() { + gpr_mu_lock(&mu_); + if (alive_) { + started_ = true; + gpr_cv_signal(&ready_); + } + gpr_mu_unlock(&mu_); +} + +void Thread::Join() { + if (alive_) { + thd_info* info = (thd_info*)id_; + DWORD ret = WaitForSingleObject(info->join_event, INFINITE); + GPR_ASSERT(ret == WAIT_OBJECT_0); + destroy_thread(info); + } + joined_ = true; +} +} // namespace grpc_core + +gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)g_thd_info; } + +#endif /* GPR_WINDOWS */ diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index e630ddf8e0..446b84d4b1 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -29,6 +29,7 @@ #include <string.h> #include <sys/socket.h> #include <unistd.h> +#include <new> #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -36,9 +37,9 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/gpr/murmur_hash.h" -#include "src/core/lib/gpr/thd.h" #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/wakeup_fd_cv.h" @@ -253,8 +254,11 @@ typedef struct poll_result { } poll_result; typedef struct poll_args { + grpc_core::Thread poller_thd; gpr_cv trigger; int trigger_set; + gpr_cv harvest; + gpr_cv join; struct pollfd* fds; nfds_t nfds; poll_result* result; @@ -264,15 +268,17 @@ typedef struct poll_args { // This is a 2-tiered cache, we mantain a hash table // of active poll calls, so we can wait on the result -// of that call. We also maintain a freelist of inactive -// poll threads. +// of that call. We also maintain freelists of inactive +// poll args and of dead poller threads. typedef struct poll_hash_table { poll_args* free_pollers; poll_args** active_pollers; + poll_args* dead_pollers; unsigned int size; unsigned int count; } poll_hash_table; +// TODO(kpayson64): Eliminate use of global non-POD variables poll_hash_table poll_cache; grpc_cv_fd_table g_cvfds; @@ -1299,6 +1305,7 @@ static void pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) { static void run_poll(void* args); static void cache_poller_locked(poll_args* args); +static void cache_harvest_locked(); static void cache_insert_locked(poll_args* args) { uint32_t key = gpr_murmur_hash3(args->fds, args->nfds * sizeof(struct pollfd), @@ -1361,6 +1368,8 @@ static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) { poll_args* pargs = static_cast<poll_args*>(gpr_malloc(sizeof(struct poll_args))); gpr_cv_init(&pargs->trigger); + gpr_cv_init(&pargs->harvest); + gpr_cv_init(&pargs->join); pargs->fds = fds; pargs->nfds = count; pargs->next = nullptr; @@ -1368,11 +1377,9 @@ static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) { pargs->trigger_set = 0; init_result(pargs); cache_poller_locked(pargs); - gpr_thd_id t_id; - gpr_thd_options opt = gpr_thd_options_default(); gpr_ref(&g_cvfds.pollcount); - gpr_thd_options_set_detached(&opt); - GPR_ASSERT(gpr_thd_new(&t_id, "grpc_poller", &run_poll, pargs, &opt)); + new (&pargs->poller_thd) grpc_core::Thread("grpc_poller", &run_poll, pargs); + pargs->poller_thd.Start(); return pargs; } @@ -1437,7 +1444,30 @@ static void cache_destroy_locked(poll_args* args) { poll_cache.free_pollers = args->next; } - gpr_free(args); + // Now move this args to the dead poller list for later join + if (poll_cache.dead_pollers != nullptr) { + poll_cache.dead_pollers->prev = args; + } + args->prev = nullptr; + args->next = poll_cache.dead_pollers; + poll_cache.dead_pollers = args; +} + +static void cache_harvest_locked() { + while (poll_cache.dead_pollers) { + poll_args* args = poll_cache.dead_pollers; + poll_cache.dead_pollers = poll_cache.dead_pollers->next; + // Keep the list consistent in case new dead pollers get added when we + // release the lock below to wait on joining + if (poll_cache.dead_pollers) { + poll_cache.dead_pollers->prev = nullptr; + } + gpr_cv_signal(&args->harvest); + gpr_cv_wait(&args->join, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + args->poller_thd.Join(); + args->poller_thd.~Thread(); + gpr_free(args); + } } static void decref_poll_result(poll_result* res) { @@ -1469,6 +1499,7 @@ static void run_poll(void* args) { poll_result* result = pargs->result; int retval = g_cvfds.poll(result->fds, result->nfds, CV_POLL_PERIOD_MS); gpr_mu_lock(&g_cvfds.mu); + cache_harvest_locked(); if (retval != 0) { result->completed = 1; result->retval = retval; @@ -1488,6 +1519,7 @@ static void run_poll(void* args) { deadline = gpr_time_add(deadline, thread_grace); pargs->trigger_set = 0; gpr_cv_wait(&pargs->trigger, &g_cvfds.mu, deadline); + cache_harvest_locked(); if (!pargs->trigger_set) { cache_destroy_locked(pargs); break; @@ -1496,10 +1528,12 @@ static void run_poll(void* args) { gpr_mu_unlock(&g_cvfds.mu); } - // We still have the lock here if (gpr_unref(&g_cvfds.pollcount)) { gpr_cv_signal(&g_cvfds.shutdown_cv); } + gpr_cv_wait(&pargs->harvest, &g_cvfds.mu, + gpr_inf_future(GPR_CLOCK_MONOTONIC)); + gpr_cv_signal(&pargs->join); gpr_mu_unlock(&g_cvfds.mu); } @@ -1512,6 +1546,7 @@ static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) { nfds_t nsockfds = 0; poll_result* result = nullptr; gpr_mu_lock(&g_cvfds.mu); + cache_harvest_locked(); pollcv = static_cast<grpc_cv_node*>(gpr_malloc(sizeof(grpc_cv_node))); pollcv->next = nullptr; gpr_cv pollcv_cv; @@ -1575,12 +1610,14 @@ static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) { pargs->trigger_set = 1; gpr_cv_signal(&pargs->trigger); gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline); + cache_harvest_locked(); res = result->retval; errno = result->err; result->watchcount--; remove_cvn(&result->watchers, pollcv); } else if (!skip_poll) { gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline); + cache_harvest_locked(); } idx = 0; @@ -1637,6 +1674,7 @@ static void global_cv_fd_table_init() { for (unsigned int i = 0; i < poll_cache.size; i++) { poll_cache.active_pollers[i] = nullptr; } + poll_cache.dead_pollers = nullptr; gpr_mu_unlock(&g_cvfds.mu); } @@ -1655,6 +1693,7 @@ static void global_cv_fd_table_shutdown() { grpc_poll_function = g_cvfds.poll; gpr_free(g_cvfds.cvfds); + cache_harvest_locked(); gpr_free(poll_cache.active_pollers); gpr_mu_unlock(&g_cvfds.mu); diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc index 1a2284f474..e9fa08aeb8 100644 --- a/src/core/lib/iomgr/exec_ctx.cc +++ b/src/core/lib/iomgr/exec_ctx.cc @@ -21,7 +21,7 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/profiling/timers.h" diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index d2a050919e..b526c14af4 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -19,6 +19,7 @@ #include "src/core/lib/iomgr/executor.h" #include <string.h> +#include <new> #include <grpc/support/alloc.h> #include <grpc/support/cpu.h> @@ -27,9 +28,9 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/gpr/spinlock.h" -#include "src/core/lib/gpr/thd.h" #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/exec_ctx.h" #define MAX_DEPTH 2 @@ -41,7 +42,7 @@ typedef struct { size_t depth; bool shutdown; bool queued_long_job; - gpr_thd_id id; + grpc_core::Thread thd; } thread_state; static thread_state* g_thread_state; @@ -99,13 +100,13 @@ void grpc_executor_set_threading(bool threading) { for (size_t i = 0; i < g_max_threads; i++) { gpr_mu_init(&g_thread_state[i].mu); gpr_cv_init(&g_thread_state[i].cv); + new (&g_thread_state[i].thd) grpc_core::Thread(); g_thread_state[i].elems = GRPC_CLOSURE_LIST_INIT; } - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - gpr_thd_new(&g_thread_state[0].id, "grpc_executor", executor_thread, - &g_thread_state[0], &opt); + new (&g_thread_state[0].thd) + grpc_core::Thread("grpc_executor", executor_thread, &g_thread_state[0]); + g_thread_state[0].thd.Start(); } else { if (cur_threads == 0) return; for (size_t i = 0; i < g_max_threads; i++) { @@ -119,10 +120,11 @@ void grpc_executor_set_threading(bool threading) { gpr_spinlock_lock(&g_adding_thread_lock); gpr_spinlock_unlock(&g_adding_thread_lock); for (gpr_atm i = 0; i < g_cur_threads; i++) { - gpr_thd_join(g_thread_state[i].id); + g_thread_state[i].thd.Join(); } gpr_atm_no_barrier_store(&g_cur_threads, 0); for (size_t i = 0; i < g_max_threads; i++) { + g_thread_state[i].thd.~Thread(); gpr_mu_destroy(&g_thread_state[i].mu); gpr_cv_destroy(&g_thread_state[i].cv); run_closures(g_thread_state[i].elems); @@ -262,10 +264,10 @@ static void executor_push(grpc_closure* closure, grpc_error* error, if (cur_thread_count < g_max_threads) { gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1); - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - gpr_thd_new(&g_thread_state[cur_thread_count].id, "gpr_executor", - executor_thread, &g_thread_state[cur_thread_count], &opt); + new (&g_thread_state[cur_thread_count].thd) + grpc_core::Thread("grpc_executor", executor_thread, + &g_thread_state[cur_thread_count]); + g_thread_state[cur_thread_count].thd.Start(); } gpr_spinlock_unlock(&g_adding_thread_lock); } diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc index c9a65c5702..e8ba65d1d3 100644 --- a/src/core/lib/iomgr/fork_posix.cc +++ b/src/core/lib/iomgr/fork_posix.cc @@ -27,7 +27,7 @@ #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/fork.h" -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/timer_manager.h" @@ -51,7 +51,7 @@ void grpc_prefork() { grpc_timer_manager_set_threading(false); grpc_executor_set_threading(false); grpc_core::ExecCtx::Get()->Flush(); - if (!gpr_await_threads( + if (!grpc_core::Thread::AwaitAll( gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(3, GPR_TIMESPAN)))) { gpr_log(GPR_ERROR, "gRPC thread still active! Cannot fork!"); diff --git a/src/core/lib/iomgr/iocp_windows.cc b/src/core/lib/iomgr/iocp_windows.cc index 4716872ce4..24af0c1db8 100644 --- a/src/core/lib/iomgr/iocp_windows.cc +++ b/src/core/lib/iomgr/iocp_windows.cc @@ -28,7 +28,7 @@ #include <grpc/support/log_windows.h> #include "src/core/lib/debug/stats.h" -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/iocp_windows.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/socket_windows.h" diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index 70a80e1998..3c2b83a549 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -31,8 +31,8 @@ #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" -#include "src/core/lib/gpr/thd.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" diff --git a/src/core/lib/iomgr/pollset_windows.cc b/src/core/lib/iomgr/pollset_windows.cc index 240a24dee1..7c4196c064 100644 --- a/src/core/lib/iomgr/pollset_windows.cc +++ b/src/core/lib/iomgr/pollset_windows.cc @@ -22,7 +22,7 @@ #include <grpc/support/log.h> -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/iocp_windows.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/pollset.h" diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc index 3dc1d871a1..e3d651ec82 100644 --- a/src/core/lib/iomgr/resolve_address_posix.cc +++ b/src/core/lib/iomgr/resolve_address_posix.cc @@ -33,8 +33,8 @@ #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" -#include "src/core/lib/gpr/thd.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc index 8f4fd04402..2044d85a94 100644 --- a/src/core/lib/iomgr/resolve_address_windows.cc +++ b/src/core/lib/iomgr/resolve_address_windows.cc @@ -35,7 +35,7 @@ #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc index 94e7953fde..7efbaa8364 100644 --- a/src/core/lib/iomgr/timer_manager.cc +++ b/src/core/lib/iomgr/timer_manager.cc @@ -16,22 +16,23 @@ * */ -#include "src/core/lib/iomgr/timer_manager.h" - -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> #include <grpc/support/port_platform.h> #include <inttypes.h> +#include <new> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> #include "src/core/lib/debug/trace.h" -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/iomgr/timer_manager.h" -typedef struct completed_thread { - gpr_thd_id t; - struct completed_thread* next; -} completed_thread; +struct completed_thread { + grpc_core::Thread thd; + completed_thread* next; +}; extern grpc_core::TraceFlag grpc_timer_check_trace; @@ -67,7 +68,8 @@ static void gc_completed_threads(void) { g_completed_threads = nullptr; gpr_mu_unlock(&g_mu); while (to_gc != nullptr) { - gpr_thd_join(to_gc->t); + to_gc->thd.Join(); + to_gc->thd.~Thread(); completed_thread* next = to_gc->next; gpr_free(to_gc); to_gc = next; @@ -84,18 +86,10 @@ static void start_timer_thread_and_unlock(void) { if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, "Spawn timer thread"); } - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); completed_thread* ct = static_cast<completed_thread*>(gpr_malloc(sizeof(*ct))); - // The call to gpr_thd_new() has to be under the same lock used by - // gc_completed_threads(), particularly due to ct->t, which is written here - // (internally by gpr_thd_new) and read there. Otherwise it's possible for ct - // to leak through g_completed_threads and be freed in gc_completed_threads() - // before "&ct->t" is written to, causing a use-after-free. - gpr_mu_lock(&g_mu); - gpr_thd_new(&ct->t, "grpc_global_timer", timer_thread, ct, &opt); - gpr_mu_unlock(&g_mu); + new (&ct->thd) grpc_core::Thread("grpc_global_timer", timer_thread, ct); + ct->thd.Start(); } void grpc_timer_manager_tick() { diff --git a/src/core/lib/iomgr/wakeup_fd_cv.cc b/src/core/lib/iomgr/wakeup_fd_cv.cc index 41d35cb1fd..a8e175bc34 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.cc +++ b/src/core/lib/iomgr/wakeup_fd_cv.cc @@ -30,8 +30,8 @@ #include <grpc/support/sync.h> #include <grpc/support/time.h> -#include "src/core/lib/gpr/thd.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #define MAX_TABLE_RESIZE 256 diff --git a/src/core/lib/profiling/basic_timers.cc b/src/core/lib/profiling/basic_timers.cc index ca6705a6b3..97646d1000 100644 --- a/src/core/lib/profiling/basic_timers.cc +++ b/src/core/lib/profiling/basic_timers.cc @@ -30,7 +30,7 @@ #include <string.h> #include "src/core/lib/gpr/env.h" -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" typedef enum { BEGIN = '{', END = '}', MARK = '.' } marker_type; @@ -68,7 +68,7 @@ static pthread_cond_t g_cv; static gpr_timer_log_list g_in_progress_logs; static gpr_timer_log_list g_done_logs; static int g_shutdown; -static gpr_thd_id g_writing_thread; +static grpc_core::Thread* g_writing_thread; static __thread int g_thread_id; static int g_next_thread_id; static int g_writing_enabled = 1; @@ -182,7 +182,8 @@ static void finish_writing(void) { g_shutdown = 1; pthread_cond_signal(&g_cv); pthread_mutex_unlock(&g_mu); - gpr_thd_join(g_writing_thread); + g_writing_thread->Join(); + delete g_writing_thread; gpr_log(GPR_INFO, "flushing logs"); @@ -201,10 +202,8 @@ void gpr_timers_set_log_filename(const char* filename) { } static void init_output() { - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); - GPR_ASSERT(gpr_thd_new(&g_writing_thread, "timer_output_thread", - writing_thread, NULL, &options)); + g_writing_thread = + new grpc_core::Thread("timer_output_thread", writing_thread, nullptr); atexit(finish_writing); } diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index 7bc24a5049..ac9f9e6066 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -32,7 +32,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gpr/fork.h" -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/combiner.h" @@ -123,7 +123,7 @@ void grpc_init(void) { gpr_mu_lock(&g_init_mu); if (++g_initializations == 1) { gpr_time_init(); - gpr_thd_init(); + grpc_core::Thread::Init(); grpc_stats_init(); grpc_slice_intern_init(); grpc_mdctx_global_init(); diff --git a/src/core/tsi/alts_transport_security.cc b/src/core/tsi/alts_transport_security.cc index 1654d893d0..f8fe91fc46 100644 --- a/src/core/tsi/alts_transport_security.cc +++ b/src/core/tsi/alts_transport_security.cc @@ -54,7 +54,8 @@ void grpc_tsi_alts_shutdown() { grpc_tsi_alts_wait_for_cq_drain(); grpc_completion_queue_destroy(g_alts_resource.cq); grpc_channel_destroy(g_alts_resource.channel); - gpr_thd_join(g_alts_resource.thread_id); + g_alts_resource.thread.Join(); + g_alts_resource.thread.~Thread(); } gpr_cv_destroy(&g_alts_resource.cv); gpr_mu_destroy(&g_alts_resource.mu); diff --git a/src/core/tsi/alts_transport_security.h b/src/core/tsi/alts_transport_security.h index 37febd1e28..c45ffd3fd6 100644 --- a/src/core/tsi/alts_transport_security.h +++ b/src/core/tsi/alts_transport_security.h @@ -22,10 +22,10 @@ #include <grpc/grpc.h> #include <grpc/support/sync.h> -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" typedef struct alts_shared_resource { - gpr_thd_id thread_id; + grpc_core::Thread thread; grpc_channel* channel; grpc_completion_queue* cq; gpr_mu mu; |