diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/lib/debug/stats_data.c | 62 | ||||
-rw-r--r-- | src/core/lib/debug/stats_data.h | 26 | ||||
-rw-r--r-- | src/core/lib/debug/stats_data.yaml | 8 | ||||
-rw-r--r-- | src/core/lib/debug/stats_data_bq_schema.sql | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/executor.c | 69 |
5 files changed, 43 insertions, 124 deletions
diff --git a/src/core/lib/debug/stats_data.c b/src/core/lib/debug/stats_data.c index fb6055f795..c0aec63c1d 100644 --- a/src/core/lib/debug/stats_data.c +++ b/src/core/lib/debug/stats_data.c @@ -109,8 +109,6 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "executor_wakeup_initiated", "executor_queue_drained", "executor_push_retries", - "executor_threads_created", - "executor_threads_used", "server_requested_calls", "server_slowpath_requests_queued", }; @@ -219,8 +217,6 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "Number of times an executor queue was drained", "Number of times we raced and were forced to retry pushing a closure to " "the executor", - "Size of the backing thread pool for overflow gRPC Core work", - "How many executor threads actually got used", "How many calls were requested (not necessarily received) by the server", "How many times was the server slow path taken (indicates too few " "outstanding requests)", @@ -238,7 +234,6 @@ const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = { "http2_send_message_per_write", "http2_send_trailing_metadata_per_write", "http2_send_flowctl_per_write", - "executor_closures_per_wakeup", "server_cqs_checked", }; const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT] = { @@ -254,7 +249,6 @@ const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT] = { "Number of streams whose payload was written per TCP write", "Number of streams terminated per TCP write", "Number of flow control updates written per TCP write", - "Number of closures executed each time an executor wakes up", "How many completion queues were checked looking for a CQ that had " "requested the incoming call", }; @@ -326,7 +320,6 @@ const uint8_t grpc_stats_table_7[102] = { const int grpc_stats_table_8[9] = {0, 1, 2, 4, 7, 13, 23, 39, 64}; const uint8_t grpc_stats_table_9[9] = {0, 0, 1, 2, 2, 3, 4, 4, 5}; void grpc_stats_inc_call_initial_size(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 262144); if (value < 6) { GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_CALL_INITIAL_SIZE, @@ -352,7 +345,6 @@ void grpc_stats_inc_call_initial_size(grpc_exec_ctx *exec_ctx, int value) { (exec_ctx), value, grpc_stats_table_0, 64)); } void grpc_stats_inc_poll_events_returned(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 1024); if (value < 29) { GRPC_STATS_INC_HISTOGRAM((exec_ctx), @@ -379,7 +371,6 @@ void grpc_stats_inc_poll_events_returned(grpc_exec_ctx *exec_ctx, int value) { (exec_ctx), value, grpc_stats_table_2, 128)); } void grpc_stats_inc_tcp_write_size(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 16777216); if (value < 5) { GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_WRITE_SIZE, @@ -405,7 +396,6 @@ void grpc_stats_inc_tcp_write_size(grpc_exec_ctx *exec_ctx, int value) { (exec_ctx), value, grpc_stats_table_4, 64)); } void grpc_stats_inc_tcp_write_iov_size(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 1024); if (value < 13) { GRPC_STATS_INC_HISTOGRAM((exec_ctx), @@ -431,7 +421,6 @@ void grpc_stats_inc_tcp_write_iov_size(grpc_exec_ctx *exec_ctx, int value) { (exec_ctx), value, grpc_stats_table_6, 64)); } void grpc_stats_inc_tcp_read_size(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 16777216); if (value < 5) { GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_SIZE, @@ -457,7 +446,6 @@ void grpc_stats_inc_tcp_read_size(grpc_exec_ctx *exec_ctx, int value) { (exec_ctx), value, grpc_stats_table_4, 64)); } void grpc_stats_inc_tcp_read_offer(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 16777216); if (value < 5) { GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_OFFER, @@ -484,7 +472,6 @@ void grpc_stats_inc_tcp_read_offer(grpc_exec_ctx *exec_ctx, int value) { } void grpc_stats_inc_tcp_read_offer_iov_size(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 1024); if (value < 13) { GRPC_STATS_INC_HISTOGRAM( @@ -512,7 +499,6 @@ void grpc_stats_inc_tcp_read_offer_iov_size(grpc_exec_ctx *exec_ctx, } void grpc_stats_inc_http2_send_message_size(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 16777216); if (value < 5) { GRPC_STATS_INC_HISTOGRAM( @@ -540,7 +526,6 @@ void grpc_stats_inc_http2_send_message_size(grpc_exec_ctx *exec_ctx, } void grpc_stats_inc_http2_send_initial_metadata_per_write( grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 1024); if (value < 13) { GRPC_STATS_INC_HISTOGRAM( @@ -570,7 +555,6 @@ void grpc_stats_inc_http2_send_initial_metadata_per_write( } void grpc_stats_inc_http2_send_message_per_write(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 1024); if (value < 13) { GRPC_STATS_INC_HISTOGRAM( @@ -598,7 +582,6 @@ void grpc_stats_inc_http2_send_message_per_write(grpc_exec_ctx *exec_ctx, } void grpc_stats_inc_http2_send_trailing_metadata_per_write( grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 1024); if (value < 13) { GRPC_STATS_INC_HISTOGRAM( @@ -628,7 +611,6 @@ void grpc_stats_inc_http2_send_trailing_metadata_per_write( } void grpc_stats_inc_http2_send_flowctl_per_write(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 1024); if (value < 13) { GRPC_STATS_INC_HISTOGRAM( @@ -654,36 +636,7 @@ void grpc_stats_inc_http2_send_flowctl_per_write(grpc_exec_ctx *exec_ctx, grpc_stats_histo_find_bucket_slow( (exec_ctx), value, grpc_stats_table_6, 64)); } -void grpc_stats_inc_executor_closures_per_wakeup(grpc_exec_ctx *exec_ctx, - int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ - value = GPR_CLAMP(value, 0, 1024); - if (value < 13) { - GRPC_STATS_INC_HISTOGRAM( - (exec_ctx), GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP, value); - return; - } - union { - double dbl; - uint64_t uint; - } _val, _bkt; - _val.dbl = value; - if (_val.uint < 4637863191261478912ull) { - int bucket = - grpc_stats_table_7[((_val.uint - 4623507967449235456ull) >> 48)] + 13; - _bkt.dbl = grpc_stats_table_6[bucket]; - bucket -= (_val.uint < _bkt.uint); - GRPC_STATS_INC_HISTOGRAM( - (exec_ctx), GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP, bucket); - return; - } - GRPC_STATS_INC_HISTOGRAM((exec_ctx), - GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP, - grpc_stats_histo_find_bucket_slow( - (exec_ctx), value, grpc_stats_table_6, 64)); -} void grpc_stats_inc_server_cqs_checked(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 64); if (value < 3) { GRPC_STATS_INC_HISTOGRAM((exec_ctx), @@ -708,17 +661,17 @@ void grpc_stats_inc_server_cqs_checked(grpc_exec_ctx *exec_ctx, int value) { grpc_stats_histo_find_bucket_slow( (exec_ctx), value, grpc_stats_table_8, 8)); } -const int grpc_stats_histo_buckets[14] = {64, 128, 64, 64, 64, 64, 64, - 64, 64, 64, 64, 64, 64, 8}; -const int grpc_stats_histo_start[14] = {0, 64, 192, 256, 320, 384, 448, - 512, 576, 640, 704, 768, 832, 896}; -const int *const grpc_stats_histo_bucket_boundaries[14] = { +const int grpc_stats_histo_buckets[13] = {64, 128, 64, 64, 64, 64, 64, + 64, 64, 64, 64, 64, 8}; +const int grpc_stats_histo_start[13] = {0, 64, 192, 256, 320, 384, 448, + 512, 576, 640, 704, 768, 832}; +const int *const grpc_stats_histo_bucket_boundaries[13] = { grpc_stats_table_0, grpc_stats_table_2, grpc_stats_table_4, grpc_stats_table_6, grpc_stats_table_4, grpc_stats_table_4, grpc_stats_table_6, grpc_stats_table_4, grpc_stats_table_6, grpc_stats_table_6, grpc_stats_table_6, grpc_stats_table_6, - grpc_stats_table_6, grpc_stats_table_8}; -void (*const grpc_stats_inc_histogram[14])(grpc_exec_ctx *exec_ctx, int x) = { + grpc_stats_table_8}; +void (*const grpc_stats_inc_histogram[13])(grpc_exec_ctx *exec_ctx, int x) = { grpc_stats_inc_call_initial_size, grpc_stats_inc_poll_events_returned, grpc_stats_inc_tcp_write_size, @@ -731,5 +684,4 @@ void (*const grpc_stats_inc_histogram[14])(grpc_exec_ctx *exec_ctx, int x) = { grpc_stats_inc_http2_send_message_per_write, grpc_stats_inc_http2_send_trailing_metadata_per_write, grpc_stats_inc_http2_send_flowctl_per_write, - grpc_stats_inc_executor_closures_per_wakeup, grpc_stats_inc_server_cqs_checked}; diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h index 6c0ad30543..28dab00117 100644 --- a/src/core/lib/debug/stats_data.h +++ b/src/core/lib/debug/stats_data.h @@ -111,8 +111,6 @@ typedef enum { GRPC_STATS_COUNTER_EXECUTOR_WAKEUP_INITIATED, GRPC_STATS_COUNTER_EXECUTOR_QUEUE_DRAINED, GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES, - GRPC_STATS_COUNTER_EXECUTOR_THREADS_CREATED, - GRPC_STATS_COUNTER_EXECUTOR_THREADS_USED, GRPC_STATS_COUNTER_SERVER_REQUESTED_CALLS, GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED, GRPC_STATS_COUNTER_COUNT @@ -132,7 +130,6 @@ typedef enum { GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_PER_WRITE, GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE, GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE, - GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP, GRPC_STATS_HISTOGRAM_SERVER_CQS_CHECKED, GRPC_STATS_HISTOGRAM_COUNT } grpc_stats_histograms; @@ -163,11 +160,9 @@ typedef enum { GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE_BUCKETS = 64, GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE_FIRST_SLOT = 768, GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE_BUCKETS = 64, - GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP_FIRST_SLOT = 832, - GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP_BUCKETS = 64, - GRPC_STATS_HISTOGRAM_SERVER_CQS_CHECKED_FIRST_SLOT = 896, + GRPC_STATS_HISTOGRAM_SERVER_CQS_CHECKED_FIRST_SLOT = 832, GRPC_STATS_HISTOGRAM_SERVER_CQS_CHECKED_BUCKETS = 8, - GRPC_STATS_HISTOGRAM_BUCKETS = 904 + GRPC_STATS_HISTOGRAM_BUCKETS = 840 } grpc_stats_histogram_constants; #define GRPC_STATS_INC_CLIENT_CALLS_CREATED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CLIENT_CALLS_CREATED) @@ -417,11 +412,6 @@ typedef enum { GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_EXECUTOR_QUEUE_DRAINED) #define GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES) -#define GRPC_STATS_INC_EXECUTOR_THREADS_CREATED(exec_ctx) \ - GRPC_STATS_INC_COUNTER((exec_ctx), \ - GRPC_STATS_COUNTER_EXECUTOR_THREADS_CREATED) -#define GRPC_STATS_INC_EXECUTOR_THREADS_USED(exec_ctx) \ - GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_EXECUTOR_THREADS_USED) #define GRPC_STATS_INC_SERVER_REQUESTED_CALLS(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SERVER_REQUESTED_CALLS) #define GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx) \ @@ -468,17 +458,13 @@ void grpc_stats_inc_http2_send_trailing_metadata_per_write( grpc_stats_inc_http2_send_flowctl_per_write((exec_ctx), (int)(value)) void grpc_stats_inc_http2_send_flowctl_per_write(grpc_exec_ctx *exec_ctx, int x); -#define GRPC_STATS_INC_EXECUTOR_CLOSURES_PER_WAKEUP(exec_ctx, value) \ - grpc_stats_inc_executor_closures_per_wakeup((exec_ctx), (int)(value)) -void grpc_stats_inc_executor_closures_per_wakeup(grpc_exec_ctx *exec_ctx, - int x); #define GRPC_STATS_INC_SERVER_CQS_CHECKED(exec_ctx, value) \ grpc_stats_inc_server_cqs_checked((exec_ctx), (int)(value)) void grpc_stats_inc_server_cqs_checked(grpc_exec_ctx *exec_ctx, int x); -extern const int grpc_stats_histo_buckets[14]; -extern const int grpc_stats_histo_start[14]; -extern const int *const grpc_stats_histo_bucket_boundaries[14]; -extern void (*const grpc_stats_inc_histogram[14])(grpc_exec_ctx *exec_ctx, +extern const int grpc_stats_histo_buckets[13]; +extern const int grpc_stats_histo_start[13]; +extern const int *const grpc_stats_histo_bucket_boundaries[13]; +extern void (*const grpc_stats_inc_histogram[13])(grpc_exec_ctx *exec_ctx, int x); #endif /* GRPC_CORE_LIB_DEBUG_STATS_DATA_H */ diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml index de575f01c7..b5c15ff55c 100644 --- a/src/core/lib/debug/stats_data.yaml +++ b/src/core/lib/debug/stats_data.yaml @@ -259,14 +259,6 @@ - counter: executor_push_retries doc: Number of times we raced and were forced to retry pushing a closure to the executor -- counter: executor_threads_created - doc: Size of the backing thread pool for overflow gRPC Core work -- counter: executor_threads_used - doc: How many executor threads actually got used -- histogram: executor_closures_per_wakeup - max: 1024 - buckets: 64 - doc: Number of closures executed each time an executor wakes up # server - counter: server_requested_calls doc: How many calls were requested (not necessarily received) by the server diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql index 0611ccaff0..f96e40c00e 100644 --- a/src/core/lib/debug/stats_data_bq_schema.sql +++ b/src/core/lib/debug/stats_data_bq_schema.sql @@ -84,7 +84,5 @@ executor_scheduled_to_self_per_iteration:FLOAT, executor_wakeup_initiated_per_iteration:FLOAT, executor_queue_drained_per_iteration:FLOAT, executor_push_retries_per_iteration:FLOAT, -executor_threads_created_per_iteration:FLOAT, -executor_threads_used_per_iteration:FLOAT, server_requested_calls_per_iteration:FLOAT, server_slowpath_requests_queued_per_iteration:FLOAT diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index 2439f15a8a..892385d7d7 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -32,14 +32,16 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/support/spinlock.h" +#define MAX_DEPTH 2 + typedef struct { gpr_mu mu; gpr_cv cv; grpc_closure_list elems; + size_t depth; bool shutdown; bool queued_long_job; gpr_thd_id id; - grpc_closure_list local_elems; } thread_state; static thread_state *g_thread_state; @@ -54,35 +56,32 @@ static grpc_tracer_flag executor_trace = static void executor_thread(void *arg); -static void run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) { - int n = 0; // number of closures executed +static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { + size_t n = 0; - while (!grpc_closure_list_empty(*list)) { - grpc_closure *c = list->head; - grpc_closure_list_init(list); - while (c != NULL) { - grpc_closure *next = c->next_data.next; - grpc_error *error = c->error_data.error; - if (GRPC_TRACER_ON(executor_trace)) { + grpc_closure *c = list.head; + while (c != NULL) { + grpc_closure *next = c->next_data.next; + grpc_error *error = c->error_data.error; + if (GRPC_TRACER_ON(executor_trace)) { #ifndef NDEBUG - gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c, - c->file_created, c->line_created); + gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c, + c->file_created, c->line_created); #else - gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c); + gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c); #endif - } + } #ifndef NDEBUG - c->scheduled = false; + c->scheduled = false; #endif - n++; - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - c = next; - grpc_exec_ctx_flush(exec_ctx); - } + c->cb(exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); + c = next; + n++; + grpc_exec_ctx_flush(exec_ctx); } - GRPC_STATS_INC_EXECUTOR_CLOSURES_PER_WAKEUP(exec_ctx, n); + return n; } bool grpc_executor_is_threaded() { @@ -127,7 +126,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) { for (size_t i = 0; i < g_max_threads; i++) { gpr_mu_destroy(&g_thread_state[i].mu); gpr_cv_destroy(&g_thread_state[i].cv); - run_closures(exec_ctx, &g_thread_state[i].elems); + run_closures(exec_ctx, g_thread_state[i].elems); } gpr_free(g_thread_state); gpr_tls_destroy(&g_this_thread_state); @@ -151,14 +150,14 @@ static void executor_thread(void *arg) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); - GRPC_STATS_INC_EXECUTOR_THREADS_CREATED(&exec_ctx); - - bool used = false; + size_t subtract_depth = 0; for (;;) { if (GRPC_TRACER_ON(executor_trace)) { - gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step", (int)(ts - g_thread_state)); + gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")", + (int)(ts - g_thread_state), subtract_depth); } gpr_mu_lock(&ts->mu); + ts->depth -= subtract_depth; while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) { ts->queued_long_job = false; gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); @@ -171,20 +170,15 @@ static void executor_thread(void *arg) { gpr_mu_unlock(&ts->mu); break; } - if (!used) { - GRPC_STATS_INC_EXECUTOR_THREADS_USED(&exec_ctx); - used = true; - } GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(&exec_ctx); - GPR_ASSERT(grpc_closure_list_empty(ts->local_elems)); - ts->local_elems = ts->elems; + grpc_closure_list exec = ts->elems; ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; gpr_mu_unlock(&ts->mu); if (GRPC_TRACER_ON(executor_trace)) { gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state)); } - run_closures(&exec_ctx, &ts->local_elems); + subtract_depth = run_closures(&exec_ctx, exec); } grpc_exec_ctx_finish(&exec_ctx); } @@ -217,10 +211,6 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)]; } else { GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx); - if (is_short) { - grpc_closure_list_append(&ts->local_elems, closure, error); - return; - } } thread_state *orig_ts = ts; @@ -260,7 +250,8 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, gpr_cv_signal(&ts->cv); } grpc_closure_list_append(&ts->elems, closure, error); - try_new_thread = ts->elems.head != closure && + ts->depth++; + try_new_thread = ts->depth > MAX_DEPTH && cur_thread_count < g_max_threads && !ts->shutdown; if (!is_short) ts->queued_long_job = true; gpr_mu_unlock(&ts->mu); |