From 11eff929e24cae59ed21c2f3a0bde22a6dbe0d91 Mon Sep 17 00:00:00 2001 From: Guantao Liu Date: Mon, 7 Jan 2019 18:22:50 -0800 Subject: Avoid the thread jump in server callback APIs. Add a utility function in iomgr to check whether the caller thread is a worker for any background poller, and keep grpc combiner from offloading closures to the default executor if the current thread is a worker for any background poller. --- src/core/lib/iomgr/combiner.cc | 9 ++++++++- src/core/lib/iomgr/ev_epoll1_linux.cc | 3 +++ src/core/lib/iomgr/ev_epollex_linux.cc | 3 +++ src/core/lib/iomgr/ev_poll_posix.cc | 3 +++ src/core/lib/iomgr/ev_posix.cc | 4 ++++ src/core/lib/iomgr/ev_posix.h | 4 ++++ src/core/lib/iomgr/iomgr.cc | 4 ++++ src/core/lib/iomgr/iomgr.h | 3 +++ src/core/lib/iomgr/iomgr_custom.cc | 6 +++++- src/core/lib/iomgr/iomgr_internal.cc | 4 ++++ src/core/lib/iomgr/iomgr_internal.h | 4 ++++ src/core/lib/iomgr/iomgr_posix.cc | 7 ++++++- src/core/lib/iomgr/iomgr_posix_cfstream.cc | 7 ++++++- src/core/lib/iomgr/iomgr_windows.cc | 7 ++++++- test/cpp/microbenchmarks/bm_cq_multiple_threads.cc | 1 + 15 files changed, 64 insertions(+), 5 deletions(-) diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 7c0062eb4e..402f8904ea 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -29,6 +29,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/profiling/timers.h" grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner"); @@ -228,8 +229,14 @@ bool grpc_combiner_continue_exec_ctx() { grpc_core::ExecCtx::Get()->IsReadyToFinish(), lock->time_to_execute_final_list)); + // offload only if all the following conditions are true: + // 1. the combiner is contended and has more than one closure to execute + // 2. the current execution context needs to finish as soon as possible + // 3. the DEFAULT executor is threaded + // 4. the current thread is not a worker for any background poller if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() && - grpc_executor_is_threaded()) { + grpc_executor_is_threaded() && + !grpc_iomgr_is_any_background_poller_thread()) { GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); // this execution context wants to move on: schedule remaining work to be // picked up on the executor diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 4b8c891e9b..9eb4c089d8 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -1242,6 +1242,8 @@ static void pollset_set_del_pollset_set(grpc_pollset_set* bag, * Event engine binding */ +static bool is_any_background_poller_thread(void) { return false; } + static void shutdown_background_closure(void) {} static void shutdown_engine(void) { @@ -1287,6 +1289,7 @@ static const grpc_event_engine_vtable vtable = { pollset_set_add_fd, pollset_set_del_fd, + is_any_background_poller_thread, shutdown_background_closure, shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 7a4870db78..0a0891013a 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -1604,6 +1604,8 @@ static void pollset_set_del_pollset_set(grpc_pollset_set* bag, * Event engine binding */ +static bool is_any_background_poller_thread(void) { return false; } + static void shutdown_background_closure(void) {} static void shutdown_engine(void) { @@ -1644,6 +1646,7 @@ static const grpc_event_engine_vtable vtable = { pollset_set_add_fd, pollset_set_del_fd, + is_any_background_poller_thread, shutdown_background_closure, shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 67cbfbbd02..c479206410 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -1782,6 +1782,8 @@ static void global_cv_fd_table_shutdown() { * event engine binding */ +static bool is_any_background_poller_thread(void) { return false; } + static void shutdown_background_closure(void) {} static void shutdown_engine(void) { @@ -1828,6 +1830,7 @@ static const grpc_event_engine_vtable vtable = { pollset_set_add_fd, pollset_set_del_fd, + is_any_background_poller_thread, shutdown_background_closure, shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 32d1b6c43e..fb2e70eee4 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -399,6 +399,10 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) { g_event_engine->pollset_set_del_fd(pollset_set, fd); } +bool grpc_is_any_background_poller_thread(void) { + return g_event_engine->is_any_background_poller_thread(); +} + void grpc_shutdown_background_closure(void) { g_event_engine->shutdown_background_closure(); } diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 812c7a0f0f..94ac9fdba6 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -80,6 +80,7 @@ typedef struct grpc_event_engine_vtable { void (*pollset_set_add_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd); void (*pollset_set_del_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd); + bool (*is_any_background_poller_thread)(void); void (*shutdown_background_closure)(void); void (*shutdown_engine)(void); } grpc_event_engine_vtable; @@ -181,6 +182,9 @@ void grpc_pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd); void grpc_pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd); void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd); +/* Returns true if the caller is a worker thread for any background poller. */ +bool grpc_is_any_background_poller_thread(); + /* Shut down all the closures registered in the background poller. */ void grpc_shutdown_background_closure(); diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index eb29973514..a492146857 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -161,6 +161,10 @@ void grpc_iomgr_shutdown_background_closure() { grpc_iomgr_platform_shutdown_background_closure(); } +bool grpc_iomgr_is_any_background_poller_thread() { + return grpc_iomgr_platform_is_any_background_poller_thread(); +} + void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name) { obj->name = gpr_strdup(name); gpr_mu_lock(&g_mu); diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h index 8ea9289e06..6261aa550c 100644 --- a/src/core/lib/iomgr/iomgr.h +++ b/src/core/lib/iomgr/iomgr.h @@ -39,6 +39,9 @@ void grpc_iomgr_shutdown(); * background poller. */ void grpc_iomgr_shutdown_background_closure(); +/** Returns true if the caller is a worker thread for any background poller. */ +bool grpc_iomgr_is_any_background_poller_thread(); + /* Exposed only for testing */ size_t grpc_iomgr_count_objects_for_testing(); diff --git a/src/core/lib/iomgr/iomgr_custom.cc b/src/core/lib/iomgr/iomgr_custom.cc index 4b112c9097..e1cd8f7310 100644 --- a/src/core/lib/iomgr/iomgr_custom.cc +++ b/src/core/lib/iomgr/iomgr_custom.cc @@ -41,10 +41,14 @@ static void iomgr_platform_init(void) { static void iomgr_platform_flush(void) {} static void iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); } static void iomgr_platform_shutdown_background_closure(void) {} +static bool iomgr_platform_is_any_background_poller_thread(void) { + return false; +} static grpc_iomgr_platform_vtable vtable = { iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, - iomgr_platform_shutdown_background_closure}; + iomgr_platform_shutdown_background_closure, + iomgr_platform_is_any_background_poller_thread}; void grpc_custom_iomgr_init(grpc_socket_vtable* socket, grpc_custom_resolver_vtable* resolver, diff --git a/src/core/lib/iomgr/iomgr_internal.cc b/src/core/lib/iomgr/iomgr_internal.cc index b6c9211865..e68b1cf581 100644 --- a/src/core/lib/iomgr/iomgr_internal.cc +++ b/src/core/lib/iomgr/iomgr_internal.cc @@ -45,3 +45,7 @@ void grpc_iomgr_platform_shutdown() { iomgr_platform_vtable->shutdown(); } void grpc_iomgr_platform_shutdown_background_closure() { iomgr_platform_vtable->shutdown_background_closure(); } + +bool grpc_iomgr_platform_is_any_background_poller_thread() { + return iomgr_platform_vtable->is_any_background_poller_thread(); +} diff --git a/src/core/lib/iomgr/iomgr_internal.h b/src/core/lib/iomgr/iomgr_internal.h index bca7409907..2250ad9a18 100644 --- a/src/core/lib/iomgr/iomgr_internal.h +++ b/src/core/lib/iomgr/iomgr_internal.h @@ -36,6 +36,7 @@ typedef struct grpc_iomgr_platform_vtable { void (*flush)(void); void (*shutdown)(void); void (*shutdown_background_closure)(void); + bool (*is_any_background_poller_thread)(void); } grpc_iomgr_platform_vtable; void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name); @@ -56,6 +57,9 @@ void grpc_iomgr_platform_shutdown(void); /** shut down all the closures registered in the background poller */ void grpc_iomgr_platform_shutdown_background_closure(void); +/** return true is the caller is a worker thread for any background poller */ +bool grpc_iomgr_platform_is_any_background_poller_thread(void); + bool grpc_iomgr_abort_on_leaks(void); #endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */ diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc index 9386adf060..278c8de688 100644 --- a/src/core/lib/iomgr/iomgr_posix.cc +++ b/src/core/lib/iomgr/iomgr_posix.cc @@ -55,9 +55,14 @@ static void iomgr_platform_shutdown_background_closure(void) { grpc_shutdown_background_closure(); } +static bool iomgr_platform_is_any_background_poller_thread(void) { + return grpc_is_any_background_poller_thread(); +} + static grpc_iomgr_platform_vtable vtable = { iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, - iomgr_platform_shutdown_background_closure}; + iomgr_platform_shutdown_background_closure, + iomgr_platform_is_any_background_poller_thread}; void grpc_set_default_iomgr_platform() { grpc_set_tcp_client_impl(&grpc_posix_tcp_client_vtable); diff --git a/src/core/lib/iomgr/iomgr_posix_cfstream.cc b/src/core/lib/iomgr/iomgr_posix_cfstream.cc index 552ef4309c..462ac41fcd 100644 --- a/src/core/lib/iomgr/iomgr_posix_cfstream.cc +++ b/src/core/lib/iomgr/iomgr_posix_cfstream.cc @@ -58,9 +58,14 @@ static void iomgr_platform_shutdown_background_closure(void) { grpc_shutdown_background_closure(); } +static bool iomgr_platform_is_any_background_poller_thread(void) { + return grpc_is_any_background_poller_thread(); +} + static grpc_iomgr_platform_vtable vtable = { iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, - iomgr_platform_shutdown_background_closure}; + iomgr_platform_shutdown_background_closure, + iomgr_platform_is_any_background_poller_thread}; void grpc_set_default_iomgr_platform() { char* enable_cfstream = getenv(grpc_cfstream_env_var); diff --git a/src/core/lib/iomgr/iomgr_windows.cc b/src/core/lib/iomgr/iomgr_windows.cc index 24ef0dba7b..0579e16aa7 100644 --- a/src/core/lib/iomgr/iomgr_windows.cc +++ b/src/core/lib/iomgr/iomgr_windows.cc @@ -73,9 +73,14 @@ static void iomgr_platform_shutdown(void) { static void iomgr_platform_shutdown_background_closure(void) {} +static bool iomgr_platform_is_any_background_poller_thread(void) { + return false; +} + static grpc_iomgr_platform_vtable vtable = { iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, - iomgr_platform_shutdown_background_closure}; + iomgr_platform_shutdown_background_closure, + iomgr_platform_is_any_background_poller_thread}; void grpc_set_default_iomgr_platform() { grpc_set_tcp_client_impl(&grpc_windows_tcp_client_vtable); diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc index dca97c85b1..7aa197b597 100644 --- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc +++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc @@ -94,6 +94,7 @@ static const grpc_event_engine_vtable* init_engine_vtable(bool) { g_vtable.pollset_destroy = pollset_destroy; g_vtable.pollset_work = pollset_work; g_vtable.pollset_kick = pollset_kick; + g_vtable.is_any_background_poller_thread = [] { return false; }; g_vtable.shutdown_background_closure = [] {}; g_vtable.shutdown_engine = [] {}; -- cgit v1.2.3