diff options
27 files changed, 236 insertions, 230 deletions
diff --git a/src/core/lib/gprpp/thd.h b/src/core/lib/gprpp/thd.h index c453b8f758..6e0eb5dfaa 100644 --- a/src/core/lib/gprpp/thd.h +++ b/src/core/lib/gprpp/thd.h @@ -35,10 +35,10 @@ class Thread { /// Default constructor only to allow use in structs that lack constructors /// Does not produce a validly-constructed thread; must later /// use placement new to construct a real thread. Does not init mu_ and cv_ - Thread(): real_(false), alive_(false), started_(false), joined_(false) {} + Thread() : real_(false), alive_(false), started_(false), joined_(false) {} Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, - bool* success = nullptr); + bool* success = nullptr); ~Thread() { if (!alive_) { // This thread never existed, so nothing to do @@ -63,6 +63,7 @@ class Thread { static void Init(); static bool AwaitAll(gpr_timespec deadline); + private: gpr_mu mu_; gpr_cv ready_; diff --git a/src/core/lib/gprpp/thd_posix.cc b/src/core/lib/gprpp/thd_posix.cc index 1d166bb405..9ed6758b31 100644 --- a/src/core/lib/gprpp/thd_posix.cc +++ b/src/core/lib/gprpp/thd_posix.cc @@ -75,8 +75,8 @@ void dec_thd_count() { } // namespace Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, - bool* success): - real_(true), alive_(false), started_(false), joined_(false) { + bool* success) + : real_(true), alive_(false), started_(false), joined_(false) { gpr_mu_init(&mu_); gpr_cv_init(&ready_); pthread_attr_t attr; @@ -94,36 +94,42 @@ Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == 0); pthread_t p; - alive_ = (pthread_create(&p, &attr, [](void *v) -> void* { - thd_arg a = *static_cast<thd_arg*>(v); - free(v); - if (a.name != nullptr) { + alive_ = (pthread_create(&p, &attr, + [](void* v) -> void* { + thd_arg a = *static_cast<thd_arg*>(v); + free(v); + if (a.name != nullptr) { #if GPR_APPLE_PTHREAD_NAME - /* Apple supports 64 characters, and will truncate if it's longer. */ - pthread_setname_np(a.name); + /* Apple supports 64 characters, and will + * truncate if it's longer. */ + pthread_setname_np(a.name); #elif GPR_LINUX_PTHREAD_NAME - /* Linux supports 16 characters max, and will error if it's longer. */ - char buf[16]; - size_t buf_len = GPR_ARRAY_SIZE(buf) - 1; - strncpy(buf, a.name, buf_len); - buf[buf_len] = '\0'; - pthread_setname_np(pthread_self(), buf); + /* Linux supports 16 characters max, and will + * error if it's longer. */ + char buf[16]; + size_t buf_len = GPR_ARRAY_SIZE(buf) - 1; + strncpy(buf, a.name, buf_len); + buf[buf_len] = '\0'; + pthread_setname_np(pthread_self(), buf); #endif // GPR_APPLE_PTHREAD_NAME - } - - gpr_mu_lock(&a.thread->mu_); - if (!a.thread->started_) { - gpr_cv_wait(&a.thread->ready_, &a.thread->mu_, - gpr_inf_future(GPR_CLOCK_MONOTONIC)); - } - gpr_mu_unlock(&a.thread->mu_); - - (*a.body)(a.arg); - dec_thd_count(); - return nullptr; - }, a) == 0); - - if (success != nullptr) { *success = alive_; } + } + + gpr_mu_lock(&a.thread->mu_); + if (!a.thread->started_) { + gpr_cv_wait(&a.thread->ready_, &a.thread->mu_, + gpr_inf_future(GPR_CLOCK_MONOTONIC)); + } + gpr_mu_unlock(&a.thread->mu_); + + (*a.body)(a.arg); + dec_thd_count(); + return nullptr; + }, + a) == 0); + + if (success != nullptr) { + *success = alive_; + } id_ = reinterpret_cast<gpr_thd_id>(p); GPR_ASSERT(pthread_attr_destroy(&attr) == 0); diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 4553845309..446b84d4b1 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -22,7 +22,6 @@ #include "src/core/lib/iomgr/ev_poll_posix.h" -#include <new> #include <assert.h> #include <errno.h> #include <limits.h> @@ -30,6 +29,7 @@ #include <string.h> #include <sys/socket.h> #include <unistd.h> +#include <new> #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -37,9 +37,9 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/gpr/murmur_hash.h" -#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/wakeup_fd_cv.h" diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index ad1b306ffa..b526c14af4 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -18,8 +18,8 @@ #include "src/core/lib/iomgr/executor.h" -#include <new> #include <string.h> +#include <new> #include <grpc/support/alloc.h> #include <grpc/support/cpu.h> @@ -28,9 +28,9 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/gpr/spinlock.h" -#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/exec_ctx.h" #define MAX_DEPTH 2 @@ -104,9 +104,8 @@ void grpc_executor_set_threading(bool threading) { g_thread_state[i].elems = GRPC_CLOSURE_LIST_INIT; } - new (&g_thread_state[0].thd) grpc_core::Thread("grpc_executor", - executor_thread, - &g_thread_state[0]); + new (&g_thread_state[0].thd) + grpc_core::Thread("grpc_executor", executor_thread, &g_thread_state[0]); g_thread_state[0].thd.Start(); } else { if (cur_threads == 0) return; @@ -265,10 +264,10 @@ static void executor_push(grpc_closure* closure, grpc_error* error, if (cur_thread_count < g_max_threads) { gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1); - new (&g_thread_state[cur_thread_count].thd) - grpc_core::Thread("grpc_executor", executor_thread, - &g_thread_state[cur_thread_count]); - g_thread_state[cur_thread_count].thd.Start(); + new (&g_thread_state[cur_thread_count].thd) + grpc_core::Thread("grpc_executor", executor_thread, + &g_thread_state[cur_thread_count]); + g_thread_state[cur_thread_count].thd.Start(); } gpr_spinlock_unlock(&g_adding_thread_lock); } diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index 072670f8be..3c2b83a549 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -31,8 +31,8 @@ #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" -#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc index 6509ec1de9..e3d651ec82 100644 --- a/src/core/lib/iomgr/resolve_address_posix.cc +++ b/src/core/lib/iomgr/resolve_address_posix.cc @@ -33,8 +33,8 @@ #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" -#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc index f4c289af6a..d79f5f814d 100644 --- a/src/core/lib/iomgr/timer_manager.cc +++ b/src/core/lib/iomgr/timer_manager.cc @@ -18,8 +18,8 @@ #include "src/core/lib/iomgr/timer_manager.h" -#include <new> #include <inttypes.h> +#include <new> #include <grpc/support/alloc.h> #include <grpc/support/log.h> diff --git a/src/core/lib/iomgr/wakeup_fd_cv.cc b/src/core/lib/iomgr/wakeup_fd_cv.cc index fd78d5ee03..a8e175bc34 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.cc +++ b/src/core/lib/iomgr/wakeup_fd_cv.cc @@ -30,8 +30,8 @@ #include <grpc/support/sync.h> #include <grpc/support/time.h> -#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #define MAX_TABLE_RESIZE 256 diff --git a/src/core/lib/profiling/basic_timers.cc b/src/core/lib/profiling/basic_timers.cc index 13ae1d8b7b..97646d1000 100644 --- a/src/core/lib/profiling/basic_timers.cc +++ b/src/core/lib/profiling/basic_timers.cc @@ -202,8 +202,8 @@ void gpr_timers_set_log_filename(const char* filename) { } static void init_output() { - g_writing_thread = new grpc_core::Thread("timer_output_thread", - writing_thread, nullptr); + g_writing_thread = + new grpc_core::Thread("timer_output_thread", writing_thread, nullptr); atexit(finish_writing); } diff --git a/test/core/end2end/bad_server_response_test.cc b/test/core/end2end/bad_server_response_test.cc index 0dbde3d788..3d133cfc18 100644 --- a/test/core/end2end/bad_server_response_test.cc +++ b/test/core/end2end/bad_server_response_test.cc @@ -254,16 +254,15 @@ static void actually_poll_server(void* arg) { gpr_free(pa); } -static grpc_core::Thread* - poll_server_until_read_done(test_tcp_server* server, - gpr_event* signal_when_done) { +static grpc_core::Thread* poll_server_until_read_done( + test_tcp_server* server, gpr_event* signal_when_done) { gpr_atm_rel_store(&state.done_atm, 0); state.write_done = 0; poll_args* pa = static_cast<poll_args*>(gpr_malloc(sizeof(*pa))); pa->server = server; pa->signal_when_done = signal_when_done; auto* th = grpc_core::New<grpc_core::Thread>("grpc_poll_server", - actually_poll_server, pa); + actually_poll_server, pa); th->Start(); return th; } @@ -285,8 +284,8 @@ static void run_test(const char* response_payload, state.response_payload_length = response_payload_length; /* poll server until sending out the response */ - grpc_core::UniquePtr<grpc_core::Thread> - thdptr(poll_server_until_read_done(&test_server, &ev)); + grpc_core::UniquePtr<grpc_core::Thread> thdptr( + poll_server_until_read_done(&test_server, &ev)); start_rpc(server_port, expected_status, expected_detail); gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME)); thdptr->Join(); diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index d95ce66423..d074fdaa52 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -20,8 +20,8 @@ #include "src/core/lib/iomgr/sockaddr.h" -#include <new> #include <string.h> +#include <new> #include <grpc/grpc.h> #include <grpc/slice_buffer.h> diff --git a/test/core/end2end/fixtures/proxy.cc b/test/core/end2end/fixtures/proxy.cc index fe54261b5e..5b4f3b9236 100644 --- a/test/core/end2end/fixtures/proxy.cc +++ b/test/core/end2end/fixtures/proxy.cc @@ -18,16 +18,16 @@ #include "test/core/end2end/fixtures/proxy.h" -#include <new> #include <string.h> +#include <new> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> #include "src/core/lib/gpr/host_port.h" -#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "test/core/util/port.h" struct grpc_end2end_proxy { diff --git a/test/core/gpr/arena_test.cc b/test/core/gpr/arena_test.cc index 717052eacd..b00c014cea 100644 --- a/test/core/gpr/arena_test.cc +++ b/test/core/gpr/arena_test.cc @@ -18,9 +18,9 @@ #include "src/core/lib/gpr/arena.h" -#include <new> #include <inttypes.h> #include <string.h> +#include <new> #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -28,8 +28,8 @@ #include <grpc/support/sync.h> #include "src/core/lib/gpr/string.h" -#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "test/core/util/test_config.h" static void test_noop(void) { gpr_arena_destroy(gpr_arena_create(1)); } @@ -102,8 +102,8 @@ static void concurrent_test(void) { grpc_core::Thread thds[CONCURRENT_TEST_THREADS]; for (int i = 0; i < CONCURRENT_TEST_THREADS; i++) { - new (&thds[i]) grpc_core::Thread("grpc_concurrent_test", - concurrent_test_body, &args); + new (&thds[i]) + grpc_core::Thread("grpc_concurrent_test", concurrent_test_body, &args); thds[i].Start(); } diff --git a/test/core/gpr/cpu_test.cc b/test/core/gpr/cpu_test.cc index 4575fb643a..279e6e6f5a 100644 --- a/test/core/gpr/cpu_test.cc +++ b/test/core/gpr/cpu_test.cc @@ -23,9 +23,9 @@ #include <grpc/support/cpu.h> -#include <new> #include <stdio.h> #include <string.h> +#include <new> #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -115,7 +115,7 @@ static void cpu_test(void) { uint32_t nthreads = ct.ncores * 3; grpc_core::Thread* thd = - static_cast<grpc_core::Thread*>(gpr_malloc(sizeof(*thd)*nthreads)); + static_cast<grpc_core::Thread*>(gpr_malloc(sizeof(*thd) * nthreads)); for (i = 0; i < nthreads; i++) { new (&thd[i]) grpc_core::Thread("grpc_cpu_test", &worker_thread, &ct); diff --git a/test/core/gpr/mpscq_test.cc b/test/core/gpr/mpscq_test.cc index bf65b2d0b9..33f93878e0 100644 --- a/test/core/gpr/mpscq_test.cc +++ b/test/core/gpr/mpscq_test.cc @@ -18,15 +18,15 @@ #include "src/core/lib/gpr/mpscq.h" -#include <new> #include <stdlib.h> +#include <new> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> -#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "test/core/util/test_config.h" typedef struct test_node { @@ -167,12 +167,12 @@ static void test_mt_multipop(void) { pa.start = &start; gpr_mu_init(&pa.mu); for (size_t i = 0; i < GPR_ARRAY_SIZE(pull_thds); i++) { - new (&pull_thds[i]) grpc_core::Thread("grpc_multipop_pull", - pull_thread, &pa); + new (&pull_thds[i]) + grpc_core::Thread("grpc_multipop_pull", pull_thread, &pa); pull_thds[i].Start(); } gpr_event_set(&start, (void*)1); - for (auto& pth: pull_thds) { + for (auto& pth : pull_thds) { pth.Join(); } gpr_log(GPR_DEBUG, "spins: %" PRIdPTR, pa.spins); diff --git a/test/core/gpr/spinlock_test.cc b/test/core/gpr/spinlock_test.cc index 1a3475b8f2..ac9f70f301 100644 --- a/test/core/gpr/spinlock_test.cc +++ b/test/core/gpr/spinlock_test.cc @@ -20,9 +20,9 @@ #include "src/core/lib/gpr/spinlock.h" -#include <new> #include <stdio.h> #include <stdlib.h> +#include <new> #include <grpc/support/alloc.h> #include <grpc/support/log.h> diff --git a/test/core/gpr/sync_test.cc b/test/core/gpr/sync_test.cc index 2cdf061c71..487f394b14 100644 --- a/test/core/gpr/sync_test.cc +++ b/test/core/gpr/sync_test.cc @@ -20,9 +20,9 @@ #include <grpc/support/sync.h> -#include <new> #include <stdio.h> #include <stdlib.h> +#include <new> #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -164,8 +164,8 @@ struct test { static struct test* test_new(int nthreads, int64_t iterations, int incr_step) { struct test* m = static_cast<struct test*>(gpr_malloc(sizeof(*m))); m->nthreads = nthreads; - m->threads = - static_cast<grpc_core::Thread*>(gpr_malloc(sizeof(*m->threads) * nthreads)); + m->threads = static_cast<grpc_core::Thread*>( + gpr_malloc(sizeof(*m->threads) * nthreads)); m->iterations = iterations; m->counter = 0; m->thread_count = 0; diff --git a/test/core/gpr/tls_test.cc b/test/core/gpr/tls_test.cc index 15f329f21a..a060cd47f1 100644 --- a/test/core/gpr/tls_test.cc +++ b/test/core/gpr/tls_test.cc @@ -20,9 +20,9 @@ #include "src/core/lib/gpr/tls.h" -#include <new> #include <stdio.h> #include <stdlib.h> +#include <new> #include <grpc/support/log.h> #include <grpc/support/sync.h> diff --git a/test/core/gprpp/thd_test.cc b/test/core/gprpp/thd_test.cc index d98f617288..1aa8632757 100644 --- a/test/core/gprpp/thd_test.cc +++ b/test/core/gprpp/thd_test.cc @@ -20,9 +20,9 @@ #include "src/core/lib/gprpp/thd.h" -#include <new> #include <stdio.h> #include <stdlib.h> +#include <new> #include <grpc/support/log.h> #include <grpc/support/sync.h> @@ -59,7 +59,7 @@ static void test1(void) { gpr_cv_init(&t.done_cv); t.n = NUM_THREADS; t.is_done = 0; - for (auto& th: thds) { + for (auto& th : thds) { new (&th) grpc_core::Thread("grpc_thread_body1_test", &thd_body1, &t); th.Start(); } @@ -68,7 +68,7 @@ static void test1(void) { gpr_cv_wait(&t.done_cv, &t.mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } gpr_mu_unlock(&t.mu); - for (auto& th: thds) { + for (auto& th : thds) { th.Join(); } GPR_ASSERT(t.n == 0); @@ -79,14 +79,14 @@ static void thd_body2(void* v) {} /* Test that we can create a number of threads and join them. */ static void test2(void) { grpc_core::Thread thds[NUM_THREADS]; - for (auto& th: thds) { + for (auto& th : thds) { bool ok; - new (&th) grpc_core::Thread("grpc_thread_body2_test", &thd_body2, - nullptr, &ok); + new (&th) + grpc_core::Thread("grpc_thread_body2_test", &thd_body2, nullptr, &ok); GPR_ASSERT(ok); th.Start(); } - for (auto& th: thds) { + for (auto& th : thds) { th.Join(); } } diff --git a/test/core/iomgr/combiner_test.cc b/test/core/iomgr/combiner_test.cc index 45afe53564..e4f4783e58 100644 --- a/test/core/iomgr/combiner_test.cc +++ b/test/core/iomgr/combiner_test.cc @@ -24,8 +24,8 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "test/core/util/test_config.h" static void test_no_op(void) { @@ -105,8 +105,8 @@ static void test_execute_many(void) { ta[i].ctr = 0; ta[i].lock = lock; gpr_event_init(&ta[i].done); - new (&thds[i]) grpc_core::Thread("grpc_execute_many", - execute_many_loop, &ta[i]); + new (&thds[i]) + grpc_core::Thread("grpc_execute_many", execute_many_loop, &ta[i]); thds[i].Start(); } for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { diff --git a/test/core/iomgr/ev_epollsig_linux_test.cc b/test/core/iomgr/ev_epollsig_linux_test.cc index 751b80fa89..b5f1d4ef68 100644 --- a/test/core/iomgr/ev_epollsig_linux_test.cc +++ b/test/core/iomgr/ev_epollsig_linux_test.cc @@ -30,8 +30,8 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/iomgr.h" #include "test/core/util/test_config.h" diff --git a/test/core/iomgr/resolve_address_posix_test.cc b/test/core/iomgr/resolve_address_posix_test.cc index 9d61c9818b..895ede24c7 100644 --- a/test/core/iomgr/resolve_address_posix_test.cc +++ b/test/core/iomgr/resolve_address_posix_test.cc @@ -18,9 +18,9 @@ #include "src/core/lib/iomgr/resolve_address.h" -#include <new> #include <string.h> #include <sys/un.h> +#include <new> #include <grpc/grpc.h> #include <grpc/support/alloc.h> @@ -28,8 +28,8 @@ #include <grpc/support/sync.h> #include <grpc/support/time.h> -#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "test/core/util/test_config.h" diff --git a/test/core/iomgr/wakeup_fd_cv_test.cc b/test/core/iomgr/wakeup_fd_cv_test.cc index 1b1c9ef811..9bd7c6e47e 100644 --- a/test/core/iomgr/wakeup_fd_cv_test.cc +++ b/test/core/iomgr/wakeup_fd_cv_test.cc @@ -134,89 +134,89 @@ void test_poll_cv_trigger(void) { pargs.result = -2; { - grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs); - thd.Start(); - // Wakeup wakeup_fd not listening for events - GPR_ASSERT(grpc_wakeup_fd_wakeup(&cvfd1) == GRPC_ERROR_NONE); - thd.Join(); - GPR_ASSERT(pargs.result == 0); - GPR_ASSERT(pfds[0].revents == 0); - GPR_ASSERT(pfds[1].revents == 0); - GPR_ASSERT(pfds[2].revents == 0); - GPR_ASSERT(pfds[3].revents == 0); - GPR_ASSERT(pfds[4].revents == 0); - GPR_ASSERT(pfds[5].revents == 0); + grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs); + thd.Start(); + // Wakeup wakeup_fd not listening for events + GPR_ASSERT(grpc_wakeup_fd_wakeup(&cvfd1) == GRPC_ERROR_NONE); + thd.Join(); + GPR_ASSERT(pargs.result == 0); + GPR_ASSERT(pfds[0].revents == 0); + GPR_ASSERT(pfds[1].revents == 0); + GPR_ASSERT(pfds[2].revents == 0); + GPR_ASSERT(pfds[3].revents == 0); + GPR_ASSERT(pfds[4].revents == 0); + GPR_ASSERT(pfds[5].revents == 0); } { - // Pollin on socket fd - pargs.timeout = -1; - pargs.result = -2; - grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs); - thd.Start(); - trigger_socket_event(); - thd.Join(); - GPR_ASSERT(pargs.result == 1); - GPR_ASSERT(pfds[0].revents == 0); - GPR_ASSERT(pfds[1].revents == 0); - GPR_ASSERT(pfds[2].revents == POLLIN); - GPR_ASSERT(pfds[3].revents == 0); - GPR_ASSERT(pfds[4].revents == 0); - GPR_ASSERT(pfds[5].revents == 0); + // Pollin on socket fd + pargs.timeout = -1; + pargs.result = -2; + grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs); + thd.Start(); + trigger_socket_event(); + thd.Join(); + GPR_ASSERT(pargs.result == 1); + GPR_ASSERT(pfds[0].revents == 0); + GPR_ASSERT(pfds[1].revents == 0); + GPR_ASSERT(pfds[2].revents == POLLIN); + GPR_ASSERT(pfds[3].revents == 0); + GPR_ASSERT(pfds[4].revents == 0); + GPR_ASSERT(pfds[5].revents == 0); } { - // Pollin on wakeup fd - reset_socket_event(); - pargs.result = -2; - grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs); - thd.Start(); - GPR_ASSERT(grpc_wakeup_fd_wakeup(&cvfd2) == GRPC_ERROR_NONE); - thd.Join(); - - GPR_ASSERT(pargs.result == 1); - GPR_ASSERT(pfds[0].revents == 0); - GPR_ASSERT(pfds[1].revents == POLLIN); - GPR_ASSERT(pfds[2].revents == 0); - GPR_ASSERT(pfds[3].revents == 0); - GPR_ASSERT(pfds[4].revents == 0); - GPR_ASSERT(pfds[5].revents == 0); + // Pollin on wakeup fd + reset_socket_event(); + pargs.result = -2; + grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs); + thd.Start(); + GPR_ASSERT(grpc_wakeup_fd_wakeup(&cvfd2) == GRPC_ERROR_NONE); + thd.Join(); + + GPR_ASSERT(pargs.result == 1); + GPR_ASSERT(pfds[0].revents == 0); + GPR_ASSERT(pfds[1].revents == POLLIN); + GPR_ASSERT(pfds[2].revents == 0); + GPR_ASSERT(pfds[3].revents == 0); + GPR_ASSERT(pfds[4].revents == 0); + GPR_ASSERT(pfds[5].revents == 0); } { - // Pollin on wakeupfd before poll() - pargs.result = -2; - grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs); - thd.Start(); - thd.Join(); - - GPR_ASSERT(pargs.result == 1); - GPR_ASSERT(pfds[0].revents == 0); - GPR_ASSERT(pfds[1].revents == POLLIN); - GPR_ASSERT(pfds[2].revents == 0); - GPR_ASSERT(pfds[3].revents == 0); - GPR_ASSERT(pfds[4].revents == 0); - GPR_ASSERT(pfds[5].revents == 0); + // Pollin on wakeupfd before poll() + pargs.result = -2; + grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs); + thd.Start(); + thd.Join(); + + GPR_ASSERT(pargs.result == 1); + GPR_ASSERT(pfds[0].revents == 0); + GPR_ASSERT(pfds[1].revents == POLLIN); + GPR_ASSERT(pfds[2].revents == 0); + GPR_ASSERT(pfds[3].revents == 0); + GPR_ASSERT(pfds[4].revents == 0); + GPR_ASSERT(pfds[5].revents == 0); } { - // No Events - pargs.result = -2; - pargs.timeout = 1000; - reset_socket_event(); - GPR_ASSERT(grpc_wakeup_fd_consume_wakeup(&cvfd1) == GRPC_ERROR_NONE); - GPR_ASSERT(grpc_wakeup_fd_consume_wakeup(&cvfd2) == GRPC_ERROR_NONE); - grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs); - thd.Start(); - thd.Join(); - - GPR_ASSERT(pargs.result == 0); - GPR_ASSERT(pfds[0].revents == 0); - GPR_ASSERT(pfds[1].revents == 0); - GPR_ASSERT(pfds[2].revents == 0); - GPR_ASSERT(pfds[3].revents == 0); - GPR_ASSERT(pfds[4].revents == 0); - GPR_ASSERT(pfds[5].revents == 0); + // No Events + pargs.result = -2; + pargs.timeout = 1000; + reset_socket_event(); + GPR_ASSERT(grpc_wakeup_fd_consume_wakeup(&cvfd1) == GRPC_ERROR_NONE); + GPR_ASSERT(grpc_wakeup_fd_consume_wakeup(&cvfd2) == GRPC_ERROR_NONE); + grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs); + thd.Start(); + thd.Join(); + + GPR_ASSERT(pargs.result == 0); + GPR_ASSERT(pfds[0].revents == 0); + GPR_ASSERT(pfds[1].revents == 0); + GPR_ASSERT(pfds[2].revents == 0); + GPR_ASSERT(pfds[3].revents == 0); + GPR_ASSERT(pfds[4].revents == 0); + GPR_ASSERT(pfds[5].revents == 0); } } diff --git a/test/core/network_benchmarks/low_level_ping_pong.cc b/test/core/network_benchmarks/low_level_ping_pong.cc index afcd2da57a..df2d9b33aa 100644 --- a/test/core/network_benchmarks/low_level_ping_pong.cc +++ b/test/core/network_benchmarks/low_level_ping_pong.cc @@ -38,8 +38,8 @@ #include <grpc/support/log.h> #include <grpc/support/time.h> -#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/socket_utils_posix.h" #include "test/core/util/cmdline.h" diff --git a/test/core/surface/completion_queue_threading_test.cc b/test/core/surface/completion_queue_threading_test.cc index 391cbf39fa..1a76d7e6ae 100644 --- a/test/core/surface/completion_queue_threading_test.cc +++ b/test/core/surface/completion_queue_threading_test.cc @@ -24,8 +24,8 @@ #include <grpc/support/log.h> #include <grpc/support/time.h> -#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/iomgr.h" #include "test/core/util/test_config.h" @@ -96,8 +96,8 @@ static void test_too_many_plucks(void) { } thread_states[i].cc = cc; thread_states[i].tag = tags[i]; - new (&threads[i]) grpc_core::Thread("grpc_pluck_test", pluck_one, - thread_states + i); + new (&threads[i]) + grpc_core::Thread("grpc_pluck_test", pluck_one, thread_states + i); threads[i].Start(); } @@ -221,8 +221,7 @@ static void test_threading(size_t producers, size_t consumers) { "test_threading", producers, consumers); /* start all threads: they will wait for phase1 */ - grpc_core::Thread* threads = - reinterpret_cast<grpc_core::Thread*>( + grpc_core::Thread* threads = reinterpret_cast<grpc_core::Thread*>( gpr_malloc(sizeof(*threads) * (producers + consumers))); for (i = 0; i < producers + consumers; i++) { gpr_event_init(&options[i].on_started); @@ -236,9 +235,8 @@ static void test_threading(size_t producers, size_t consumers) { bool ok; new (&threads[i]) grpc_core::Thread( - i < producers ? "grpc_producer" : "grpc_consumer", - i < producers ? producer_thread : consumer_thread, - options + i, &ok); + i < producers ? "grpc_producer" : "grpc_consumer", + i < producers ? producer_thread : consumer_thread, options + i, &ok); GPR_ASSERT(ok); threads[i].Start(); gpr_event_wait(&options[i].on_started, ten_seconds_time()); diff --git a/test/core/surface/concurrent_connectivity_test.cc b/test/core/surface/concurrent_connectivity_test.cc index 9f27bacce6..32b4ae1da8 100644 --- a/test/core/surface/concurrent_connectivity_test.cc +++ b/test/core/surface/concurrent_connectivity_test.cc @@ -22,9 +22,9 @@ headers. Therefore, sockaddr.h must always be included first */ #include "src/core/lib/iomgr/sockaddr.h" -#include <new> #include <memory.h> #include <stdio.h> +#include <new> #include <grpc/grpc.h> #include <grpc/support/alloc.h> @@ -175,75 +175,78 @@ int run_concurrent_connectivity_test() { /* First round, no server */ { - gpr_log(GPR_DEBUG, "Wave 1"); - char* localhost = gpr_strdup("localhost:54321"); - grpc_core::Thread threads[NUM_THREADS]; - for (auto& th : threads) { - new (&th) grpc_core::Thread("grpc_wave_1", create_loop_destroy, localhost); - th.Start(); - } - for (auto& th : threads) { - th.Join(); - } - gpr_free(localhost); + gpr_log(GPR_DEBUG, "Wave 1"); + char* localhost = gpr_strdup("localhost:54321"); + grpc_core::Thread threads[NUM_THREADS]; + for (auto& th : threads) { + new (&th) + grpc_core::Thread("grpc_wave_1", create_loop_destroy, localhost); + th.Start(); + } + for (auto& th : threads) { + th.Join(); + } + gpr_free(localhost); } { - /* Second round, actual grpc server */ - gpr_log(GPR_DEBUG, "Wave 2"); - int port = grpc_pick_unused_port_or_die(); - gpr_asprintf(&args.addr, "localhost:%d", port); - args.server = grpc_server_create(nullptr, nullptr); - grpc_server_add_insecure_http2_port(args.server, args.addr); - args.cq = grpc_completion_queue_create_for_next(nullptr); - grpc_server_register_completion_queue(args.server, args.cq, nullptr); - grpc_server_start(args.server); - grpc_core::Thread server2("grpc_wave_2_server", server_thread, &args); - server2.Start(); - - grpc_core::Thread threads[NUM_THREADS]; - for (auto& th : threads) { - new (&th) grpc_core::Thread("grpc_wave_2", create_loop_destroy, args.addr); - th.Start(); - } - for (auto& th : threads) { - th.Join(); - } - grpc_server_shutdown_and_notify(args.server, args.cq, tag(0xd1e)); + /* Second round, actual grpc server */ + gpr_log(GPR_DEBUG, "Wave 2"); + int port = grpc_pick_unused_port_or_die(); + gpr_asprintf(&args.addr, "localhost:%d", port); + args.server = grpc_server_create(nullptr, nullptr); + grpc_server_add_insecure_http2_port(args.server, args.addr); + args.cq = grpc_completion_queue_create_for_next(nullptr); + grpc_server_register_completion_queue(args.server, args.cq, nullptr); + grpc_server_start(args.server); + grpc_core::Thread server2("grpc_wave_2_server", server_thread, &args); + server2.Start(); + + grpc_core::Thread threads[NUM_THREADS]; + for (auto& th : threads) { + new (&th) + grpc_core::Thread("grpc_wave_2", create_loop_destroy, args.addr); + th.Start(); + } + for (auto& th : threads) { + th.Join(); + } + grpc_server_shutdown_and_notify(args.server, args.cq, tag(0xd1e)); - server2.Join(); - grpc_server_destroy(args.server); - grpc_completion_queue_destroy(args.cq); - gpr_free(args.addr); + server2.Join(); + grpc_server_destroy(args.server); + grpc_completion_queue_destroy(args.cq); + gpr_free(args.addr); } { - /* Third round, bogus tcp server */ - gpr_log(GPR_DEBUG, "Wave 3"); - args.pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size())); - grpc_pollset_init(args.pollset, &args.mu); - gpr_event_init(&args.ready); - grpc_core::Thread server3("grpc_wave_3_server", bad_server_thread, &args); - server3.Start(); - gpr_event_wait(&args.ready, gpr_inf_future(GPR_CLOCK_MONOTONIC)); - - grpc_core::Thread threads[NUM_THREADS]; - for (auto& th : threads) { - new (&th) grpc_core::Thread("grpc_wave_3", create_loop_destroy, args.addr); - th.Start(); - } - for (auto& th : threads) { - th.Join(); - } + /* Third round, bogus tcp server */ + gpr_log(GPR_DEBUG, "Wave 3"); + args.pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size())); + grpc_pollset_init(args.pollset, &args.mu); + gpr_event_init(&args.ready); + grpc_core::Thread server3("grpc_wave_3_server", bad_server_thread, &args); + server3.Start(); + gpr_event_wait(&args.ready, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + + grpc_core::Thread threads[NUM_THREADS]; + for (auto& th : threads) { + new (&th) + grpc_core::Thread("grpc_wave_3", create_loop_destroy, args.addr); + th.Start(); + } + for (auto& th : threads) { + th.Join(); + } - gpr_atm_rel_store(&args.stop, 1); - server3.Join(); - { - grpc_core::ExecCtx exec_ctx; - grpc_pollset_shutdown( - args.pollset, GRPC_CLOSURE_CREATE(done_pollset_shutdown, args.pollset, - grpc_schedule_on_exec_ctx)); - } + gpr_atm_rel_store(&args.stop, 1); + server3.Join(); + { + grpc_core::ExecCtx exec_ctx; + grpc_pollset_shutdown( + args.pollset, GRPC_CLOSURE_CREATE(done_pollset_shutdown, args.pollset, + grpc_schedule_on_exec_ctx)); + } } grpc_shutdown(); @@ -289,7 +292,7 @@ int run_concurrent_watches_with_short_timeouts_test() { for (auto& th : threads) { new (&th) grpc_core::Thread("grpc_short_watches", - watches_with_short_timeouts, localhost); + watches_with_short_timeouts, localhost); th.Start(); } for (auto& th : threads) { diff --git a/test/cpp/client/client_channel_stress_test.cc b/test/cpp/client/client_channel_stress_test.cc index 09aa03de08..ee6958dfcf 100644 --- a/test/cpp/client/client_channel_stress_test.cc +++ b/test/cpp/client/client_channel_stress_test.cc @@ -35,8 +35,8 @@ #include <grpc/support/time.h> #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" -#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/sockaddr.h" #include "test/core/util/port.h" |