diff options
author | Vijay Pai <vpai@google.com> | 2018-02-13 14:40:39 -0800 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2018-02-15 21:30:13 -0800 |
commit | 58a62755fc6546a117b7b8f3a0a344f85b2ea5f9 (patch) | |
tree | 294e8432672a2a8b3b2bd1bab7d24e75e1a6d4b6 | |
parent | b0d71823a0f031ad1c04be30f22653177139da0b (diff) |
Remove support for detached threads. All threads must be joined.
43 files changed, 154 insertions, 264 deletions
@@ -517,7 +517,6 @@ grpc_cc_library( "src/core/lib/gpr/sync.cc", "src/core/lib/gpr/sync_posix.cc", "src/core/lib/gpr/sync_windows.cc", - "src/core/lib/gpr/thd.cc", "src/core/lib/gpr/thd_posix.cc", "src/core/lib/gpr/thd_windows.cc", "src/core/lib/gpr/time.cc", diff --git a/CMakeLists.txt b/CMakeLists.txt index 9155d2beb3..eac78a43e1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -651,7 +651,6 @@ add_library(gpr src/core/lib/gpr/sync.cc src/core/lib/gpr/sync_posix.cc src/core/lib/gpr/sync_windows.cc - src/core/lib/gpr/thd.cc src/core/lib/gpr/thd_posix.cc src/core/lib/gpr/thd_windows.cc src/core/lib/gpr/time.cc @@ -2899,7 +2899,6 @@ LIBGPR_SRC = \ src/core/lib/gpr/sync.cc \ src/core/lib/gpr/sync_posix.cc \ src/core/lib/gpr/sync_windows.cc \ - src/core/lib/gpr/thd.cc \ src/core/lib/gpr/thd_posix.cc \ src/core/lib/gpr/thd_windows.cc \ src/core/lib/gpr/time.cc \ diff --git a/build.yaml b/build.yaml index 74c76a4072..b9646276ea 100644 --- a/build.yaml +++ b/build.yaml @@ -59,7 +59,6 @@ filegroups: - src/core/lib/gpr/sync.cc - src/core/lib/gpr/sync_posix.cc - src/core/lib/gpr/sync_windows.cc - - src/core/lib/gpr/thd.cc - src/core/lib/gpr/thd_posix.cc - src/core/lib/gpr/thd_windows.cc - src/core/lib/gpr/time.cc @@ -65,7 +65,6 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/gpr/sync.cc \ src/core/lib/gpr/sync_posix.cc \ src/core/lib/gpr/sync_windows.cc \ - src/core/lib/gpr/thd.cc \ src/core/lib/gpr/thd_posix.cc \ src/core/lib/gpr/thd_windows.cc \ src/core/lib/gpr/time.cc \ diff --git a/config.w32 b/config.w32 index 2ef122b630..3665a9689f 100644 --- a/config.w32 +++ b/config.w32 @@ -42,7 +42,6 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\gpr\\sync.cc " + "src\\core\\lib\\gpr\\sync_posix.cc " + "src\\core\\lib\\gpr\\sync_windows.cc " + - "src\\core\\lib\\gpr\\thd.cc " + "src\\core\\lib\\gpr\\thd_posix.cc " + "src\\core\\lib\\gpr\\thd_windows.cc " + "src\\core\\lib\\gpr\\time.cc " + diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 95db0d8e7d..ffeed05c33 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -233,7 +233,6 @@ Pod::Spec.new do |s| 'src/core/lib/gpr/sync.cc', 'src/core/lib/gpr/sync_posix.cc', 'src/core/lib/gpr/sync_windows.cc', - 'src/core/lib/gpr/thd.cc', 'src/core/lib/gpr/thd_posix.cc', 'src/core/lib/gpr/thd_windows.cc', 'src/core/lib/gpr/time.cc', diff --git a/grpc.gemspec b/grpc.gemspec index ac901da0fe..df8588c671 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -124,7 +124,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/gpr/sync.cc ) s.files += %w( src/core/lib/gpr/sync_posix.cc ) s.files += %w( src/core/lib/gpr/sync_windows.cc ) - s.files += %w( src/core/lib/gpr/thd.cc ) s.files += %w( src/core/lib/gpr/thd_posix.cc ) s.files += %w( src/core/lib/gpr/thd_windows.cc ) s.files += %w( src/core/lib/gpr/time.cc ) @@ -187,7 +187,6 @@ 'src/core/lib/gpr/sync.cc', 'src/core/lib/gpr/sync_posix.cc', 'src/core/lib/gpr/sync_windows.cc', - 'src/core/lib/gpr/thd.cc', 'src/core/lib/gpr/thd_posix.cc', 'src/core/lib/gpr/thd_windows.cc', 'src/core/lib/gpr/time.cc', diff --git a/package.xml b/package.xml index 5575855648..171ae5bca7 100644 --- a/package.xml +++ b/package.xml @@ -131,7 +131,6 @@ <file baseinstalldir="/" name="src/core/lib/gpr/sync.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/gpr/sync_posix.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/gpr/sync_windows.cc" role="src" /> - <file baseinstalldir="/" name="src/core/lib/gpr/thd.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/gpr/thd_posix.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/gpr/thd_windows.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/gpr/time.cc" role="src" /> diff --git a/src/core/lib/gpr/thd.cc b/src/core/lib/gpr/thd.cc deleted file mode 100644 index 11391418b1..0000000000 --- a/src/core/lib/gpr/thd.cc +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/* Platform-independent features for gpr threads. */ - -#include "src/core/lib/gpr/thd.h" - -#include <string.h> - -enum { GPR_THD_JOINABLE = 1 }; - -gpr_thd_options gpr_thd_options_default(void) { - gpr_thd_options options; - memset(&options, 0, sizeof(options)); - return options; -} - -void gpr_thd_options_set_detached(gpr_thd_options* options) { - options->flags &= ~GPR_THD_JOINABLE; -} - -void gpr_thd_options_set_joinable(gpr_thd_options* options) { - options->flags |= GPR_THD_JOINABLE; -} - -int gpr_thd_options_is_detached(const gpr_thd_options* options) { - if (!options) return 1; - return (options->flags & GPR_THD_JOINABLE) == 0; -} - -int gpr_thd_options_is_joinable(const gpr_thd_options* options) { - if (!options) return 0; - return (options->flags & GPR_THD_JOINABLE) == GPR_THD_JOINABLE; -} diff --git a/src/core/lib/gpr/thd.h b/src/core/lib/gpr/thd.h index 58ce0d0088..9d8140114c 100644 --- a/src/core/lib/gpr/thd.h +++ b/src/core/lib/gpr/thd.h @@ -28,38 +28,15 @@ #include <grpc/support/thd_id.h> #include <grpc/support/time.h> -/** Thread creation options. */ -typedef struct { - int flags; /** Opaque field. Get and set with accessors below. */ -} gpr_thd_options; - /** Create a new thread running (*thd_body)(arg) and place its thread identifier in *t, and return true. If there are insufficient resources, return false. thd_name is the name of the thread for identification purposes on platforms that support thread naming. - If options==NULL, default options are used. - The thread is immediately runnable, and exits when (*thd_body)() returns. */ + The thread must be joined. */ int gpr_thd_new(gpr_thd_id* t, const char* thd_name, - void (*thd_body)(void* arg), void* arg, - const gpr_thd_options* options); - -/** Return a gpr_thd_options struct with all fields set to defaults. */ -gpr_thd_options gpr_thd_options_default(void); - -/** Set the thread to become detached on startup - this is the default. */ -void gpr_thd_options_set_detached(gpr_thd_options* options); - -/** Set the thread to become joinable - mutually exclusive with detached. */ -void gpr_thd_options_set_joinable(gpr_thd_options* options); - -/** Returns non-zero if the option detached is set. */ -int gpr_thd_options_is_detached(const gpr_thd_options* options); - -/** Returns non-zero if the option joinable is set. */ -int gpr_thd_options_is_joinable(const gpr_thd_options* options); + void (*thd_body)(void* arg), void* arg); -/** Blocks until the specified thread properly terminates. - Calling this on a detached thread has unpredictable results. */ +/** Blocks until the specified thread properly terminates. */ void gpr_thd_join(gpr_thd_id t); /* Internal interfaces between modules within the gpr support library. */ diff --git a/src/core/lib/gpr/thd_posix.cc b/src/core/lib/gpr/thd_posix.cc index fcd174bfba..a5640d8cc4 100644 --- a/src/core/lib/gpr/thd_posix.cc +++ b/src/core/lib/gpr/thd_posix.cc @@ -72,8 +72,7 @@ static void* thread_body(void* v) { } int gpr_thd_new(gpr_thd_id* t, const char* thd_name, - void (*thd_body)(void* arg), void* arg, - const gpr_thd_options* options) { + void (*thd_body)(void* arg), void* arg) { int thread_started; pthread_attr_t attr; pthread_t p; @@ -87,15 +86,12 @@ int gpr_thd_new(gpr_thd_id* t, const char* thd_name, inc_thd_count(); GPR_ASSERT(pthread_attr_init(&attr) == 0); - if (gpr_thd_options_is_detached(options)) { - GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) == - 0); - } else { - GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == - 0); - } + GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == 0); + thread_started = (pthread_create(&p, &attr, &thread_body, a) == 0); + GPR_ASSERT(pthread_attr_destroy(&attr) == 0); + if (!thread_started) { /* don't use gpr_free, as this was allocated using malloc (see above) */ free(a); diff --git a/src/core/lib/gpr/thd_windows.cc b/src/core/lib/gpr/thd_windows.cc index b467bd2662..53093e80f3 100644 --- a/src/core/lib/gpr/thd_windows.cc +++ b/src/core/lib/gpr/thd_windows.cc @@ -40,15 +40,14 @@ struct thd_info { void (*body)(void* arg); /* body of a thread */ void* arg; /* argument to a thread */ - HANDLE join_event; /* if joinable, the join event */ - int joinable; /* true if not detached */ + HANDLE join_event; /* the join event */ }; static thread_local struct thd_info* g_thd_info; /* Destroys a thread info */ static void destroy_thread(struct thd_info* t) { - if (t->joinable) CloseHandle(t->join_event); + CloseHandle(t->join_event); gpr_free(t); } @@ -58,32 +57,22 @@ void gpr_thd_init(void) {} static DWORD WINAPI thread_body(void* v) { g_thd_info = (struct thd_info*)v; g_thd_info->body(g_thd_info->arg); - if (g_thd_info->joinable) { - BOOL ret = SetEvent(g_thd_info->join_event); - GPR_ASSERT(ret); - } else { - destroy_thread(g_thd_info); - } + BOOL ret = SetEvent(g_thd_info->join_event); + GPR_ASSERT(ret); return 0; } int gpr_thd_new(gpr_thd_id* t, const char* thd_name, - void (*thd_body)(void* arg), void* arg, - const gpr_thd_options* options) { + void (*thd_body)(void* arg), void* arg) { HANDLE handle; struct thd_info* info = (struct thd_info*)gpr_malloc(sizeof(*info)); info->body = thd_body; info->arg = arg; *t = 0; - if (gpr_thd_options_is_joinable(options)) { - info->joinable = 1; - info->join_event = CreateEvent(NULL, FALSE, FALSE, NULL); - if (info->join_event == NULL) { - gpr_free(info); - return 0; - } - } else { - info->joinable = 0; + info->join_event = CreateEvent(NULL, FALSE, FALSE, NULL); + if (info->join_event == NULL) { + gpr_free(info); + return 0; } handle = CreateThread(NULL, 64 * 1024, thread_body, info, 0, NULL); if (handle == NULL) { diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index e630ddf8e0..404665c11f 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -253,6 +253,7 @@ typedef struct poll_result { } poll_result; typedef struct poll_args { + gpr_thd_id poller_thd; gpr_cv trigger; int trigger_set; struct pollfd* fds; @@ -262,13 +263,19 @@ typedef struct poll_args { struct poll_args* prev; } poll_args; +typedef struct poller_dead { + gpr_thd_id poller_thd; + struct poller_dead* next; +} poller_dead; + // This is a 2-tiered cache, we mantain a hash table // of active poll calls, so we can wait on the result -// of that call. We also maintain a freelist of inactive -// poll threads. +// of that call. We also maintain freelists of inactive +// poll args and of dead poller threads. typedef struct poll_hash_table { poll_args* free_pollers; poll_args** active_pollers; + poll_args* dead_pollers; unsigned int size; unsigned int count; } poll_hash_table; @@ -1299,6 +1306,7 @@ static void pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) { static void run_poll(void* args); static void cache_poller_locked(poll_args* args); +static void cache_harvest_locked(); static void cache_insert_locked(poll_args* args) { uint32_t key = gpr_murmur_hash3(args->fds, args->nfds * sizeof(struct pollfd), @@ -1368,11 +1376,8 @@ static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) { pargs->trigger_set = 0; init_result(pargs); cache_poller_locked(pargs); - gpr_thd_id t_id; - gpr_thd_options opt = gpr_thd_options_default(); gpr_ref(&g_cvfds.pollcount); - gpr_thd_options_set_detached(&opt); - GPR_ASSERT(gpr_thd_new(&t_id, "grpc_poller", &run_poll, pargs, &opt)); + GPR_ASSERT(gpr_thd_new(&pargs->poller_thd, "grpc_poller", &run_poll, pargs)); return pargs; } @@ -1437,7 +1442,22 @@ static void cache_destroy_locked(poll_args* args) { poll_cache.free_pollers = args->next; } - gpr_free(args); + // Now move this args to the dead poller list for later join + if (poll_cache.dead_pollers != nullptr) { + poll_cache.dead_pollers->prev = args; + } + args->prev = nullptr; + args->next = poll_cache.dead_pollers; + poll_cache.dead_pollers = args; +} + +static void cache_harvest_locked() { + while (poll_cache.dead_pollers) { + poll_args* args = poll_cache.dead_pollers; + poll_cache.dead_pollers = poll_cache.dead_pollers->next; + gpr_thd_join(args->poller_thd); + gpr_free(args); + } } static void decref_poll_result(poll_result* res) { @@ -1469,6 +1489,7 @@ static void run_poll(void* args) { poll_result* result = pargs->result; int retval = g_cvfds.poll(result->fds, result->nfds, CV_POLL_PERIOD_MS); gpr_mu_lock(&g_cvfds.mu); + cache_harvest_locked(); if (retval != 0) { result->completed = 1; result->retval = retval; @@ -1488,6 +1509,7 @@ static void run_poll(void* args) { deadline = gpr_time_add(deadline, thread_grace); pargs->trigger_set = 0; gpr_cv_wait(&pargs->trigger, &g_cvfds.mu, deadline); + cache_harvest_locked(); if (!pargs->trigger_set) { cache_destroy_locked(pargs); break; @@ -1496,7 +1518,6 @@ static void run_poll(void* args) { gpr_mu_unlock(&g_cvfds.mu); } - // We still have the lock here if (gpr_unref(&g_cvfds.pollcount)) { gpr_cv_signal(&g_cvfds.shutdown_cv); } @@ -1512,6 +1533,7 @@ static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) { nfds_t nsockfds = 0; poll_result* result = nullptr; gpr_mu_lock(&g_cvfds.mu); + cache_harvest_locked(); pollcv = static_cast<grpc_cv_node*>(gpr_malloc(sizeof(grpc_cv_node))); pollcv->next = nullptr; gpr_cv pollcv_cv; @@ -1575,12 +1597,14 @@ static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) { pargs->trigger_set = 1; gpr_cv_signal(&pargs->trigger); gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline); + cache_harvest_locked(); res = result->retval; errno = result->err; result->watchcount--; remove_cvn(&result->watchers, pollcv); } else if (!skip_poll) { gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline); + cache_harvest_locked(); } idx = 0; @@ -1637,6 +1661,7 @@ static void global_cv_fd_table_init() { for (unsigned int i = 0; i < poll_cache.size; i++) { poll_cache.active_pollers[i] = nullptr; } + poll_cache.dead_pollers = nullptr; gpr_mu_unlock(&g_cvfds.mu); } @@ -1655,6 +1680,7 @@ static void global_cv_fd_table_shutdown() { grpc_poll_function = g_cvfds.poll; gpr_free(g_cvfds.cvfds); + cache_harvest_locked(); gpr_free(poll_cache.active_pollers); gpr_mu_unlock(&g_cvfds.mu); diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index d2a050919e..baf0cfc48f 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -102,10 +102,8 @@ void grpc_executor_set_threading(bool threading) { g_thread_state[i].elems = GRPC_CLOSURE_LIST_INIT; } - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); gpr_thd_new(&g_thread_state[0].id, "grpc_executor", executor_thread, - &g_thread_state[0], &opt); + &g_thread_state[0]); } else { if (cur_threads == 0) return; for (size_t i = 0; i < g_max_threads; i++) { @@ -262,10 +260,8 @@ static void executor_push(grpc_closure* closure, grpc_error* error, if (cur_thread_count < g_max_threads) { gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1); - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); gpr_thd_new(&g_thread_state[cur_thread_count].id, "gpr_executor", - executor_thread, &g_thread_state[cur_thread_count], &opt); + executor_thread, &g_thread_state[cur_thread_count]); } gpr_spinlock_unlock(&g_adding_thread_lock); } diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc index 94e7953fde..39ef575d12 100644 --- a/src/core/lib/iomgr/timer_manager.cc +++ b/src/core/lib/iomgr/timer_manager.cc @@ -84,8 +84,6 @@ static void start_timer_thread_and_unlock(void) { if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, "Spawn timer thread"); } - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); completed_thread* ct = static_cast<completed_thread*>(gpr_malloc(sizeof(*ct))); // The call to gpr_thd_new() has to be under the same lock used by @@ -94,7 +92,7 @@ static void start_timer_thread_and_unlock(void) { // to leak through g_completed_threads and be freed in gc_completed_threads() // before "&ct->t" is written to, causing a use-after-free. gpr_mu_lock(&g_mu); - gpr_thd_new(&ct->t, "grpc_global_timer", timer_thread, ct, &opt); + gpr_thd_new(&ct->t, "grpc_global_timer", timer_thread, ct); gpr_mu_unlock(&g_mu); } diff --git a/src/core/lib/profiling/basic_timers.cc b/src/core/lib/profiling/basic_timers.cc index ca6705a6b3..07e21f5180 100644 --- a/src/core/lib/profiling/basic_timers.cc +++ b/src/core/lib/profiling/basic_timers.cc @@ -201,10 +201,8 @@ void gpr_timers_set_log_filename(const char* filename) { } static void init_output() { - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); GPR_ASSERT(gpr_thd_new(&g_writing_thread, "timer_output_thread", - writing_thread, NULL, &options)); + writing_thread, NULL)); atexit(finish_writing); } diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 2da3c0bde8..df6adc4884 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -41,7 +41,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/gpr/sync.cc', 'src/core/lib/gpr/sync_posix.cc', 'src/core/lib/gpr/sync_windows.cc', - 'src/core/lib/gpr/thd.cc', 'src/core/lib/gpr/thd_posix.cc', 'src/core/lib/gpr/thd_windows.cc', 'src/core/lib/gpr/time.cc', diff --git a/test/core/bad_client/bad_client.cc b/test/core/bad_client/bad_client.cc index 6055ccbf4b..9abc1c40d9 100644 --- a/test/core/bad_client/bad_client.cc +++ b/test/core/bad_client/bad_client.cc @@ -220,11 +220,11 @@ void grpc_run_bad_client_test( /* Check a ground truth */ GPR_ASSERT(grpc_server_has_open_connections(a.server)); - gpr_thd_id id; + gpr_thd_id server_validator_id; gpr_event_init(&a.done_thd); a.validator = server_validator; /* Start validator */ - gpr_thd_new(&id, "grpc_bad_client", thd_func, &a, nullptr); + gpr_thd_new(&server_validator_id, "grpc_bad_client", thd_func, &a); for (int i = 0; i < num_args; i++) { grpc_run_client_side_validator(&args[i], i == (num_args - 1) ? flags : 0, &sfd, client_cq); @@ -234,6 +234,7 @@ void grpc_run_bad_client_test( /* Shutdown. */ shutdown_client(&sfd.client); + gpr_thd_join(server_validator_id); shutdown_cq = grpc_completion_queue_create_for_pluck(nullptr); grpc_server_shutdown_and_notify(a.server, shutdown_cq, nullptr); diff --git a/test/core/end2end/bad_server_response_test.cc b/test/core/end2end/bad_server_response_test.cc index 1af168e1f9..e7b2cafa35 100644 --- a/test/core/end2end/bad_server_response_test.cc +++ b/test/core/end2end/bad_server_response_test.cc @@ -253,15 +253,17 @@ static void actually_poll_server(void* arg) { gpr_free(pa); } -static void poll_server_until_read_done(test_tcp_server* server, - gpr_event* signal_when_done) { +static gpr_thd_id poll_server_until_read_done(test_tcp_server* server, + + gpr_event* signal_when_done) { gpr_atm_rel_store(&state.done_atm, 0); state.write_done = 0; gpr_thd_id id; poll_args* pa = static_cast<poll_args*>(gpr_malloc(sizeof(*pa))); pa->server = server; pa->signal_when_done = signal_when_done; - gpr_thd_new(&id, "grpc_poll_server", actually_poll_server, pa, nullptr); + gpr_thd_new(&id, "grpc_poll_server", actually_poll_server, pa); + return id; } static void run_test(const char* response_payload, @@ -281,9 +283,10 @@ static void run_test(const char* response_payload, state.response_payload_length = response_payload_length; /* poll server until sending out the response */ - poll_server_until_read_done(&test_server, &ev); + gpr_thd_id id = poll_server_until_read_done(&test_server, &ev); start_rpc(server_port, expected_status, expected_detail); gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_thd_join(id); /* clean up */ grpc_endpoint_shutdown(state.tcp, diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index 18e8310251..8546ef541f 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -550,10 +550,7 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create( grpc_tcp_server_start(proxy->server, &proxy->pollset, 1, on_accept, proxy); // Start proxy thread. - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - GPR_ASSERT( - gpr_thd_new(&proxy->thd, "grpc_http_proxy", thread_main, proxy, &opt)); + GPR_ASSERT(gpr_thd_new(&proxy->thd, "grpc_http_proxy", thread_main, proxy)); return proxy; } diff --git a/test/core/end2end/fixtures/proxy.cc b/test/core/end2end/fixtures/proxy.cc index bc3b0ca35c..db49534899 100644 --- a/test/core/end2end/fixtures/proxy.cc +++ b/test/core/end2end/fixtures/proxy.cc @@ -76,7 +76,6 @@ static void request_call(grpc_end2end_proxy* proxy); grpc_end2end_proxy* grpc_end2end_proxy_create(const grpc_end2end_proxy_def* def, grpc_channel_args* client_args, grpc_channel_args* server_args) { - gpr_thd_options opt = gpr_thd_options_default(); int proxy_port = grpc_pick_unused_port_or_die(); int server_port = grpc_pick_unused_port_or_die(); @@ -98,9 +97,8 @@ grpc_end2end_proxy* grpc_end2end_proxy_create(const grpc_end2end_proxy_def* def, grpc_server_start(proxy->server); grpc_call_details_init(&proxy->new_call_details); - gpr_thd_options_set_joinable(&opt); GPR_ASSERT( - gpr_thd_new(&proxy->thd, "grpc_end2end_proxy", thread_main, proxy, &opt)); + gpr_thd_new(&proxy->thd, "grpc_end2end_proxy", thread_main, proxy)); request_call(proxy); diff --git a/test/core/end2end/tests/connectivity.cc b/test/core/end2end/tests/connectivity.cc index a517ffa686..d4887137f8 100644 --- a/test/core/end2end/tests/connectivity.cc +++ b/test/core/end2end/tests/connectivity.cc @@ -50,7 +50,6 @@ static void test_connectivity(grpc_end2end_test_config config) { grpc_connectivity_state state; cq_verifier* cqv = cq_verifier_create(f.cq); child_events ce; - gpr_thd_options thdopt = gpr_thd_options_default(); gpr_thd_id thdid; grpc_channel_args client_args; @@ -67,9 +66,7 @@ static void test_connectivity(grpc_end2end_test_config config) { ce.channel = f.client; ce.cq = f.cq; gpr_event_init(&ce.started); - gpr_thd_options_set_joinable(&thdopt); - GPR_ASSERT( - gpr_thd_new(&thdid, "grpc_connectivity", child_thread, &ce, &thdopt)); + GPR_ASSERT(gpr_thd_new(&thdid, "grpc_connectivity", child_thread, &ce)); gpr_event_wait(&ce.started, gpr_inf_future(GPR_CLOCK_MONOTONIC)); diff --git a/test/core/gpr/arena_test.cc b/test/core/gpr/arena_test.cc index 9eaf57b631..1cfaefa686 100644 --- a/test/core/gpr/arena_test.cc +++ b/test/core/gpr/arena_test.cc @@ -100,10 +100,7 @@ static void concurrent_test(void) { gpr_thd_id thds[CONCURRENT_TEST_THREADS]; for (int i = 0; i < CONCURRENT_TEST_THREADS; i++) { - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - gpr_thd_new(&thds[i], "grpc_concurrent_test", concurrent_test_body, &args, - &opt); + gpr_thd_new(&thds[i], "grpc_concurrent_test", concurrent_test_body, &args); } gpr_event_set(&args.ev_start, (void*)1); diff --git a/test/core/gpr/cpu_test.cc b/test/core/gpr/cpu_test.cc index 9f2c3f1923..c97facef7d 100644 --- a/test/core/gpr/cpu_test.cc +++ b/test/core/gpr/cpu_test.cc @@ -101,7 +101,7 @@ static void cpu_test(void) { uint32_t i; int cores_seen = 0; struct cpu_test ct; - gpr_thd_id thd; + gpr_thd_id* thd; ct.ncores = gpr_cpu_num_cores(); GPR_ASSERT(ct.ncores > 0); ct.nthreads = static_cast<int>(ct.ncores) * 3; @@ -110,15 +110,22 @@ static void cpu_test(void) { gpr_mu_init(&ct.mu); gpr_cv_init(&ct.done_cv); ct.is_done = 0; - for (i = 0; i < ct.ncores * 3; i++) { - GPR_ASSERT( - gpr_thd_new(&thd, "grpc_cpu_test", &worker_thread, &ct, nullptr)); + + uint32_t nthreads = ct.ncores * 3; + thd = static_cast<gpr_thd_id*>(gpr_malloc(sizeof(thd[0]) * nthreads)); + + for (i = 0; i < nthreads; i++) { + GPR_ASSERT(gpr_thd_new(&thd[i], "grpc_cpu_test", &worker_thread, &ct)); } gpr_mu_lock(&ct.mu); while (!ct.is_done) { gpr_cv_wait(&ct.done_cv, &ct.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } gpr_mu_unlock(&ct.mu); + for (i = 0; i < nthreads; i++) { + gpr_thd_join(thd[i]); + } + gpr_free(thd); fprintf(stderr, "Saw cores ["); fflush(stderr); for (i = 0; i < ct.ncores; i++) { diff --git a/test/core/gpr/mpscq_test.cc b/test/core/gpr/mpscq_test.cc index 96813466c9..55998445f6 100644 --- a/test/core/gpr/mpscq_test.cc +++ b/test/core/gpr/mpscq_test.cc @@ -81,13 +81,10 @@ static void test_mt(void) { gpr_mpscq q; gpr_mpscq_init(&q); for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); ta[i].ctr = 0; ta[i].q = &q; ta[i].start = &start; - GPR_ASSERT( - gpr_thd_new(&thds[i], "grpc_mt_test", test_thread, &ta[i], &options)); + GPR_ASSERT(gpr_thd_new(&thds[i], "grpc_mt_test", test_thread, &ta[i])); } size_t num_done = 0; size_t spins = 0; @@ -153,13 +150,11 @@ static void test_mt_multipop(void) { gpr_mpscq q; gpr_mpscq_init(&q); for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); ta[i].ctr = 0; ta[i].q = &q; ta[i].start = &start; - GPR_ASSERT(gpr_thd_new(&thds[i], "grpc_multipop_test", test_thread, &ta[i], - &options)); + GPR_ASSERT( + gpr_thd_new(&thds[i], "grpc_multipop_test", test_thread, &ta[i])); } pull_args pa; pa.ta = ta; @@ -170,10 +165,8 @@ static void test_mt_multipop(void) { pa.start = &start; gpr_mu_init(&pa.mu); for (size_t i = 0; i < GPR_ARRAY_SIZE(pull_thds); i++) { - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); - GPR_ASSERT(gpr_thd_new(&pull_thds[i], "grpc_multipop_pull", pull_thread, - &pa, &options)); + GPR_ASSERT( + gpr_thd_new(&pull_thds[i], "grpc_multipop_pull", pull_thread, &pa)); } gpr_event_set(&start, (void*)1); for (size_t i = 0; i < GPR_ARRAY_SIZE(pull_thds); i++) { diff --git a/test/core/gpr/spinlock_test.cc b/test/core/gpr/spinlock_test.cc index 9f182bc154..1392cff7ad 100644 --- a/test/core/gpr/spinlock_test.cc +++ b/test/core/gpr/spinlock_test.cc @@ -66,10 +66,7 @@ static void test_destroy(struct test* m) { static void test_create_threads(struct test* m, void (*body)(void* arg)) { int i; for (i = 0; i != m->thread_count; i++) { - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - GPR_ASSERT( - gpr_thd_new(&m->threads[i], "grpc_create_threads", body, m, &opt)); + GPR_ASSERT(gpr_thd_new(&m->threads[i], "grpc_create_threads", body, m)); } } diff --git a/test/core/gpr/sync_test.cc b/test/core/gpr/sync_test.cc index bafd91020b..d2d1f41775 100644 --- a/test/core/gpr/sync_test.cc +++ b/test/core/gpr/sync_test.cc @@ -134,6 +134,7 @@ int queue_remove(queue* q, int* head, gpr_timespec abs_deadline) { /* Tests for gpr_mu and gpr_cv, and the queue example. */ struct test { int threads; /* number of threads */ + gpr_thd_id* thread_ids; int64_t iterations; /* number of iterations per thread */ int64_t counter; @@ -160,6 +161,8 @@ struct test { static struct test* test_new(int threads, int64_t iterations, int incr_step) { struct test* m = static_cast<struct test*>(gpr_malloc(sizeof(*m))); m->threads = threads; + m->thread_ids = + static_cast<gpr_thd_id*>(gpr_malloc(sizeof(*m->thread_ids) * threads)); m->iterations = iterations; m->counter = 0; m->thread_count = 0; @@ -182,15 +185,15 @@ static void test_destroy(struct test* m) { gpr_cv_destroy(&m->cv); gpr_cv_destroy(&m->done_cv); queue_destroy(&m->q); + gpr_free(m->thread_ids); gpr_free(m); } /* Create m->threads threads, each running (*body)(m) */ static void test_create_threads(struct test* m, void (*body)(void* arg)) { - gpr_thd_id id; int i; for (i = 0; i != m->threads; i++) { - GPR_ASSERT(gpr_thd_new(&id, "grpc_create_threads", body, m, nullptr)); + GPR_ASSERT(gpr_thd_new(&m->thread_ids[i], "grpc_create_threads", body, m)); } } @@ -201,6 +204,9 @@ static void test_wait(struct test* m) { gpr_cv_wait(&m->done_cv, &m->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } gpr_mu_unlock(&m->mu); + for (int i = 0; i != m->threads; i++) { + gpr_thd_join(m->thread_ids[i]); + } } /* Get an integer thread id in the raneg 0..threads-1 */ @@ -245,13 +251,16 @@ static void test(const char* name, void (*body)(void* m), fprintf(stderr, " %ld", static_cast<long>(iterations)); fflush(stderr); m = test_new(10, iterations, incr_step); + gpr_thd_id extra_id; if (extra != nullptr) { - gpr_thd_id id; - GPR_ASSERT(gpr_thd_new(&id, name, extra, m, nullptr)); + GPR_ASSERT(gpr_thd_new(&extra_id, name, extra, m)); m->done++; /* one more thread to wait for */ } test_create_threads(m, body); test_wait(m); + if (extra != nullptr) { + gpr_thd_join(extra_id); + } if (m->counter != m->threads * m->iterations * m->incr_step) { fprintf(stderr, "counter %ld threads %d iterations %ld\n", static_cast<long>(m->counter), m->threads, diff --git a/test/core/gpr/thd_test.cc b/test/core/gpr/thd_test.cc index 18bbaae6c9..47e9a22fd8 100644 --- a/test/core/gpr/thd_test.cc +++ b/test/core/gpr/thd_test.cc @@ -38,7 +38,7 @@ struct test { }; /* A Thread body. Decrement t->n, and if is becomes zero, set t->done. */ -static void thd_body(void* v) { +static void thd_body1(void* v) { struct test* t = static_cast<struct test*>(v); gpr_mu_lock(&t->mu); t->n--; @@ -49,45 +49,38 @@ static void thd_body(void* v) { gpr_mu_unlock(&t->mu); } -static void thd_body_joinable(void* v) {} - -/* Test thread options work as expected */ -static void test_options(void) { - gpr_thd_options options = gpr_thd_options_default(); - GPR_ASSERT(!gpr_thd_options_is_joinable(&options)); - GPR_ASSERT(gpr_thd_options_is_detached(&options)); - gpr_thd_options_set_joinable(&options); - GPR_ASSERT(gpr_thd_options_is_joinable(&options)); - GPR_ASSERT(!gpr_thd_options_is_detached(&options)); - gpr_thd_options_set_detached(&options); - GPR_ASSERT(!gpr_thd_options_is_joinable(&options)); - GPR_ASSERT(gpr_thd_options_is_detached(&options)); -} - -/* Test that we can create a number of threads and wait for them. */ -static void test(void) { +/* Test that we can create a number of threads, wait for them, and join them. */ +static void test1(void) { int i; - gpr_thd_id thd; gpr_thd_id thds[NUM_THREADS]; struct test t; - gpr_thd_options options = gpr_thd_options_default(); gpr_mu_init(&t.mu); gpr_cv_init(&t.done_cv); t.n = NUM_THREADS; t.is_done = 0; for (i = 0; i < NUM_THREADS; i++) { - GPR_ASSERT(gpr_thd_new(&thd, "grpc_thread_test", &thd_body, &t, nullptr)); + GPR_ASSERT(gpr_thd_new(&thds[i], "grpc_thread_body1_test", &thd_body1, &t)); } gpr_mu_lock(&t.mu); while (!t.is_done) { gpr_cv_wait(&t.done_cv, &t.mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } gpr_mu_unlock(&t.mu); + for (i = 0; i < NUM_THREADS; i++) { + gpr_thd_join(thds[i]); + } GPR_ASSERT(t.n == 0); - gpr_thd_options_set_joinable(&options); +} + +static void thd_body2(void* v) {} + +/* Test that we can create a number of threads and join them. */ +static void test2(void) { + int i; + gpr_thd_id thds[NUM_THREADS]; for (i = 0; i < NUM_THREADS; i++) { - GPR_ASSERT(gpr_thd_new(&thds[i], "grpc_joinable_thread_test", - &thd_body_joinable, nullptr, &options)); + GPR_ASSERT( + gpr_thd_new(&thds[i], "grpc_thread_body2_test", &thd_body2, nullptr)); } for (i = 0; i < NUM_THREADS; i++) { gpr_thd_join(thds[i]); @@ -98,7 +91,7 @@ static void test(void) { int main(int argc, char* argv[]) { grpc_test_init(argc, argv); - test_options(); - test(); + test1(); + test2(); return 0; } diff --git a/test/core/gpr/tls_test.cc b/test/core/gpr/tls_test.cc index 1e4534dc5a..f3f0864d3d 100644 --- a/test/core/gpr/tls_test.cc +++ b/test/core/gpr/tls_test.cc @@ -46,7 +46,6 @@ static void thd_body(void* arg) { /* ------------------------------------------------- */ int main(int argc, char* argv[]) { - gpr_thd_options opt = gpr_thd_options_default(); int i; gpr_thd_id threads[NUM_THREADS]; @@ -54,10 +53,8 @@ int main(int argc, char* argv[]) { gpr_tls_init(&test_var); - gpr_thd_options_set_joinable(&opt); - for (i = 0; i < NUM_THREADS; i++) { - gpr_thd_new(&threads[i], "grpc_tls_test", thd_body, nullptr, &opt); + gpr_thd_new(&threads[i], "grpc_tls_test", thd_body, nullptr); } for (i = 0; i < NUM_THREADS; i++) { gpr_thd_join(threads[i]); diff --git a/test/core/handshake/client_ssl.cc b/test/core/handshake/client_ssl.cc index fe2ab251e3..2581eb525c 100644 --- a/test/core/handshake/client_ssl.cc +++ b/test/core/handshake/client_ssl.cc @@ -230,12 +230,9 @@ static bool client_ssl_test(char* server_alpn_preferred) { GPR_ASSERT(server_socket > 0 && port > 0); // Launch the TLS server thread. - gpr_thd_options thdopt = gpr_thd_options_default(); gpr_thd_id thdid; - gpr_thd_options_set_joinable(&thdopt); server_args args = {server_socket, server_alpn_preferred}; - GPR_ASSERT(gpr_thd_new(&thdid, "grpc_client_ssl_test", server_thread, &args, - &thdopt)); + GPR_ASSERT(gpr_thd_new(&thdid, "grpc_client_ssl_test", server_thread, &args)); // Load key pair and establish client SSL credentials. grpc_ssl_pem_key_cert_pair pem_key_cert_pair; diff --git a/test/core/handshake/server_ssl_common.cc b/test/core/handshake/server_ssl_common.cc index d202a7cfd6..a2d389954f 100644 --- a/test/core/handshake/server_ssl_common.cc +++ b/test/core/handshake/server_ssl_common.cc @@ -138,11 +138,8 @@ bool server_ssl_test(const char* alpn_list[], unsigned int alpn_list_len, gpr_event_init(&client_handshake_complete); // Launch the gRPC server thread. - gpr_thd_options thdopt = gpr_thd_options_default(); gpr_thd_id thdid; - gpr_thd_options_set_joinable(&thdopt); - GPR_ASSERT( - gpr_thd_new(&thdid, "grpc_ssl_test", server_thread, &port, &thdopt)); + GPR_ASSERT(gpr_thd_new(&thdid, "grpc_ssl_test", server_thread, &port)); SSL_load_error_strings(); OpenSSL_add_ssl_algorithms(); diff --git a/test/core/iomgr/combiner_test.cc b/test/core/iomgr/combiner_test.cc index 8426b3d233..0cd63bd973 100644 --- a/test/core/iomgr/combiner_test.cc +++ b/test/core/iomgr/combiner_test.cc @@ -100,13 +100,11 @@ static void test_execute_many(void) { gpr_thd_id thds[100]; thd_args ta[GPR_ARRAY_SIZE(thds)]; for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); ta[i].ctr = 0; ta[i].lock = lock; gpr_event_init(&ta[i].done); - GPR_ASSERT(gpr_thd_new(&thds[i], "grpc_execute_many", execute_many_loop, - &ta[i], &options)); + GPR_ASSERT( + gpr_thd_new(&thds[i], "grpc_execute_many", execute_many_loop, &ta[i])); } for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { GPR_ASSERT(gpr_event_wait(&ta[i].done, diff --git a/test/core/iomgr/ev_epollsig_linux_test.cc b/test/core/iomgr/ev_epollsig_linux_test.cc index 02d11271ec..5c30843588 100644 --- a/test/core/iomgr/ev_epollsig_linux_test.cc +++ b/test/core/iomgr/ev_epollsig_linux_test.cc @@ -261,9 +261,7 @@ static void test_threading(void) { gpr_thd_id thds[10]; for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - gpr_thd_new(&thds[i], "test_thread", test_threading_loop, &shared, &opt); + gpr_thd_new(&thds[i], "test_thread", test_threading_loop, &shared); } grpc_wakeup_fd fd; GPR_ASSERT(GRPC_LOG_IF_ERROR("wakeup_fd_init", grpc_wakeup_fd_init(&fd))); diff --git a/test/core/iomgr/resolve_address_posix_test.cc b/test/core/iomgr/resolve_address_posix_test.cc index 341579f178..6a1564dd69 100644 --- a/test/core/iomgr/resolve_address_posix_test.cc +++ b/test/core/iomgr/resolve_address_posix_test.cc @@ -38,6 +38,7 @@ static gpr_timespec test_deadline(void) { } typedef struct args_struct { + gpr_thd_id id; gpr_event ev; grpc_resolved_addresses* addrs; gpr_atm done_atm; @@ -59,6 +60,7 @@ void args_init(args_struct* args) { void args_finish(args_struct* args) { GPR_ASSERT(gpr_event_wait(&args->ev, test_deadline())); + gpr_thd_join(args->id); grpc_resolved_addresses_destroy(args->addrs); grpc_pollset_set_del_pollset(args->pollset_set, args->pollset); grpc_pollset_set_destroy(args->pollset_set); @@ -101,8 +103,7 @@ static void actually_poll(void* argsp) { static void poll_pollset_until_request_done(args_struct* args) { gpr_atm_rel_store(&args->done_atm, 0); - gpr_thd_id id; - gpr_thd_new(&id, "grpc_poll_pollset", actually_poll, args, nullptr); + gpr_thd_new(&args->id, "grpc_poll_pollset", actually_poll, args); } static void must_succeed(void* argsp, grpc_error* err) { diff --git a/test/core/iomgr/wakeup_fd_cv_test.cc b/test/core/iomgr/wakeup_fd_cv_test.cc index 68dcb50aa6..d77a3ec0b4 100644 --- a/test/core/iomgr/wakeup_fd_cv_test.cc +++ b/test/core/iomgr/wakeup_fd_cv_test.cc @@ -104,7 +104,6 @@ void test_poll_cv_trigger(void) { struct pollfd pfds[6]; poll_args pargs; gpr_thd_id t_id; - gpr_thd_options opt; GPR_ASSERT(grpc_wakeup_fd_init(&cvfd1) == GRPC_ERROR_NONE); GPR_ASSERT(grpc_wakeup_fd_init(&cvfd2) == GRPC_ERROR_NONE); @@ -135,9 +134,7 @@ void test_poll_cv_trigger(void) { pargs.timeout = 1000; pargs.result = -2; - opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - gpr_thd_new(&t_id, "grpc_background_poll", &background_poll, &pargs, &opt); + gpr_thd_new(&t_id, "grpc_background_poll", &background_poll, &pargs); // Wakeup wakeup_fd not listening for events GPR_ASSERT(grpc_wakeup_fd_wakeup(&cvfd1) == GRPC_ERROR_NONE); @@ -153,7 +150,7 @@ void test_poll_cv_trigger(void) { // Pollin on socket fd pargs.timeout = -1; pargs.result = -2; - gpr_thd_new(&t_id, "grpc_background_poll", &background_poll, &pargs, &opt); + gpr_thd_new(&t_id, "grpc_background_poll", &background_poll, &pargs); trigger_socket_event(); gpr_thd_join(t_id); GPR_ASSERT(pargs.result == 1); @@ -167,7 +164,7 @@ void test_poll_cv_trigger(void) { // Pollin on wakeup fd reset_socket_event(); pargs.result = -2; - gpr_thd_new(&t_id, "grpc_background_poll", &background_poll, &pargs, &opt); + gpr_thd_new(&t_id, "grpc_background_poll", &background_poll, &pargs); GPR_ASSERT(grpc_wakeup_fd_wakeup(&cvfd2) == GRPC_ERROR_NONE); gpr_thd_join(t_id); @@ -181,7 +178,7 @@ void test_poll_cv_trigger(void) { // Pollin on wakeupfd before poll() pargs.result = -2; - gpr_thd_new(&t_id, "grpc_background_poll", &background_poll, &pargs, &opt); + gpr_thd_new(&t_id, "grpc_background_poll", &background_poll, &pargs); gpr_thd_join(t_id); GPR_ASSERT(pargs.result == 1); @@ -198,7 +195,7 @@ void test_poll_cv_trigger(void) { reset_socket_event(); GPR_ASSERT(grpc_wakeup_fd_consume_wakeup(&cvfd1) == GRPC_ERROR_NONE); GPR_ASSERT(grpc_wakeup_fd_consume_wakeup(&cvfd2) == GRPC_ERROR_NONE); - gpr_thd_new(&t_id, "grpc_background_poll", &background_poll, &pargs, &opt); + gpr_thd_new(&t_id, "grpc_background_poll", &background_poll, &pargs); gpr_thd_join(t_id); GPR_ASSERT(pargs.result == 0); diff --git a/test/core/network_benchmarks/low_level_ping_pong.cc b/test/core/network_benchmarks/low_level_ping_pong.cc index 33716b9d4a..b90ad8d44d 100644 --- a/test/core/network_benchmarks/low_level_ping_pong.cc +++ b/test/core/network_benchmarks/low_level_ping_pong.cc @@ -586,8 +586,10 @@ static int run_benchmark(const char* socket_type, thread_args* client_args, gpr_log(GPR_INFO, "Starting test %s %s %zu", client_args->strategy_name, socket_type, client_args->msg_size); - gpr_thd_new(&tid, "server_thread", server_thread_wrap, server_args, nullptr); + gpr_thd_new(&tid, "server_thread", server_thread_wrap, server_args); client_thread(client_args); + gpr_thd_join(tid); + return 0; } diff --git a/test/core/surface/completion_queue_threading_test.cc b/test/core/surface/completion_queue_threading_test.cc index 81319f4df4..37232a4fb1 100644 --- a/test/core/surface/completion_queue_threading_test.cc +++ b/test/core/surface/completion_queue_threading_test.cc @@ -80,14 +80,12 @@ static void test_too_many_plucks(void) { grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)]; gpr_thd_id thread_ids[GPR_ARRAY_SIZE(tags)]; struct thread_state thread_states[GPR_ARRAY_SIZE(tags)]; - gpr_thd_options thread_options = gpr_thd_options_default(); grpc_core::ExecCtx exec_ctx; unsigned i, j; LOG_TEST("test_too_many_plucks"); cc = grpc_completion_queue_create_for_pluck(nullptr); - gpr_thd_options_set_joinable(&thread_options); for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { tags[i] = create_test_tag(); @@ -96,8 +94,8 @@ static void test_too_many_plucks(void) { } thread_states[i].cc = cc; thread_states[i].tag = tags[i]; - gpr_thd_new(thread_ids + i, "grpc_pluck_test", pluck_one, thread_states + i, - &thread_options); + gpr_thd_new(thread_ids + i, "grpc_pluck_test", pluck_one, + thread_states + i); } /* wait until all other threads are plucking */ @@ -220,8 +218,9 @@ static void test_threading(size_t producers, size_t consumers) { "test_threading", producers, consumers); /* start all threads: they will wait for phase1 */ + gpr_thd_id* ids = static_cast<gpr_thd_id*>( + gpr_malloc(sizeof(*ids) * (producers + consumers))); for (i = 0; i < producers + consumers; i++) { - gpr_thd_id id; gpr_event_init(&options[i].on_started); gpr_event_init(&options[i].on_phase1_done); gpr_event_init(&options[i].on_finished); @@ -230,10 +229,9 @@ static void test_threading(size_t producers, size_t consumers) { options[i].events_triggered = 0; options[i].cc = cc; options[i].id = optid++; - GPR_ASSERT(gpr_thd_new(&id, - i < producers ? "grpc_producer" : "grpc_consumer", - i < producers ? producer_thread : consumer_thread, - options + i, nullptr)); + GPR_ASSERT(gpr_thd_new( + &ids[i], i < producers ? "grpc_producer" : "grpc_consumer", + i < producers ? producer_thread : consumer_thread, options + i)); gpr_event_wait(&options[i].on_started, ten_seconds_time()); } @@ -266,6 +264,11 @@ static void test_threading(size_t producers, size_t consumers) { /* destroy the completion channel */ grpc_completion_queue_destroy(cc); + for (i = 0; i < producers + consumers; i++) { + gpr_thd_join(ids[i]); + } + gpr_free(ids); + /* verify that everything was produced and consumed */ for (i = 0; i < producers + consumers; i++) { if (i < producers) { diff --git a/test/core/surface/concurrent_connectivity_test.cc b/test/core/surface/concurrent_connectivity_test.cc index 95af4abd48..974263d914 100644 --- a/test/core/surface/concurrent_connectivity_test.cc +++ b/test/core/surface/concurrent_connectivity_test.cc @@ -176,14 +176,11 @@ int run_concurrent_connectivity_test() { gpr_thd_id server; char* localhost = gpr_strdup("localhost:54321"); - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); /* First round, no server */ gpr_log(GPR_DEBUG, "Wave 1"); for (size_t i = 0; i < NUM_THREADS; ++i) { - gpr_thd_new(&threads[i], "grpc_wave_1", create_loop_destroy, localhost, - &options); + gpr_thd_new(&threads[i], "grpc_wave_1", create_loop_destroy, localhost); } for (size_t i = 0; i < NUM_THREADS; ++i) { gpr_thd_join(threads[i]); @@ -199,11 +196,10 @@ int run_concurrent_connectivity_test() { args.cq = grpc_completion_queue_create_for_next(nullptr); grpc_server_register_completion_queue(args.server, args.cq, nullptr); grpc_server_start(args.server); - gpr_thd_new(&server, "grpc_wave_2_server", server_thread, &args, &options); + gpr_thd_new(&server, "grpc_wave_2_server", server_thread, &args); for (size_t i = 0; i < NUM_THREADS; ++i) { - gpr_thd_new(&threads[i], "grpc_wave_2", create_loop_destroy, args.addr, - &options); + gpr_thd_new(&threads[i], "grpc_wave_2", create_loop_destroy, args.addr); } for (size_t i = 0; i < NUM_THREADS; ++i) { gpr_thd_join(threads[i]); @@ -220,13 +216,11 @@ int run_concurrent_connectivity_test() { args.pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size())); grpc_pollset_init(args.pollset, &args.mu); gpr_event_init(&args.ready); - gpr_thd_new(&server, "grpc_wave_3_server", bad_server_thread, &args, - &options); + gpr_thd_new(&server, "grpc_wave_3_server", bad_server_thread, &args); gpr_event_wait(&args.ready, gpr_inf_future(GPR_CLOCK_MONOTONIC)); for (size_t i = 0; i < NUM_THREADS; ++i) { - gpr_thd_new(&threads[i], "grpc_wave_3", create_loop_destroy, args.addr, - &options); + gpr_thd_new(&threads[i], "grpc_wave_3", create_loop_destroy, args.addr); } for (size_t i = 0; i < NUM_THREADS; ++i) { gpr_thd_join(threads[i]); @@ -281,12 +275,10 @@ int run_concurrent_watches_with_short_timeouts_test() { gpr_thd_id threads[NUM_THREADS]; char* localhost = gpr_strdup("localhost:54321"); - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); for (size_t i = 0; i < NUM_THREADS; ++i) { gpr_thd_new(&threads[i], "grpc_short_watches", watches_with_short_timeouts, - localhost, &options); + localhost); } for (size_t i = 0; i < NUM_THREADS; ++i) { gpr_thd_join(threads[i]); diff --git a/test/core/surface/sequential_connectivity_test.cc b/test/core/surface/sequential_connectivity_test.cc index 428d17ff1b..8ca7b4f5dc 100644 --- a/test/core/surface/sequential_connectivity_test.cc +++ b/test/core/surface/sequential_connectivity_test.cc @@ -68,9 +68,7 @@ static void run_test(const test_fixture* fixture) { server_thread_args sta = {server, server_cq}; gpr_thd_id server_thread; - gpr_thd_options thdopt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&thdopt); - gpr_thd_new(&server_thread, "grpc_server", server_thread_func, &sta, &thdopt); + gpr_thd_new(&server_thread, "grpc_server", server_thread_func, &sta); grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); grpc_channel* channels[NUM_CONNECTIONS]; diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index c095b5bed9..740d0d5738 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1094,7 +1094,6 @@ src/core/lib/gpr/string_windows.h \ src/core/lib/gpr/sync.cc \ src/core/lib/gpr/sync_posix.cc \ src/core/lib/gpr/sync_windows.cc \ -src/core/lib/gpr/thd.cc \ src/core/lib/gpr/thd.h \ src/core/lib/gpr/thd_posix.cc \ src/core/lib/gpr/thd_windows.cc \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 3efaa6e686..e87325b594 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -8223,7 +8223,6 @@ "src/core/lib/gpr/sync.cc", "src/core/lib/gpr/sync_posix.cc", "src/core/lib/gpr/sync_windows.cc", - "src/core/lib/gpr/thd.cc", "src/core/lib/gpr/thd_posix.cc", "src/core/lib/gpr/thd_windows.cc", "src/core/lib/gpr/time.cc", |