aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/lib/debug/stats_data.c62
-rw-r--r--src/core/lib/debug/stats_data.h26
-rw-r--r--src/core/lib/debug/stats_data.yaml8
-rw-r--r--src/core/lib/debug/stats_data_bq_schema.sql2
-rw-r--r--src/core/lib/iomgr/executor.c69
5 files changed, 43 insertions, 124 deletions
diff --git a/src/core/lib/debug/stats_data.c b/src/core/lib/debug/stats_data.c
index fb6055f795..c0aec63c1d 100644
--- a/src/core/lib/debug/stats_data.c
+++ b/src/core/lib/debug/stats_data.c
@@ -109,8 +109,6 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
"executor_wakeup_initiated",
"executor_queue_drained",
"executor_push_retries",
- "executor_threads_created",
- "executor_threads_used",
"server_requested_calls",
"server_slowpath_requests_queued",
};
@@ -219,8 +217,6 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
"Number of times an executor queue was drained",
"Number of times we raced and were forced to retry pushing a closure to "
"the executor",
- "Size of the backing thread pool for overflow gRPC Core work",
- "How many executor threads actually got used",
"How many calls were requested (not necessarily received) by the server",
"How many times was the server slow path taken (indicates too few "
"outstanding requests)",
@@ -238,7 +234,6 @@ const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = {
"http2_send_message_per_write",
"http2_send_trailing_metadata_per_write",
"http2_send_flowctl_per_write",
- "executor_closures_per_wakeup",
"server_cqs_checked",
};
const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT] = {
@@ -254,7 +249,6 @@ const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT] = {
"Number of streams whose payload was written per TCP write",
"Number of streams terminated per TCP write",
"Number of flow control updates written per TCP write",
- "Number of closures executed each time an executor wakes up",
"How many completion queues were checked looking for a CQ that had "
"requested the incoming call",
};
@@ -326,7 +320,6 @@ const uint8_t grpc_stats_table_7[102] = {
const int grpc_stats_table_8[9] = {0, 1, 2, 4, 7, 13, 23, 39, 64};
const uint8_t grpc_stats_table_9[9] = {0, 0, 1, 2, 2, 3, 4, 4, 5};
void grpc_stats_inc_call_initial_size(grpc_exec_ctx *exec_ctx, int value) {
- /* Automatically generated by tools/codegen/core/gen_stats_data.py */
value = GPR_CLAMP(value, 0, 262144);
if (value < 6) {
GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_CALL_INITIAL_SIZE,
@@ -352,7 +345,6 @@ void grpc_stats_inc_call_initial_size(grpc_exec_ctx *exec_ctx, int value) {
(exec_ctx), value, grpc_stats_table_0, 64));
}
void grpc_stats_inc_poll_events_returned(grpc_exec_ctx *exec_ctx, int value) {
- /* Automatically generated by tools/codegen/core/gen_stats_data.py */
value = GPR_CLAMP(value, 0, 1024);
if (value < 29) {
GRPC_STATS_INC_HISTOGRAM((exec_ctx),
@@ -379,7 +371,6 @@ void grpc_stats_inc_poll_events_returned(grpc_exec_ctx *exec_ctx, int value) {
(exec_ctx), value, grpc_stats_table_2, 128));
}
void grpc_stats_inc_tcp_write_size(grpc_exec_ctx *exec_ctx, int value) {
- /* Automatically generated by tools/codegen/core/gen_stats_data.py */
value = GPR_CLAMP(value, 0, 16777216);
if (value < 5) {
GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_WRITE_SIZE,
@@ -405,7 +396,6 @@ void grpc_stats_inc_tcp_write_size(grpc_exec_ctx *exec_ctx, int value) {
(exec_ctx), value, grpc_stats_table_4, 64));
}
void grpc_stats_inc_tcp_write_iov_size(grpc_exec_ctx *exec_ctx, int value) {
- /* Automatically generated by tools/codegen/core/gen_stats_data.py */
value = GPR_CLAMP(value, 0, 1024);
if (value < 13) {
GRPC_STATS_INC_HISTOGRAM((exec_ctx),
@@ -431,7 +421,6 @@ void grpc_stats_inc_tcp_write_iov_size(grpc_exec_ctx *exec_ctx, int value) {
(exec_ctx), value, grpc_stats_table_6, 64));
}
void grpc_stats_inc_tcp_read_size(grpc_exec_ctx *exec_ctx, int value) {
- /* Automatically generated by tools/codegen/core/gen_stats_data.py */
value = GPR_CLAMP(value, 0, 16777216);
if (value < 5) {
GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_SIZE,
@@ -457,7 +446,6 @@ void grpc_stats_inc_tcp_read_size(grpc_exec_ctx *exec_ctx, int value) {
(exec_ctx), value, grpc_stats_table_4, 64));
}
void grpc_stats_inc_tcp_read_offer(grpc_exec_ctx *exec_ctx, int value) {
- /* Automatically generated by tools/codegen/core/gen_stats_data.py */
value = GPR_CLAMP(value, 0, 16777216);
if (value < 5) {
GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_OFFER,
@@ -484,7 +472,6 @@ void grpc_stats_inc_tcp_read_offer(grpc_exec_ctx *exec_ctx, int value) {
}
void grpc_stats_inc_tcp_read_offer_iov_size(grpc_exec_ctx *exec_ctx,
int value) {
- /* Automatically generated by tools/codegen/core/gen_stats_data.py */
value = GPR_CLAMP(value, 0, 1024);
if (value < 13) {
GRPC_STATS_INC_HISTOGRAM(
@@ -512,7 +499,6 @@ void grpc_stats_inc_tcp_read_offer_iov_size(grpc_exec_ctx *exec_ctx,
}
void grpc_stats_inc_http2_send_message_size(grpc_exec_ctx *exec_ctx,
int value) {
- /* Automatically generated by tools/codegen/core/gen_stats_data.py */
value = GPR_CLAMP(value, 0, 16777216);
if (value < 5) {
GRPC_STATS_INC_HISTOGRAM(
@@ -540,7 +526,6 @@ void grpc_stats_inc_http2_send_message_size(grpc_exec_ctx *exec_ctx,
}
void grpc_stats_inc_http2_send_initial_metadata_per_write(
grpc_exec_ctx *exec_ctx, int value) {
- /* Automatically generated by tools/codegen/core/gen_stats_data.py */
value = GPR_CLAMP(value, 0, 1024);
if (value < 13) {
GRPC_STATS_INC_HISTOGRAM(
@@ -570,7 +555,6 @@ void grpc_stats_inc_http2_send_initial_metadata_per_write(
}
void grpc_stats_inc_http2_send_message_per_write(grpc_exec_ctx *exec_ctx,
int value) {
- /* Automatically generated by tools/codegen/core/gen_stats_data.py */
value = GPR_CLAMP(value, 0, 1024);
if (value < 13) {
GRPC_STATS_INC_HISTOGRAM(
@@ -598,7 +582,6 @@ void grpc_stats_inc_http2_send_message_per_write(grpc_exec_ctx *exec_ctx,
}
void grpc_stats_inc_http2_send_trailing_metadata_per_write(
grpc_exec_ctx *exec_ctx, int value) {
- /* Automatically generated by tools/codegen/core/gen_stats_data.py */
value = GPR_CLAMP(value, 0, 1024);
if (value < 13) {
GRPC_STATS_INC_HISTOGRAM(
@@ -628,7 +611,6 @@ void grpc_stats_inc_http2_send_trailing_metadata_per_write(
}
void grpc_stats_inc_http2_send_flowctl_per_write(grpc_exec_ctx *exec_ctx,
int value) {
- /* Automatically generated by tools/codegen/core/gen_stats_data.py */
value = GPR_CLAMP(value, 0, 1024);
if (value < 13) {
GRPC_STATS_INC_HISTOGRAM(
@@ -654,36 +636,7 @@ void grpc_stats_inc_http2_send_flowctl_per_write(grpc_exec_ctx *exec_ctx,
grpc_stats_histo_find_bucket_slow(
(exec_ctx), value, grpc_stats_table_6, 64));
}
-void grpc_stats_inc_executor_closures_per_wakeup(grpc_exec_ctx *exec_ctx,
- int value) {
- /* Automatically generated by tools/codegen/core/gen_stats_data.py */
- value = GPR_CLAMP(value, 0, 1024);
- if (value < 13) {
- GRPC_STATS_INC_HISTOGRAM(
- (exec_ctx), GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP, value);
- return;
- }
- union {
- double dbl;
- uint64_t uint;
- } _val, _bkt;
- _val.dbl = value;
- if (_val.uint < 4637863191261478912ull) {
- int bucket =
- grpc_stats_table_7[((_val.uint - 4623507967449235456ull) >> 48)] + 13;
- _bkt.dbl = grpc_stats_table_6[bucket];
- bucket -= (_val.uint < _bkt.uint);
- GRPC_STATS_INC_HISTOGRAM(
- (exec_ctx), GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP, bucket);
- return;
- }
- GRPC_STATS_INC_HISTOGRAM((exec_ctx),
- GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP,
- grpc_stats_histo_find_bucket_slow(
- (exec_ctx), value, grpc_stats_table_6, 64));
-}
void grpc_stats_inc_server_cqs_checked(grpc_exec_ctx *exec_ctx, int value) {
- /* Automatically generated by tools/codegen/core/gen_stats_data.py */
value = GPR_CLAMP(value, 0, 64);
if (value < 3) {
GRPC_STATS_INC_HISTOGRAM((exec_ctx),
@@ -708,17 +661,17 @@ void grpc_stats_inc_server_cqs_checked(grpc_exec_ctx *exec_ctx, int value) {
grpc_stats_histo_find_bucket_slow(
(exec_ctx), value, grpc_stats_table_8, 8));
}
-const int grpc_stats_histo_buckets[14] = {64, 128, 64, 64, 64, 64, 64,
- 64, 64, 64, 64, 64, 64, 8};
-const int grpc_stats_histo_start[14] = {0, 64, 192, 256, 320, 384, 448,
- 512, 576, 640, 704, 768, 832, 896};
-const int *const grpc_stats_histo_bucket_boundaries[14] = {
+const int grpc_stats_histo_buckets[13] = {64, 128, 64, 64, 64, 64, 64,
+ 64, 64, 64, 64, 64, 8};
+const int grpc_stats_histo_start[13] = {0, 64, 192, 256, 320, 384, 448,
+ 512, 576, 640, 704, 768, 832};
+const int *const grpc_stats_histo_bucket_boundaries[13] = {
grpc_stats_table_0, grpc_stats_table_2, grpc_stats_table_4,
grpc_stats_table_6, grpc_stats_table_4, grpc_stats_table_4,
grpc_stats_table_6, grpc_stats_table_4, grpc_stats_table_6,
grpc_stats_table_6, grpc_stats_table_6, grpc_stats_table_6,
- grpc_stats_table_6, grpc_stats_table_8};
-void (*const grpc_stats_inc_histogram[14])(grpc_exec_ctx *exec_ctx, int x) = {
+ grpc_stats_table_8};
+void (*const grpc_stats_inc_histogram[13])(grpc_exec_ctx *exec_ctx, int x) = {
grpc_stats_inc_call_initial_size,
grpc_stats_inc_poll_events_returned,
grpc_stats_inc_tcp_write_size,
@@ -731,5 +684,4 @@ void (*const grpc_stats_inc_histogram[14])(grpc_exec_ctx *exec_ctx, int x) = {
grpc_stats_inc_http2_send_message_per_write,
grpc_stats_inc_http2_send_trailing_metadata_per_write,
grpc_stats_inc_http2_send_flowctl_per_write,
- grpc_stats_inc_executor_closures_per_wakeup,
grpc_stats_inc_server_cqs_checked};
diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h
index 6c0ad30543..28dab00117 100644
--- a/src/core/lib/debug/stats_data.h
+++ b/src/core/lib/debug/stats_data.h
@@ -111,8 +111,6 @@ typedef enum {
GRPC_STATS_COUNTER_EXECUTOR_WAKEUP_INITIATED,
GRPC_STATS_COUNTER_EXECUTOR_QUEUE_DRAINED,
GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES,
- GRPC_STATS_COUNTER_EXECUTOR_THREADS_CREATED,
- GRPC_STATS_COUNTER_EXECUTOR_THREADS_USED,
GRPC_STATS_COUNTER_SERVER_REQUESTED_CALLS,
GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED,
GRPC_STATS_COUNTER_COUNT
@@ -132,7 +130,6 @@ typedef enum {
GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_PER_WRITE,
GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE,
GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE,
- GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP,
GRPC_STATS_HISTOGRAM_SERVER_CQS_CHECKED,
GRPC_STATS_HISTOGRAM_COUNT
} grpc_stats_histograms;
@@ -163,11 +160,9 @@ typedef enum {
GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE_BUCKETS = 64,
GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE_FIRST_SLOT = 768,
GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE_BUCKETS = 64,
- GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP_FIRST_SLOT = 832,
- GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP_BUCKETS = 64,
- GRPC_STATS_HISTOGRAM_SERVER_CQS_CHECKED_FIRST_SLOT = 896,
+ GRPC_STATS_HISTOGRAM_SERVER_CQS_CHECKED_FIRST_SLOT = 832,
GRPC_STATS_HISTOGRAM_SERVER_CQS_CHECKED_BUCKETS = 8,
- GRPC_STATS_HISTOGRAM_BUCKETS = 904
+ GRPC_STATS_HISTOGRAM_BUCKETS = 840
} grpc_stats_histogram_constants;
#define GRPC_STATS_INC_CLIENT_CALLS_CREATED(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CLIENT_CALLS_CREATED)
@@ -417,11 +412,6 @@ typedef enum {
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_EXECUTOR_QUEUE_DRAINED)
#define GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES)
-#define GRPC_STATS_INC_EXECUTOR_THREADS_CREATED(exec_ctx) \
- GRPC_STATS_INC_COUNTER((exec_ctx), \
- GRPC_STATS_COUNTER_EXECUTOR_THREADS_CREATED)
-#define GRPC_STATS_INC_EXECUTOR_THREADS_USED(exec_ctx) \
- GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_EXECUTOR_THREADS_USED)
#define GRPC_STATS_INC_SERVER_REQUESTED_CALLS(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SERVER_REQUESTED_CALLS)
#define GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx) \
@@ -468,17 +458,13 @@ void grpc_stats_inc_http2_send_trailing_metadata_per_write(
grpc_stats_inc_http2_send_flowctl_per_write((exec_ctx), (int)(value))
void grpc_stats_inc_http2_send_flowctl_per_write(grpc_exec_ctx *exec_ctx,
int x);
-#define GRPC_STATS_INC_EXECUTOR_CLOSURES_PER_WAKEUP(exec_ctx, value) \
- grpc_stats_inc_executor_closures_per_wakeup((exec_ctx), (int)(value))
-void grpc_stats_inc_executor_closures_per_wakeup(grpc_exec_ctx *exec_ctx,
- int x);
#define GRPC_STATS_INC_SERVER_CQS_CHECKED(exec_ctx, value) \
grpc_stats_inc_server_cqs_checked((exec_ctx), (int)(value))
void grpc_stats_inc_server_cqs_checked(grpc_exec_ctx *exec_ctx, int x);
-extern const int grpc_stats_histo_buckets[14];
-extern const int grpc_stats_histo_start[14];
-extern const int *const grpc_stats_histo_bucket_boundaries[14];
-extern void (*const grpc_stats_inc_histogram[14])(grpc_exec_ctx *exec_ctx,
+extern const int grpc_stats_histo_buckets[13];
+extern const int grpc_stats_histo_start[13];
+extern const int *const grpc_stats_histo_bucket_boundaries[13];
+extern void (*const grpc_stats_inc_histogram[13])(grpc_exec_ctx *exec_ctx,
int x);
#endif /* GRPC_CORE_LIB_DEBUG_STATS_DATA_H */
diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml
index de575f01c7..b5c15ff55c 100644
--- a/src/core/lib/debug/stats_data.yaml
+++ b/src/core/lib/debug/stats_data.yaml
@@ -259,14 +259,6 @@
- counter: executor_push_retries
doc: Number of times we raced and were forced to retry pushing a closure to
the executor
-- counter: executor_threads_created
- doc: Size of the backing thread pool for overflow gRPC Core work
-- counter: executor_threads_used
- doc: How many executor threads actually got used
-- histogram: executor_closures_per_wakeup
- max: 1024
- buckets: 64
- doc: Number of closures executed each time an executor wakes up
# server
- counter: server_requested_calls
doc: How many calls were requested (not necessarily received) by the server
diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql
index 0611ccaff0..f96e40c00e 100644
--- a/src/core/lib/debug/stats_data_bq_schema.sql
+++ b/src/core/lib/debug/stats_data_bq_schema.sql
@@ -84,7 +84,5 @@ executor_scheduled_to_self_per_iteration:FLOAT,
executor_wakeup_initiated_per_iteration:FLOAT,
executor_queue_drained_per_iteration:FLOAT,
executor_push_retries_per_iteration:FLOAT,
-executor_threads_created_per_iteration:FLOAT,
-executor_threads_used_per_iteration:FLOAT,
server_requested_calls_per_iteration:FLOAT,
server_slowpath_requests_queued_per_iteration:FLOAT
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index 2439f15a8a..892385d7d7 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -32,14 +32,16 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/support/spinlock.h"
+#define MAX_DEPTH 2
+
typedef struct {
gpr_mu mu;
gpr_cv cv;
grpc_closure_list elems;
+ size_t depth;
bool shutdown;
bool queued_long_job;
gpr_thd_id id;
- grpc_closure_list local_elems;
} thread_state;
static thread_state *g_thread_state;
@@ -54,35 +56,32 @@ static grpc_tracer_flag executor_trace =
static void executor_thread(void *arg);
-static void run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) {
- int n = 0; // number of closures executed
+static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
+ size_t n = 0;
- while (!grpc_closure_list_empty(*list)) {
- grpc_closure *c = list->head;
- grpc_closure_list_init(list);
- while (c != NULL) {
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error_data.error;
- if (GRPC_TRACER_ON(executor_trace)) {
+ grpc_closure *c = list.head;
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
+ if (GRPC_TRACER_ON(executor_trace)) {
#ifndef NDEBUG
- gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
- c->file_created, c->line_created);
+ gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
+ c->file_created, c->line_created);
#else
- gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
+ gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
#endif
- }
+ }
#ifndef NDEBUG
- c->scheduled = false;
+ c->scheduled = false;
#endif
- n++;
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- c = next;
- grpc_exec_ctx_flush(exec_ctx);
- }
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
+ n++;
+ grpc_exec_ctx_flush(exec_ctx);
}
- GRPC_STATS_INC_EXECUTOR_CLOSURES_PER_WAKEUP(exec_ctx, n);
+ return n;
}
bool grpc_executor_is_threaded() {
@@ -127,7 +126,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) {
for (size_t i = 0; i < g_max_threads; i++) {
gpr_mu_destroy(&g_thread_state[i].mu);
gpr_cv_destroy(&g_thread_state[i].cv);
- run_closures(exec_ctx, &g_thread_state[i].elems);
+ run_closures(exec_ctx, g_thread_state[i].elems);
}
gpr_free(g_thread_state);
gpr_tls_destroy(&g_this_thread_state);
@@ -151,14 +150,14 @@ static void executor_thread(void *arg) {
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
- GRPC_STATS_INC_EXECUTOR_THREADS_CREATED(&exec_ctx);
-
- bool used = false;
+ size_t subtract_depth = 0;
for (;;) {
if (GRPC_TRACER_ON(executor_trace)) {
- gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step", (int)(ts - g_thread_state));
+ gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")",
+ (int)(ts - g_thread_state), subtract_depth);
}
gpr_mu_lock(&ts->mu);
+ ts->depth -= subtract_depth;
while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
ts->queued_long_job = false;
gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
@@ -171,20 +170,15 @@ static void executor_thread(void *arg) {
gpr_mu_unlock(&ts->mu);
break;
}
- if (!used) {
- GRPC_STATS_INC_EXECUTOR_THREADS_USED(&exec_ctx);
- used = true;
- }
GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(&exec_ctx);
- GPR_ASSERT(grpc_closure_list_empty(ts->local_elems));
- ts->local_elems = ts->elems;
+ grpc_closure_list exec = ts->elems;
ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
gpr_mu_unlock(&ts->mu);
if (GRPC_TRACER_ON(executor_trace)) {
gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state));
}
- run_closures(&exec_ctx, &ts->local_elems);
+ subtract_depth = run_closures(&exec_ctx, exec);
}
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -217,10 +211,6 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
} else {
GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx);
- if (is_short) {
- grpc_closure_list_append(&ts->local_elems, closure, error);
- return;
- }
}
thread_state *orig_ts = ts;
@@ -260,7 +250,8 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
gpr_cv_signal(&ts->cv);
}
grpc_closure_list_append(&ts->elems, closure, error);
- try_new_thread = ts->elems.head != closure &&
+ ts->depth++;
+ try_new_thread = ts->depth > MAX_DEPTH &&
cur_thread_count < g_max_threads && !ts->shutdown;
if (!is_short) ts->queued_long_job = true;
gpr_mu_unlock(&ts->mu);