diff options
-rw-r--r-- | src/core/lib/iomgr/combiner.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/executor.c | 81 | ||||
-rw-r--r-- | src/core/lib/iomgr/executor.h | 9 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr.c | 9 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr.h | 4 | ||||
-rw-r--r-- | src/core/lib/surface/init.c | 6 | ||||
-rw-r--r-- | test/core/end2end/fuzzers/api_fuzzer.c | 6 | ||||
-rw-r--r-- | test/core/end2end/fuzzers/client_fuzzer.c | 2 | ||||
-rw-r--r-- | test/core/end2end/fuzzers/server_fuzzer.c | 2 | ||||
-rw-r--r-- | test/core/iomgr/ev_epollsig_linux_test.c | 12 | ||||
-rw-r--r-- | test/core/iomgr/fd_conservation_posix_test.c | 15 | ||||
-rw-r--r-- | test/core/iomgr/fd_posix_test.c | 4 | ||||
-rw-r--r-- | test/core/iomgr/pollset_set_test.c | 4 | ||||
-rw-r--r-- | test/core/iomgr/resolve_address_posix_test.c | 15 | ||||
-rw-r--r-- | test/core/iomgr/resolve_address_test.c | 15 |
15 files changed, 105 insertions, 81 deletions
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index aa7a8c1c70..38eace12c7 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -214,7 +214,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { lock, grpc_exec_ctx_ready_to_finish(exec_ctx), lock->time_to_execute_final_list)); - if (grpc_exec_ctx_ready_to_finish(exec_ctx)) { + if (grpc_exec_ctx_ready_to_finish(exec_ctx) && grpc_executor_is_threaded()) { GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); // this execution context wants to move on, and we have a workqueue (and // so can help the execution context out): schedule remaining work to be diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index 2a2544dc1f..513248ca57 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -66,22 +66,6 @@ GPR_TLS_DECL(g_this_thread_state); static void executor_thread(void *arg); -void grpc_executor_init() { - g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores()); - gpr_atm_no_barrier_store(&g_cur_threads, 1); - gpr_tls_init(&g_this_thread_state); - g_thread_state = gpr_zalloc(sizeof(thread_state) * g_max_threads); - for (size_t i = 0; i < g_max_threads; i++) { - gpr_mu_init(&g_thread_state[i].mu); - gpr_cv_init(&g_thread_state[i].cv); - g_thread_state[i].elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; - } - - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - gpr_thd_new(&g_thread_state[0].id, executor_thread, &g_thread_state[0], &opt); -} - static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { size_t n = 0; @@ -100,24 +84,57 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { return n; } -void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) { - for (size_t i = 0; i < g_max_threads; i++) { - gpr_mu_lock(&g_thread_state[i].mu); - g_thread_state[i].shutdown = true; - gpr_cv_signal(&g_thread_state[i].cv); - gpr_mu_unlock(&g_thread_state[i].mu); - } - for (gpr_atm i = 0; i < g_cur_threads; i++) { - gpr_thd_join(g_thread_state[i].id); +bool grpc_executor_is_threaded() { + return gpr_atm_no_barrier_load(&g_cur_threads) > 0; +} + +void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) { + gpr_atm cur_threads = gpr_atm_no_barrier_load(&g_cur_threads); + if (threading) { + if (cur_threads > 0) return; + g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores()); + gpr_atm_no_barrier_store(&g_cur_threads, 1); + gpr_tls_init(&g_this_thread_state); + g_thread_state = gpr_zalloc(sizeof(thread_state) * g_max_threads); + for (size_t i = 0; i < g_max_threads; i++) { + gpr_mu_init(&g_thread_state[i].mu); + gpr_cv_init(&g_thread_state[i].cv); + g_thread_state[i].elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; + } + + gpr_thd_options opt = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&opt); + gpr_thd_new(&g_thread_state[0].id, executor_thread, &g_thread_state[0], + &opt); + } else { + if (cur_threads == 0) return; + for (size_t i = 0; i < g_max_threads; i++) { + gpr_mu_lock(&g_thread_state[i].mu); + g_thread_state[i].shutdown = true; + gpr_cv_signal(&g_thread_state[i].cv); + gpr_mu_unlock(&g_thread_state[i].mu); + } + for (gpr_atm i = 0; i < g_cur_threads; i++) { + gpr_thd_join(g_thread_state[i].id); + } + gpr_atm_no_barrier_store(&g_cur_threads, 0); + for (size_t i = 0; i < g_max_threads; i++) { + gpr_mu_destroy(&g_thread_state[i].mu); + gpr_cv_destroy(&g_thread_state[i].cv); + run_closures(exec_ctx, g_thread_state[i].elems); + } + gpr_free(g_thread_state); + gpr_tls_destroy(&g_this_thread_state); } +} + +void grpc_executor_init(grpc_exec_ctx *exec_ctx) { gpr_atm_no_barrier_store(&g_cur_threads, 0); - for (size_t i = 0; i < g_max_threads; i++) { - gpr_mu_destroy(&g_thread_state[i].mu); - gpr_cv_destroy(&g_thread_state[i].cv); - run_closures(exec_ctx, g_thread_state[i].elems); - } - gpr_free(g_thread_state); - gpr_tls_destroy(&g_this_thread_state); + grpc_executor_set_threading(exec_ctx, true); +} + +void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) { + grpc_executor_set_threading(exec_ctx, false); } static void executor_thread(void *arg) { diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index 1213016383..792d5056cb 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -41,11 +41,18 @@ * This mechanism is meant to outsource work (grpc_closure instances) to a * thread, for those cases where blocking isn't an option but there isn't a * non-blocking solution available. */ -void grpc_executor_init(); +void grpc_executor_init(grpc_exec_ctx *exec_ctx); extern grpc_closure_scheduler *grpc_executor_scheduler; /** Shutdown the executor, running all pending work as part of the call */ void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx); +/** Is the executor multi-threaded? */ +bool grpc_executor_is_threaded(); + +/* enable/disable threading - must be called after grpc_executor_init and before + grpc_executor_shutdown */ +void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool enable); + #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */ diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index 9e0e4dbfe0..0623acc597 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -57,12 +57,12 @@ static gpr_cv g_rcv; static int g_shutdown; static grpc_iomgr_object g_root_object; -void grpc_iomgr_init(void) { +void grpc_iomgr_init(grpc_exec_ctx *exec_ctx) { g_shutdown = 0; gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); grpc_exec_ctx_global_init(); - grpc_executor_init(); + grpc_executor_init(exec_ctx); grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; @@ -70,7 +70,7 @@ void grpc_iomgr_init(void) { grpc_iomgr_platform_init(); } -void grpc_iomgr_start(void) { grpc_timer_manager_init(); } +void grpc_iomgr_start(grpc_exec_ctx *exec_ctx) { grpc_timer_manager_init(); } static size_t count_objects(void) { grpc_iomgr_object *obj; @@ -95,6 +95,7 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) { grpc_timer_manager_shutdown(); grpc_iomgr_platform_flush(); + grpc_executor_shutdown(exec_ctx); gpr_mu_lock(&g_mu); g_shutdown = 1; @@ -145,8 +146,6 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) { grpc_timer_list_shutdown(exec_ctx); grpc_exec_ctx_flush(exec_ctx); - grpc_executor_shutdown(exec_ctx); - grpc_exec_ctx_flush(exec_ctx); /* ensure all threads have left g_mu */ gpr_mu_lock(&g_mu); diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h index 6e2e023615..bd6ca4a0b8 100644 --- a/src/core/lib/iomgr/iomgr.h +++ b/src/core/lib/iomgr/iomgr.h @@ -38,10 +38,10 @@ #include "src/core/lib/iomgr/port.h" /** Initializes the iomgr. */ -void grpc_iomgr_init(void); +void grpc_iomgr_init(grpc_exec_ctx *exec_ctx); /** Starts any background threads for iomgr. */ -void grpc_iomgr_start(void); +void grpc_iomgr_start(grpc_exec_ctx *exec_ctx); /** Signals the intention to shutdown the iomgr. Expects to be able to flush * exec_ctx. */ diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 452e6c444b..86ce4bde61 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -128,6 +128,7 @@ void grpc_init(void) { int i; gpr_once_init(&g_basic_init, do_basic_init); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_mu_lock(&g_init_mu); if (++g_initializations == 1) { gpr_time_init(); @@ -154,7 +155,7 @@ void grpc_init(void) { grpc_register_tracer("pending_tags", &grpc_trace_pending_tags); #endif grpc_security_pre_init(); - grpc_iomgr_init(); + grpc_iomgr_init(&exec_ctx); gpr_timers_global_init(); grpc_handshaker_factory_registry_init(); grpc_security_init(); @@ -170,9 +171,10 @@ void grpc_init(void) { grpc_tracer_init("GRPC_TRACE"); /* no more changes to channel init pipelines */ grpc_channel_init_finalize(); - grpc_iomgr_start(); + grpc_iomgr_start(&exec_ctx); } gpr_mu_unlock(&g_init_mu); + grpc_exec_ctx_finish(&exec_ctx); GRPC_API_TRACE("grpc_init(void)", 0, ()); } diff --git a/test/core/end2end/fuzzers/api_fuzzer.c b/test/core/end2end/fuzzers/api_fuzzer.c index b33b43dac5..d8d34fba9c 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.c +++ b/test/core/end2end/fuzzers/api_fuzzer.c @@ -41,6 +41,7 @@ #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/iomgr/timer.h" @@ -724,6 +725,11 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { gpr_now_impl = now_impl; grpc_init(); grpc_timer_manager_set_threading(false); + { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_executor_set_threading(&exec_ctx, false); + grpc_exec_ctx_finish(&exec_ctx); + } grpc_resolve_address = my_resolve_address; GPR_ASSERT(g_channel == NULL); diff --git a/test/core/end2end/fuzzers/client_fuzzer.c b/test/core/end2end/fuzzers/client_fuzzer.c index 6f49baffd2..2307a3c771 100644 --- a/test/core/end2end/fuzzers/client_fuzzer.c +++ b/test/core/end2end/fuzzers/client_fuzzer.c @@ -37,6 +37,7 @@ #include <grpc/support/alloc.h> #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/channel.h" #include "test/core/util/memory_counters.h" @@ -58,6 +59,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { if (leak_check) grpc_memory_counters_init(); grpc_init(); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_executor_set_threading(&exec_ctx, false); grpc_resource_quota *resource_quota = grpc_resource_quota_create("client_fuzzer"); diff --git a/test/core/end2end/fuzzers/server_fuzzer.c b/test/core/end2end/fuzzers/server_fuzzer.c index 6d65fe1847..e6f6be2325 100644 --- a/test/core/end2end/fuzzers/server_fuzzer.c +++ b/test/core/end2end/fuzzers/server_fuzzer.c @@ -34,6 +34,7 @@ #include <grpc/grpc.h> #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/server.h" #include "test/core/util/memory_counters.h" @@ -56,6 +57,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { if (leak_check) grpc_memory_counters_init(); grpc_init(); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_executor_set_threading(&exec_ctx, false); grpc_resource_quota *resource_quota = grpc_resource_quota_create("server_fuzzer"); diff --git a/test/core/iomgr/ev_epollsig_linux_test.c b/test/core/iomgr/ev_epollsig_linux_test.c index a20c4f2b98..952e774670 100644 --- a/test/core/iomgr/ev_epollsig_linux_test.c +++ b/test/core/iomgr/ev_epollsig_linux_test.c @@ -321,8 +321,9 @@ static void test_threading(void) { int main(int argc, char **argv) { const char *poll_strategy = NULL; grpc_test_init(argc, argv); - grpc_iomgr_init(); - grpc_iomgr_start(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_iomgr_init(&exec_ctx); + grpc_iomgr_start(&exec_ctx); poll_strategy = grpc_get_poll_strategy_name(); if (poll_strategy != NULL && strcmp(poll_strategy, "epollsig") == 0) { @@ -335,11 +336,8 @@ int main(int argc, char **argv) { poll_strategy); } - { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_iomgr_shutdown(&exec_ctx); - grpc_exec_ctx_finish(&exec_ctx); - } + grpc_iomgr_shutdown(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); return 0; } #else /* defined(GRPC_LINUX_EPOLL) */ diff --git a/test/core/iomgr/fd_conservation_posix_test.c b/test/core/iomgr/fd_conservation_posix_test.c index f662070655..18d8cf4ec8 100644 --- a/test/core/iomgr/fd_conservation_posix_test.c +++ b/test/core/iomgr/fd_conservation_posix_test.c @@ -45,8 +45,9 @@ int main(int argc, char **argv) { grpc_endpoint_pair p; grpc_test_init(argc, argv); - grpc_iomgr_init(); - grpc_iomgr_start(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_iomgr_init(&exec_ctx); + grpc_iomgr_start(&exec_ctx); /* set max # of file descriptors to a low value, and verify we can create and destroy many more than this number @@ -57,19 +58,15 @@ int main(int argc, char **argv) { grpc_resource_quota_create("fd_conservation_posix_test"); for (i = 0; i < 100; i++) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; p = grpc_iomgr_create_endpoint_pair("test", NULL); grpc_endpoint_destroy(&exec_ctx, p.client); grpc_endpoint_destroy(&exec_ctx, p.server); - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); } grpc_resource_quota_unref(resource_quota); - { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_iomgr_shutdown(&exec_ctx); - grpc_exec_ctx_finish(&exec_ctx); - } + grpc_iomgr_shutdown(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); return 0; } diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index 9e8fe8bffa..d0f31e087d 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -542,8 +542,8 @@ int main(int argc, char **argv) { grpc_closure destroyed; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); - grpc_iomgr_init(); - grpc_iomgr_start(); + grpc_iomgr_init(&exec_ctx); + grpc_iomgr_start(&exec_ctx); g_pollset = gpr_zalloc(grpc_pollset_size()); grpc_pollset_init(g_pollset, &g_mu); test_grpc_fd(); diff --git a/test/core/iomgr/pollset_set_test.c b/test/core/iomgr/pollset_set_test.c index 092711381d..587130704d 100644 --- a/test/core/iomgr/pollset_set_test.c +++ b/test/core/iomgr/pollset_set_test.c @@ -447,8 +447,8 @@ int main(int argc, char **argv) { const char *poll_strategy = grpc_get_poll_strategy_name(); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); - grpc_iomgr_init(); - grpc_iomgr_start(); + grpc_iomgr_init(&exec_ctx); + grpc_iomgr_start(&exec_ctx); if (poll_strategy != NULL && (strcmp(poll_strategy, "epoll") == 0 || diff --git a/test/core/iomgr/resolve_address_posix_test.c b/test/core/iomgr/resolve_address_posix_test.c index bee7036ec8..f07bd045b6 100644 --- a/test/core/iomgr/resolve_address_posix_test.c +++ b/test/core/iomgr/resolve_address_posix_test.c @@ -174,16 +174,13 @@ static void test_unix_socket_path_name_too_long(void) { int main(int argc, char **argv) { grpc_test_init(argc, argv); - grpc_executor_init(); - grpc_iomgr_init(); - grpc_iomgr_start(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_iomgr_init(&exec_ctx); + grpc_iomgr_start(&exec_ctx); test_unix_socket(); test_unix_socket_path_name_too_long(); - { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_executor_shutdown(&exec_ctx); - grpc_iomgr_shutdown(&exec_ctx); - grpc_exec_ctx_finish(&exec_ctx); - } + grpc_executor_shutdown(&exec_ctx); + grpc_iomgr_shutdown(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); return 0; } diff --git a/test/core/iomgr/resolve_address_test.c b/test/core/iomgr/resolve_address_test.c index 83f73070dc..2c3240aaac 100644 --- a/test/core/iomgr/resolve_address_test.c +++ b/test/core/iomgr/resolve_address_test.c @@ -263,9 +263,9 @@ static void test_unparseable_hostports(void) { int main(int argc, char **argv) { grpc_test_init(argc, argv); - grpc_executor_init(); - grpc_iomgr_init(); - grpc_iomgr_start(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_iomgr_init(&exec_ctx); + grpc_iomgr_start(&exec_ctx); test_localhost(); test_default_port(); test_non_numeric_default_port(); @@ -274,11 +274,8 @@ int main(int argc, char **argv) { test_ipv6_without_port(); test_invalid_ip_addresses(); test_unparseable_hostports(); - { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_executor_shutdown(&exec_ctx); - grpc_iomgr_shutdown(&exec_ctx); - grpc_exec_ctx_finish(&exec_ctx); - } + grpc_executor_shutdown(&exec_ctx); + grpc_iomgr_shutdown(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); return 0; } |