From 7b2dd93362099eef75b49fe33b93692bb148b93f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 16 Mar 2017 16:25:12 -0700 Subject: Track milliseconds since process start in timer heap Allows reducing a lock-then-check to an atomic load and check on the fast path of timer checks. Reduces locks per RPC by about 5. --- include/grpc/impl/codegen/atm_gcc_atomic.h | 1 + include/grpc/impl/codegen/atm_gcc_sync.h | 1 + include/grpc/impl/codegen/atm_windows.h | 1 + 3 files changed, 3 insertions(+) (limited to 'include') diff --git a/include/grpc/impl/codegen/atm_gcc_atomic.h b/include/grpc/impl/codegen/atm_gcc_atomic.h index 4bd3b25741..c8832419df 100644 --- a/include/grpc/impl/codegen/atm_gcc_atomic.h +++ b/include/grpc/impl/codegen/atm_gcc_atomic.h @@ -39,6 +39,7 @@ #include typedef intptr_t gpr_atm; +#define GPR_ATM_MAX INTPTR_MAX #ifdef GPR_LOW_LEVEL_COUNTERS extern gpr_atm gpr_counter_atm_cas; diff --git a/include/grpc/impl/codegen/atm_gcc_sync.h b/include/grpc/impl/codegen/atm_gcc_sync.h index 9aa2b43189..dd81476031 100644 --- a/include/grpc/impl/codegen/atm_gcc_sync.h +++ b/include/grpc/impl/codegen/atm_gcc_sync.h @@ -39,6 +39,7 @@ #include typedef intptr_t gpr_atm; +#define GPR_ATM_MAX INTPTR_MAX #define GPR_ATM_COMPILE_BARRIER_() __asm__ __volatile__("" : : : "memory") diff --git a/include/grpc/impl/codegen/atm_windows.h b/include/grpc/impl/codegen/atm_windows.h index 0ab70b95c4..b8f63da758 100644 --- a/include/grpc/impl/codegen/atm_windows.h +++ b/include/grpc/impl/codegen/atm_windows.h @@ -38,6 +38,7 @@ #include typedef intptr_t gpr_atm; +#define GPR_ATM_MAX INTPTR_MAX #define gpr_atm_full_barrier MemoryBarrier -- cgit v1.2.3 From 185f6c9e047d67ffa8cc4cc9fc1844456e92edfe Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 17 Mar 2017 08:33:19 -0700 Subject: Fix rounding, reduce contention on global shared state --- include/grpc/impl/codegen/port_platform.h | 2 + include/grpc/support/tls.h | 2 +- src/core/lib/iomgr/ev_epoll_linux.c | 2 + src/core/lib/iomgr/ev_poll_posix.c | 2 + src/core/lib/iomgr/timer.h | 3 + src/core/lib/iomgr/timer_generic.c | 104 +++++++++++++++++++----------- 6 files changed, 75 insertions(+), 40 deletions(-) (limited to 'include') diff --git a/include/grpc/impl/codegen/port_platform.h b/include/grpc/impl/codegen/port_platform.h index e565cd31d7..3d490db1a5 100644 --- a/include/grpc/impl/codegen/port_platform.h +++ b/include/grpc/impl/codegen/port_platform.h @@ -367,8 +367,10 @@ typedef unsigned __int64 uint64_t; #ifndef GRPC_MUST_USE_RESULT #if defined(__GNUC__) && !defined(__MINGW32__) #define GRPC_MUST_USE_RESULT __attribute__((warn_unused_result)) +#define GPR_ALIGN_STRUCT(n) __attribute__((aligned(n))) #else #define GRPC_MUST_USE_RESULT +#define GPR_ALIGN_STRUCT(n) #endif #endif diff --git a/include/grpc/support/tls.h b/include/grpc/support/tls.h index a45e1f0a4d..5365449f0d 100644 --- a/include/grpc/support/tls.h +++ b/include/grpc/support/tls.h @@ -58,7 +58,7 @@ gpr_tls_set(&foo, new_value); Accessing a thread local: - current_value = gpr_tls_get(&foo, value); + current_value = gpr_tls_get(&foo); ALL functions here may be implemented as macros. */ diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 11208b9ad1..400d4057a7 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -56,6 +56,7 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" @@ -1669,6 +1670,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, for (int i = 0; i < ep_rv; ++i) { void *data_ptr = ep_ev[i].data.ptr; if (data_ptr == &global_wakeup_fd) { + grpc_timer_consume_kick(); append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), err_desc); } else if (data_ptr == &pi->workqueue_wakeup_fd) { diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 5ddd5313e2..f27eb88843 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -52,6 +52,7 @@ #include #include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_cv.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" @@ -1004,6 +1005,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } else { if (pfds[0].revents & POLLIN_CHECK) { + grpc_timer_consume_kick(); work_combine_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd)); } diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index d84a278b18..e0338f93c7 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -101,6 +101,9 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, void grpc_timer_list_init(gpr_timespec now); void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx); +/* Consume a kick issued by grpc_kick_poller */ +void grpc_timer_consume_kick(void); + /* the following must be implemented by each iomgr implementation */ void grpc_kick_poller(void); diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 090b4dc2d4..900731c37c 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -39,6 +39,7 @@ #include #include +#include #include #include "src/core/lib/iomgr/time_averaged_stats.h" #include "src/core/lib/iomgr/timer_heap.h" @@ -67,17 +68,25 @@ typedef struct { grpc_timer list; } shard_type; -/* Protects g_shard_queue */ -static gpr_mu g_mu; -/* Allow only one run_some_expired_timers at once */ -static gpr_spinlock g_checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER; +struct shared_mutables { + gpr_atm min_timer; + /* Allow only one run_some_expired_timers at once */ + gpr_spinlock checker_mu; + bool initialized; + /* Protects g_shard_queue */ + gpr_mu mu; +} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE); + +static struct shared_mutables g_shared_mutables = { + .checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false, +}; static gpr_clock_type g_clock_type; static shard_type g_shards[NUM_SHARDS]; -/* Protected by g_mu */ +/* Protected by g_shared_mutables.mu */ static shard_type *g_shard_queue[NUM_SHARDS]; -static bool g_initialized = false; static gpr_timespec g_start_time; -static gpr_atm g_min_timer; + +GPR_TLS_DECL(g_last_seen_min_timer); static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, gpr_atm *next, grpc_error *error); @@ -90,8 +99,17 @@ static gpr_timespec dbl_to_ts(double d) { return ts; } -static gpr_atm timespec_to_atm(gpr_timespec ts) { - double x = gpr_timespec_to_micros(gpr_time_sub(ts, g_start_time)) / 1000.0; +static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { + double x = GPR_MS_PER_SEC * (double)ts.tv_sec + + (double)ts.tv_nsec / GPR_NS_PER_MS + 1.0; + if (x < 0) return 0; + if (x > GPR_ATM_MAX) return GPR_ATM_MAX; + return (gpr_atm)x; +} + +static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { + double x = + GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; if (x < 0) return 0; if (x > GPR_ATM_MAX) return GPR_ATM_MAX; return (gpr_atm)x; @@ -110,18 +128,19 @@ static gpr_atm compute_min_deadline(shard_type *shard) { void grpc_timer_list_init(gpr_timespec now) { uint32_t i; - g_initialized = true; - gpr_mu_init(&g_mu); + g_shared_mutables.initialized = true; + gpr_mu_init(&g_shared_mutables.mu); g_clock_type = now.clock_type; g_start_time = now; - g_min_timer = timespec_to_atm(now); + g_shared_mutables.min_timer = timespec_to_atm_round_down(now); + gpr_tls_init(&g_last_seen_min_timer); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; gpr_mu_init(&shard->mu); grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1, 0.5); - shard->queue_deadline_cap = timespec_to_atm(now); + shard->queue_deadline_cap = g_shared_mutables.min_timer; shard->shard_queue_index = i; grpc_timer_heap_init(&shard->heap); shard->list.next = shard->list.prev = &shard->list; @@ -139,8 +158,9 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { gpr_mu_destroy(&shard->mu); grpc_timer_heap_destroy(&shard->heap); } - gpr_mu_destroy(&g_mu); - g_initialized = false; + gpr_mu_destroy(&g_shared_mutables.mu); + gpr_tls_destroy(&g_last_seen_min_timer); + g_shared_mutables.initialized = false; } static double ts_to_dbl(gpr_timespec ts) { @@ -191,9 +211,9 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, GPR_ASSERT(deadline.clock_type == g_clock_type); GPR_ASSERT(now.clock_type == g_clock_type); timer->closure = closure; - timer->deadline = timespec_to_atm(deadline); + timer->deadline = timespec_to_atm_round_up(deadline); - if (!g_initialized) { + if (!g_shared_mutables.initialized) { timer->pending = false; grpc_closure_sched( exec_ctx, timer->closure, @@ -233,22 +253,27 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, In that case, the timer will simply have to wait for the next grpc_timer_check. */ if (is_first_timer) { - gpr_mu_lock(&g_mu); + gpr_mu_lock(&g_shared_mutables.mu); if (timer->deadline < shard->min_deadline) { gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline; shard->min_deadline = timer->deadline; note_deadline_change(shard); if (shard->shard_queue_index == 0 && timer->deadline < old_min_deadline) { - gpr_atm_no_barrier_store(&g_min_timer, timer->deadline); + gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, timer->deadline); grpc_kick_poller(); } } - gpr_mu_unlock(&g_mu); + gpr_mu_unlock(&g_shared_mutables.mu); } } +void grpc_timer_consume_kick(void) { + /* force re-evaluation of last seeen min */ + gpr_tls_set(&g_last_seen_min_timer, 0); +} + void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { - if (!g_initialized) { + if (!g_shared_mutables.initialized) { /* must have already been cancelled, also the shard mutex is invalid */ return; } @@ -334,12 +359,23 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, gpr_atm *next, grpc_error *error) { size_t n = 0; - if (now < gpr_atm_no_barrier_load(&g_min_timer)) { + /* fetch from a thread-local first: this avoids contention on a globally + mutable cacheline in the common case */ + gpr_atm min_timer = gpr_tls_get(&g_last_seen_min_timer); + if (now < min_timer) { + if (next != NULL) *next = GPR_MIN(*next, min_timer); return 0; } - if (gpr_spinlock_trylock(&g_checker_mu)) { - gpr_mu_lock(&g_mu); + min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer); + gpr_tls_set(&g_last_seen_min_timer, min_timer); + if (now < min_timer) { + if (next != NULL) *next = GPR_MIN(*next, min_timer); + return 0; + } + + if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) { + gpr_mu_lock(&g_shared_mutables.mu); while (g_shard_queue[0]->min_deadline < now) { gpr_atm new_min_deadline; @@ -363,20 +399,10 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, *next = GPR_MIN(*next, g_shard_queue[0]->min_deadline); } - gpr_atm_no_barrier_store(&g_min_timer, g_shard_queue[0]->min_deadline); - gpr_mu_unlock(&g_mu); - gpr_spinlock_unlock(&g_checker_mu); - } else if (next != NULL) { - /* TODO(ctiller): this forces calling code to do an short poll, and - then retry the timer check (because this time through the timer list was - contended). - - We could reduce the cost here dramatically by keeping a count of how many - currently active pollers got through the uncontended case above - successfully, and waking up other pollers IFF that count drops to zero. - - Once that count is in place, this entire else branch could disappear. */ - *next = GPR_MIN(*next, now + 1); + gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, + g_shard_queue[0]->min_deadline); + gpr_mu_unlock(&g_shared_mutables.mu); + gpr_spinlock_unlock(&g_shared_mutables.checker_mu); } GRPC_ERROR_UNREF(error); @@ -387,7 +413,7 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_timespec *next) { GPR_ASSERT(now.clock_type == g_clock_type); - gpr_atm now_atm = timespec_to_atm(now); + gpr_atm now_atm = timespec_to_atm_round_down(now); gpr_atm next_atm; bool r = run_some_expired_timers( exec_ctx, now_atm, &next_atm, -- cgit v1.2.3 From f2e609b93eb8f2a5e8765ae7cee4338c154d2973 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 31 Mar 2017 16:43:24 -0700 Subject: sketching minimal stack configurator --- include/grpc/impl/codegen/grpc_types.h | 3 +++ src/core/ext/census/grpc_plugin.c | 2 +- src/core/lib/channel/channel_args.c | 7 +++++++ src/core/lib/channel/channel_args.h | 2 ++ src/core/lib/surface/init.c | 18 +++++++++++++++++- 5 files changed, 30 insertions(+), 2 deletions(-) (limited to 'include') diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index aa4210b1a7..5beac83a3b 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -152,6 +152,9 @@ typedef struct { #define GRPC_ARG_ENABLE_CENSUS "grpc.census" /** If non-zero, enable load reporting. */ #define GRPC_ARG_ENABLE_LOAD_REPORTING "grpc.loadreporting" +/** Request that optional features default to off (regardless of what they + usually default to) - to enable tight control over what gets enabled */ +#define GRPC_ARG_MINIMAL_STACK "grpc.minimal_stack" /** Maximum number of concurrent incoming streams to allow on a http2 connection. Int valued. */ #define GRPC_ARG_MAX_CONCURRENT_STREAMS "grpc.max_concurrent_streams" diff --git a/src/core/ext/census/grpc_plugin.c b/src/core/ext/census/grpc_plugin.c index c9fe453af8..28d266e22a 100644 --- a/src/core/ext/census/grpc_plugin.c +++ b/src/core/ext/census/grpc_plugin.c @@ -48,7 +48,7 @@ static bool is_census_enabled(const grpc_channel_args *a) { return a->args[i].value.integer != 0 && census_enabled(); } } - return census_enabled(); + return census_enabled() && !grpc_channel_args_want_minimal_stack(a); } static bool maybe_add_census_filter(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c index 1a099ac437..a6d124c719 100644 --- a/src/core/lib/channel/channel_args.c +++ b/src/core/lib/channel/channel_args.c @@ -346,3 +346,10 @@ int grpc_channel_arg_get_integer(grpc_arg *arg, grpc_integer_options options) { } return arg->value.integer; } + +bool grpc_channel_args_want_minimal_stack(const grpc_channel_args *args) { + const grpc_arg *arg = grpc_channel_args_find(args, GRPC_ARG_MINIMAL_STACK); + if (arg == NULL) return false; + if (arg->type == GRPC_ARG_INTEGER && arg->value.integer == 0) return false; + return true; +} diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 5c7d31f8bb..158cda5b21 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -113,6 +113,8 @@ grpc_channel_args *grpc_channel_args_set_socket_mutator( const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args, const char *name); +bool grpc_channel_args_want_minimal_stack(const grpc_channel_args *args); + typedef struct grpc_integer_options { int default_value; // Return this if value is outside of expected bounds. int min_value; diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index b46ecac18d..ce88248360 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -107,6 +107,22 @@ static bool maybe_add_http_filter(grpc_exec_ctx *exec_ctx, return true; } +typedef struct { + const grpc_channel_filter *filter; + const char *controlling_channel_arg; + bool default_on; +} maybe_prepend_filter_args; + +static const maybe_prepend_filter_args message_size_args = { + &grpc_message_size_filter, NULL, true}; + +static bool maybe_prepend_filter(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, + void *arg) { + return grpc_channel_stack_builder_prepend_filter( + builder, (const grpc_channel_filter *)arg, NULL, NULL); +} + static void register_builtin_channel_init() { grpc_channel_init_register_stage( GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, @@ -119,7 +135,7 @@ static void register_builtin_channel_init() { (void *)&grpc_max_age_filter); grpc_channel_init_register_stage( GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, - prepend_filter, (void *)&grpc_message_size_filter); + maybe_prepend_filter, (void *)&message_size_args); grpc_channel_init_register_stage( GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter, (void *)&grpc_message_size_filter); -- cgit v1.2.3