diff options
Diffstat (limited to 'src/core/lib/iomgr/executor.c')
-rw-r--r-- | src/core/lib/iomgr/executor.c | 178 |
1 files changed, 139 insertions, 39 deletions
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index dd5cb2a64e..892385d7d7 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -40,6 +40,7 @@ typedef struct { grpc_closure_list elems; size_t depth; bool shutdown; + bool queued_long_job; gpr_thd_id id; } thread_state; @@ -50,6 +51,9 @@ static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER; GPR_TLS_DECL(g_this_thread_state); +static grpc_tracer_flag executor_trace = + GRPC_TRACER_INITIALIZER(false, "executor"); + static void executor_thread(void *arg); static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { @@ -59,6 +63,14 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { while (c != NULL) { grpc_closure *next = c->next_data.next; grpc_error *error = c->error_data.error; + if (GRPC_TRACER_ON(executor_trace)) { +#ifndef NDEBUG + gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c, + c->file_created, c->line_created); +#else + gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c); +#endif + } #ifndef NDEBUG c->scheduled = false; #endif @@ -66,6 +78,7 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { GRPC_ERROR_UNREF(error); c = next; n++; + grpc_exec_ctx_flush(exec_ctx); } return n; @@ -82,7 +95,8 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) { g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores()); gpr_atm_no_barrier_store(&g_cur_threads, 1); gpr_tls_init(&g_this_thread_state); - g_thread_state = gpr_zalloc(sizeof(thread_state) * g_max_threads); + g_thread_state = + (thread_state *)gpr_zalloc(sizeof(thread_state) * g_max_threads); for (size_t i = 0; i < g_max_threads; i++) { gpr_mu_init(&g_thread_state[i].mu); gpr_cv_init(&g_thread_state[i].cv); @@ -120,6 +134,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) { } void grpc_executor_init(grpc_exec_ctx *exec_ctx) { + grpc_register_tracer(&executor_trace); gpr_atm_no_barrier_store(&g_cur_threads, 0); grpc_executor_set_threading(exec_ctx, true); } @@ -129,7 +144,7 @@ void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) { } static void executor_thread(void *arg) { - thread_state *ts = arg; + thread_state *ts = (thread_state *)arg; gpr_tls_set(&g_this_thread_state, (intptr_t)ts); grpc_exec_ctx exec_ctx = @@ -137,12 +152,21 @@ static void executor_thread(void *arg) { size_t subtract_depth = 0; for (;;) { + if (GRPC_TRACER_ON(executor_trace)) { + gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")", + (int)(ts - g_thread_state), subtract_depth); + } gpr_mu_lock(&ts->mu); ts->depth -= subtract_depth; while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) { + ts->queued_long_job = false; gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } if (ts->shutdown) { + if (GRPC_TRACER_ON(executor_trace)) { + gpr_log(GPR_DEBUG, "EXECUTOR[%d]: shutdown", + (int)(ts - g_thread_state)); + } gpr_mu_unlock(&ts->mu); break; } @@ -150,52 +174,128 @@ static void executor_thread(void *arg) { grpc_closure_list exec = ts->elems; ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; gpr_mu_unlock(&ts->mu); + if (GRPC_TRACER_ON(executor_trace)) { + gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state)); + } subtract_depth = run_closures(&exec_ctx, exec); - grpc_exec_ctx_flush(&exec_ctx); } grpc_exec_ctx_finish(&exec_ctx); } static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { - size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); - GRPC_STATS_INC_EXECUTOR_SCHEDULED_ITEMS(exec_ctx); - if (cur_thread_count == 0) { - grpc_closure_list_append(&exec_ctx->closure_list, closure, error); - return; - } - thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state); - if (ts == NULL) { - ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)]; + grpc_error *error, bool is_short) { + bool retry_push; + if (is_short) { + GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(exec_ctx); } else { - GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx); - } - gpr_mu_lock(&ts->mu); - if (grpc_closure_list_empty(ts->elems)) { - GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED(exec_ctx); - gpr_cv_signal(&ts->cv); + GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS(exec_ctx); } - grpc_closure_list_append(&ts->elems, closure, error); - ts->depth++; - bool try_new_thread = ts->depth > MAX_DEPTH && - cur_thread_count < g_max_threads && !ts->shutdown; - gpr_mu_unlock(&ts->mu); - if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) { - cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); - if (cur_thread_count < g_max_threads) { - gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1); - - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread, - &g_thread_state[cur_thread_count], &opt); + do { + retry_push = false; + size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); + if (cur_thread_count == 0) { + if (GRPC_TRACER_ON(executor_trace)) { +#ifndef NDEBUG + gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p (created %s:%d) inline", + closure, closure->file_created, closure->line_created); +#else + gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p inline", closure); +#endif + } + grpc_closure_list_append(&exec_ctx->closure_list, closure, error); + return; } - gpr_spinlock_unlock(&g_adding_thread_lock); - } + thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state); + if (ts == NULL) { + ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)]; + } else { + GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx); + } + thread_state *orig_ts = ts; + + bool try_new_thread; + for (;;) { + if (GRPC_TRACER_ON(executor_trace)) { +#ifndef NDEBUG + gpr_log( + GPR_DEBUG, + "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread %d", + closure, is_short ? "short" : "long", closure->file_created, + closure->line_created, (int)(ts - g_thread_state)); +#else + gpr_log(GPR_DEBUG, "EXECUTOR: try to schedule %p (%s) to thread %d", + closure, is_short ? "short" : "long", + (int)(ts - g_thread_state)); +#endif + } + gpr_mu_lock(&ts->mu); + if (ts->queued_long_job) { + // if there's a long job queued, we never queue anything else to this + // queue (since long jobs can take 'infinite' time and we need to + // guarantee no starvation) + // ... spin through queues and try again + gpr_mu_unlock(&ts->mu); + size_t idx = (size_t)(ts - g_thread_state); + ts = &g_thread_state[(idx + 1) % cur_thread_count]; + if (ts == orig_ts) { + retry_push = true; + try_new_thread = true; + break; + } + continue; + } + if (grpc_closure_list_empty(ts->elems)) { + GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED(exec_ctx); + gpr_cv_signal(&ts->cv); + } + grpc_closure_list_append(&ts->elems, closure, error); + ts->depth++; + try_new_thread = ts->depth > MAX_DEPTH && + cur_thread_count < g_max_threads && !ts->shutdown; + if (!is_short) ts->queued_long_job = true; + gpr_mu_unlock(&ts->mu); + break; + } + if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) { + cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); + if (cur_thread_count < g_max_threads) { + gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1); + + gpr_thd_options opt = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&opt); + gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread, + &g_thread_state[cur_thread_count], &opt); + } + gpr_spinlock_unlock(&g_adding_thread_lock); + } + if (retry_push) { + GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES(exec_ctx); + } + } while (retry_push); } -static const grpc_closure_scheduler_vtable executor_vtable = { - executor_push, executor_push, "executor"}; -static grpc_closure_scheduler executor_scheduler = {&executor_vtable}; -grpc_closure_scheduler *grpc_executor_scheduler = &executor_scheduler; +static void executor_push_short(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { + executor_push(exec_ctx, closure, error, true); +} + +static void executor_push_long(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { + executor_push(exec_ctx, closure, error, false); +} + +static const grpc_closure_scheduler_vtable executor_vtable_short = { + executor_push_short, executor_push_short, "executor"}; +static grpc_closure_scheduler executor_scheduler_short = { + &executor_vtable_short}; + +static const grpc_closure_scheduler_vtable executor_vtable_long = { + executor_push_long, executor_push_long, "executor"}; +static grpc_closure_scheduler executor_scheduler_long = {&executor_vtable_long}; + +grpc_closure_scheduler *grpc_executor_scheduler( + grpc_executor_job_length length) { + return length == GRPC_EXECUTOR_SHORT ? &executor_scheduler_short + : &executor_scheduler_long; +} |