diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/lib/debug/stats_data.c | 48 | ||||
-rw-r--r-- | src/core/lib/debug/stats_data.h | 26 | ||||
-rw-r--r-- | src/core/lib/debug/stats_data.yaml | 8 | ||||
-rw-r--r-- | src/core/lib/debug/stats_data_bq_schema.sql | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/executor.c | 69 |
5 files changed, 110 insertions, 43 deletions
diff --git a/src/core/lib/debug/stats_data.c b/src/core/lib/debug/stats_data.c index b3e1ee9b4e..9ca3b3d5de 100644 --- a/src/core/lib/debug/stats_data.c +++ b/src/core/lib/debug/stats_data.c @@ -81,6 +81,8 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "executor_wakeup_initiated", "executor_queue_drained", "executor_push_retries", + "executor_threads_created", + "executor_threads_used", "server_requested_calls", "server_slowpath_requests_queued", }; @@ -151,6 +153,8 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "Number of times an executor queue was drained", "Number of times we raced and were forced to retry pushing a closure to " "the executor", + "Size of the backing thread pool for overflow gRPC Core work", + "How many executor threads actually got used", "How many calls were requested (not necessarily received) by the server", "How many times was the server slow path taken (indicates too few " "outstanding requests)", @@ -166,6 +170,7 @@ const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = { "http2_send_message_per_write", "http2_send_trailing_metadata_per_write", "http2_send_flowctl_per_write", + "executor_closures_per_wakeup", "server_cqs_checked", }; const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT] = { @@ -179,6 +184,7 @@ const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT] = { "Number of streams whose payload was written per TCP write", "Number of streams terminated per TCP write", "Number of flow control updates written per TCP write", + "Number of closures executed each time an executor wakes up", "How many completion queues were checked looking for a CQ that had " "requested the incoming call", }; @@ -479,6 +485,33 @@ void grpc_stats_inc_http2_send_flowctl_per_write(grpc_exec_ctx *exec_ctx, grpc_stats_histo_find_bucket_slow( (exec_ctx), value, grpc_stats_table_2, 64)); } +void grpc_stats_inc_executor_closures_per_wakeup(grpc_exec_ctx *exec_ctx, + int value) { + value = GPR_CLAMP(value, 0, 1024); + if (value < 13) { + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP, value); + return; + } + union { + double dbl; + uint64_t uint; + } _val, _bkt; + _val.dbl = value; + if (_val.uint < 4637863191261478912ull) { + int bucket = + grpc_stats_table_3[((_val.uint - 4623507967449235456ull) >> 48)] + 13; + _bkt.dbl = grpc_stats_table_2[bucket]; + bucket -= (_val.uint < _bkt.uint); + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP, bucket); + return; + } + GRPC_STATS_INC_HISTOGRAM((exec_ctx), + GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP, + grpc_stats_histo_find_bucket_slow( + (exec_ctx), value, grpc_stats_table_2, 64)); +} void grpc_stats_inc_server_cqs_checked(grpc_exec_ctx *exec_ctx, int value) { value = GPR_CLAMP(value, 0, 64); if (value < 3) { @@ -504,16 +537,16 @@ void grpc_stats_inc_server_cqs_checked(grpc_exec_ctx *exec_ctx, int value) { grpc_stats_histo_find_bucket_slow( (exec_ctx), value, grpc_stats_table_4, 8)); } -const int grpc_stats_histo_buckets[11] = {64, 64, 64, 64, 64, 64, - 64, 64, 64, 64, 8}; -const int grpc_stats_histo_start[11] = {0, 64, 128, 192, 256, 320, - 384, 448, 512, 576, 640}; -const int *const grpc_stats_histo_bucket_boundaries[11] = { +const int grpc_stats_histo_buckets[12] = {64, 64, 64, 64, 64, 64, + 64, 64, 64, 64, 64, 8}; +const int grpc_stats_histo_start[12] = {0, 64, 128, 192, 256, 320, + 384, 448, 512, 576, 640, 704}; +const int *const grpc_stats_histo_bucket_boundaries[12] = { grpc_stats_table_0, grpc_stats_table_2, grpc_stats_table_0, grpc_stats_table_0, grpc_stats_table_2, grpc_stats_table_0, grpc_stats_table_2, grpc_stats_table_2, grpc_stats_table_2, - grpc_stats_table_2, grpc_stats_table_4}; -void (*const grpc_stats_inc_histogram[11])(grpc_exec_ctx *exec_ctx, int x) = { + grpc_stats_table_2, grpc_stats_table_2, grpc_stats_table_4}; +void (*const grpc_stats_inc_histogram[12])(grpc_exec_ctx *exec_ctx, int x) = { grpc_stats_inc_tcp_write_size, grpc_stats_inc_tcp_write_iov_size, grpc_stats_inc_tcp_read_size, @@ -524,4 +557,5 @@ void (*const grpc_stats_inc_histogram[11])(grpc_exec_ctx *exec_ctx, int x) = { grpc_stats_inc_http2_send_message_per_write, grpc_stats_inc_http2_send_trailing_metadata_per_write, grpc_stats_inc_http2_send_flowctl_per_write, + grpc_stats_inc_executor_closures_per_wakeup, grpc_stats_inc_server_cqs_checked}; diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h index c9871c4a56..fa76f86376 100644 --- a/src/core/lib/debug/stats_data.h +++ b/src/core/lib/debug/stats_data.h @@ -83,6 +83,8 @@ typedef enum { GRPC_STATS_COUNTER_EXECUTOR_WAKEUP_INITIATED, GRPC_STATS_COUNTER_EXECUTOR_QUEUE_DRAINED, GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES, + GRPC_STATS_COUNTER_EXECUTOR_THREADS_CREATED, + GRPC_STATS_COUNTER_EXECUTOR_THREADS_USED, GRPC_STATS_COUNTER_SERVER_REQUESTED_CALLS, GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED, GRPC_STATS_COUNTER_COUNT @@ -100,6 +102,7 @@ typedef enum { GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_PER_WRITE, GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE, GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE, + GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP, GRPC_STATS_HISTOGRAM_SERVER_CQS_CHECKED, GRPC_STATS_HISTOGRAM_COUNT } grpc_stats_histograms; @@ -126,9 +129,11 @@ typedef enum { GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE_BUCKETS = 64, GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE_FIRST_SLOT = 576, GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE_BUCKETS = 64, - GRPC_STATS_HISTOGRAM_SERVER_CQS_CHECKED_FIRST_SLOT = 640, + GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP_FIRST_SLOT = 640, + GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP_BUCKETS = 64, + GRPC_STATS_HISTOGRAM_SERVER_CQS_CHECKED_FIRST_SLOT = 704, GRPC_STATS_HISTOGRAM_SERVER_CQS_CHECKED_BUCKETS = 8, - GRPC_STATS_HISTOGRAM_BUCKETS = 648 + GRPC_STATS_HISTOGRAM_BUCKETS = 712 } grpc_stats_histogram_constants; #define GRPC_STATS_INC_CLIENT_CALLS_CREATED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CLIENT_CALLS_CREATED) @@ -307,6 +312,11 @@ typedef enum { GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_EXECUTOR_QUEUE_DRAINED) #define GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES) +#define GRPC_STATS_INC_EXECUTOR_THREADS_CREATED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_EXECUTOR_THREADS_CREATED) +#define GRPC_STATS_INC_EXECUTOR_THREADS_USED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_EXECUTOR_THREADS_USED) #define GRPC_STATS_INC_SERVER_REQUESTED_CALLS(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SERVER_REQUESTED_CALLS) #define GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx) \ @@ -347,13 +357,17 @@ void grpc_stats_inc_http2_send_trailing_metadata_per_write( grpc_stats_inc_http2_send_flowctl_per_write((exec_ctx), (int)(value)) void grpc_stats_inc_http2_send_flowctl_per_write(grpc_exec_ctx *exec_ctx, int x); +#define GRPC_STATS_INC_EXECUTOR_CLOSURES_PER_WAKEUP(exec_ctx, value) \ + grpc_stats_inc_executor_closures_per_wakeup((exec_ctx), (int)(value)) +void grpc_stats_inc_executor_closures_per_wakeup(grpc_exec_ctx *exec_ctx, + int x); #define GRPC_STATS_INC_SERVER_CQS_CHECKED(exec_ctx, value) \ grpc_stats_inc_server_cqs_checked((exec_ctx), (int)(value)) void grpc_stats_inc_server_cqs_checked(grpc_exec_ctx *exec_ctx, int x); -extern const int grpc_stats_histo_buckets[11]; -extern const int grpc_stats_histo_start[11]; -extern const int *const grpc_stats_histo_bucket_boundaries[11]; -extern void (*const grpc_stats_inc_histogram[11])(grpc_exec_ctx *exec_ctx, +extern const int grpc_stats_histo_buckets[12]; +extern const int grpc_stats_histo_start[12]; +extern const int *const grpc_stats_histo_bucket_boundaries[12]; +extern void (*const grpc_stats_inc_histogram[12])(grpc_exec_ctx *exec_ctx, int x); #endif /* GRPC_CORE_LIB_DEBUG_STATS_DATA_H */ diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml index 84727fe6c4..63987d250a 100644 --- a/src/core/lib/debug/stats_data.yaml +++ b/src/core/lib/debug/stats_data.yaml @@ -185,6 +185,14 @@ - counter: executor_push_retries doc: Number of times we raced and were forced to retry pushing a closure to the executor +- counter: executor_threads_created + doc: Size of the backing thread pool for overflow gRPC Core work +- counter: executor_threads_used + doc: How many executor threads actually got used +- histogram: executor_closures_per_wakeup + max: 1024 + buckets: 64 + doc: Number of closures executed each time an executor wakes up # server - counter: server_requested_calls doc: How many calls were requested (not necessarily received) by the server diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql index d21afbbfe4..b0202e792f 100644 --- a/src/core/lib/debug/stats_data_bq_schema.sql +++ b/src/core/lib/debug/stats_data_bq_schema.sql @@ -56,5 +56,7 @@ executor_scheduled_to_self_per_iteration:FLOAT, executor_wakeup_initiated_per_iteration:FLOAT, executor_queue_drained_per_iteration:FLOAT, executor_push_retries_per_iteration:FLOAT, +executor_threads_created_per_iteration:FLOAT, +executor_threads_used_per_iteration:FLOAT, server_requested_calls_per_iteration:FLOAT, server_slowpath_requests_queued_per_iteration:FLOAT diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index 892385d7d7..2439f15a8a 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -32,16 +32,14 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/support/spinlock.h" -#define MAX_DEPTH 2 - typedef struct { gpr_mu mu; gpr_cv cv; grpc_closure_list elems; - size_t depth; bool shutdown; bool queued_long_job; gpr_thd_id id; + grpc_closure_list local_elems; } thread_state; static thread_state *g_thread_state; @@ -56,32 +54,35 @@ static grpc_tracer_flag executor_trace = static void executor_thread(void *arg); -static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { - size_t n = 0; +static void run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) { + int n = 0; // number of closures executed - grpc_closure *c = list.head; - while (c != NULL) { - grpc_closure *next = c->next_data.next; - grpc_error *error = c->error_data.error; - if (GRPC_TRACER_ON(executor_trace)) { + while (!grpc_closure_list_empty(*list)) { + grpc_closure *c = list->head; + grpc_closure_list_init(list); + while (c != NULL) { + grpc_closure *next = c->next_data.next; + grpc_error *error = c->error_data.error; + if (GRPC_TRACER_ON(executor_trace)) { #ifndef NDEBUG - gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c, - c->file_created, c->line_created); + gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c, + c->file_created, c->line_created); #else - gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c); + gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c); #endif - } + } #ifndef NDEBUG - c->scheduled = false; + c->scheduled = false; #endif - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - c = next; - n++; - grpc_exec_ctx_flush(exec_ctx); + n++; + c->cb(exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); + c = next; + grpc_exec_ctx_flush(exec_ctx); + } } - return n; + GRPC_STATS_INC_EXECUTOR_CLOSURES_PER_WAKEUP(exec_ctx, n); } bool grpc_executor_is_threaded() { @@ -126,7 +127,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) { for (size_t i = 0; i < g_max_threads; i++) { gpr_mu_destroy(&g_thread_state[i].mu); gpr_cv_destroy(&g_thread_state[i].cv); - run_closures(exec_ctx, g_thread_state[i].elems); + run_closures(exec_ctx, &g_thread_state[i].elems); } gpr_free(g_thread_state); gpr_tls_destroy(&g_this_thread_state); @@ -150,14 +151,14 @@ static void executor_thread(void *arg) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); - size_t subtract_depth = 0; + GRPC_STATS_INC_EXECUTOR_THREADS_CREATED(&exec_ctx); + + bool used = false; for (;;) { if (GRPC_TRACER_ON(executor_trace)) { - gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")", - (int)(ts - g_thread_state), subtract_depth); + gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step", (int)(ts - g_thread_state)); } gpr_mu_lock(&ts->mu); - ts->depth -= subtract_depth; while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) { ts->queued_long_job = false; gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); @@ -170,15 +171,20 @@ static void executor_thread(void *arg) { gpr_mu_unlock(&ts->mu); break; } + if (!used) { + GRPC_STATS_INC_EXECUTOR_THREADS_USED(&exec_ctx); + used = true; + } GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(&exec_ctx); - grpc_closure_list exec = ts->elems; + GPR_ASSERT(grpc_closure_list_empty(ts->local_elems)); + ts->local_elems = ts->elems; ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; gpr_mu_unlock(&ts->mu); if (GRPC_TRACER_ON(executor_trace)) { gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state)); } - subtract_depth = run_closures(&exec_ctx, exec); + run_closures(&exec_ctx, &ts->local_elems); } grpc_exec_ctx_finish(&exec_ctx); } @@ -211,6 +217,10 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)]; } else { GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx); + if (is_short) { + grpc_closure_list_append(&ts->local_elems, closure, error); + return; + } } thread_state *orig_ts = ts; @@ -250,8 +260,7 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, gpr_cv_signal(&ts->cv); } grpc_closure_list_append(&ts->elems, closure, error); - ts->depth++; - try_new_thread = ts->depth > MAX_DEPTH && + try_new_thread = ts->elems.head != closure && cur_thread_count < g_max_threads && !ts->shutdown; if (!is_short) ts->queued_long_job = true; gpr_mu_unlock(&ts->mu); |