aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c9
-rw-r--r--src/core/lib/iomgr/combiner.c3
-rw-r--r--src/core/lib/iomgr/executor.c30
-rw-r--r--src/core/lib/iomgr/executor.h7
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.c2
-rw-r--r--src/core/lib/iomgr/tcp_posix.c7
-rw-r--r--src/core/lib/security/credentials/fake/fake_credentials.c9
-rw-r--r--src/core/lib/surface/server.c8
-rw-r--r--src/core/lib/transport/transport.c3
9 files changed, 55 insertions, 23 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 63bc786f1d..dcc87397e3 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -878,10 +878,11 @@ static grpc_closure_scheduler *write_scheduler(grpc_chttp2_transport *t,
bool early_results_scheduled) {
switch (t->opt_target) {
case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY:
- return grpc_executor_scheduler;
+ return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT:
- return early_results_scheduled ? grpc_executor_scheduler
- : grpc_schedule_on_exec_ctx;
+ return early_results_scheduled
+ ? grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)
+ : grpc_schedule_on_exec_ctx;
}
GPR_UNREACHABLE_CODE(return NULL);
}
@@ -919,8 +920,6 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
exec_ctx, t, r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
: GRPC_CHTTP2_WRITE_STATE_WRITING,
begin_writing_desc(r.partial, scheduler == grpc_schedule_on_exec_ctx));
- GPR_ASSERT(scheduler == grpc_schedule_on_exec_ctx ||
- scheduler == grpc_executor_scheduler);
GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_INIT(&t->write_action,
write_action, t, scheduler),
GRPC_ERROR_NONE);
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index c72c37e2b5..518024fcfd 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -82,7 +82,8 @@ grpc_combiner *grpc_combiner_create(void) {
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
gpr_mpscq_init(&lock->queue);
grpc_closure_list_init(&lock->final_list);
- GRPC_CLOSURE_INIT(&lock->offload, offload, lock, grpc_executor_scheduler);
+ GRPC_CLOSURE_INIT(&lock->offload, offload, lock,
+ grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock));
return lock;
}
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index 757bb6fe1b..504aa8600b 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -181,7 +181,7 @@ static void executor_thread(void *arg) {
}
static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error) {
+ grpc_error *error, bool is_short) {
size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
if (cur_thread_count == 0) {
if (GRPC_TRACER_ON(executor_trace)) {
@@ -221,7 +221,27 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
}
}
-static const grpc_closure_scheduler_vtable executor_vtable = {
- executor_push, executor_push, "executor"};
-static grpc_closure_scheduler executor_scheduler = {&executor_vtable};
-grpc_closure_scheduler *grpc_executor_scheduler = &executor_scheduler;
+static void executor_push_short(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error) {
+ executor_push(exec_ctx, closure, error, true);
+}
+
+static void executor_push_long(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error) {
+ executor_push(exec_ctx, closure, error, false);
+}
+
+static const grpc_closure_scheduler_vtable executor_vtable_short = {
+ executor_push_short, executor_push_short, "executor"};
+static grpc_closure_scheduler executor_scheduler_short = {
+ &executor_vtable_short};
+
+static const grpc_closure_scheduler_vtable executor_vtable_long = {
+ executor_push_long, executor_push_long, "executor"};
+static grpc_closure_scheduler executor_scheduler_long = {&executor_vtable_long};
+
+grpc_closure_scheduler *grpc_executor_scheduler(
+ grpc_executor_job_length length) {
+ return length == GRPC_EXECUTOR_SHORT ? &executor_scheduler_short
+ : &executor_scheduler_long;
+}
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index c3382a0a12..0412c02790 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -21,6 +21,11 @@
#include "src/core/lib/iomgr/closure.h"
+typedef enum {
+ GRPC_EXECUTOR_SHORT,
+ GRPC_EXECUTOR_LONG
+} grpc_executor_job_length;
+
/** Initialize the global executor.
*
* This mechanism is meant to outsource work (grpc_closure instances) to a
@@ -28,7 +33,7 @@
* non-blocking solution available. */
void grpc_executor_init(grpc_exec_ctx *exec_ctx);
-extern grpc_closure_scheduler *grpc_executor_scheduler;
+grpc_closure_scheduler *grpc_executor_scheduler(grpc_executor_job_length);
/** Shutdown the executor, running all pending work as part of the call */
void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx);
diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c
index 35dedc23de..c314ef7b5c 100644
--- a/src/core/lib/iomgr/resolve_address_posix.c
+++ b/src/core/lib/iomgr/resolve_address_posix.c
@@ -176,7 +176,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
grpc_resolved_addresses **addrs) {
request *r = gpr_malloc(sizeof(request));
GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
- grpc_executor_scheduler);
+ grpc_executor_scheduler(false));
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 8aa32ee51f..9a53f1958d 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -150,9 +150,12 @@ static void cover_self(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
if (old_count == 0) {
p = (backup_poller *)gpr_malloc(sizeof(*p) + grpc_pollset_size());
grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu);
- GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, grpc_executor_scheduler);
gpr_atm_no_barrier_store(&g_backup_poller, (gpr_atm)p);
- GRPC_CLOSURE_SCHED(exec_ctx, &p->run_poller, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(
+ exec_ctx,
+ GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p,
+ grpc_executor_scheduler(GRPC_EXECUTOR_LONG)),
+ GRPC_ERROR_NONE);
} else {
p = (backup_poller *)gpr_atm_no_barrier_load(&g_backup_poller);
GPR_ASSERT(p != NULL);
diff --git a/src/core/lib/security/credentials/fake/fake_credentials.c b/src/core/lib/security/credentials/fake/fake_credentials.c
index 67e74f7b92..a01dd379e4 100644
--- a/src/core/lib/security/credentials/fake/fake_credentials.c
+++ b/src/core/lib/security/credentials/fake/fake_credentials.c
@@ -120,10 +120,11 @@ static void md_only_test_get_request_metadata(
if (c->is_async) {
grpc_credentials_metadata_request *cb_arg =
grpc_credentials_metadata_request_create(creds, cb, user_data);
- GRPC_CLOSURE_SCHED(exec_ctx,
- GRPC_CLOSURE_CREATE(on_simulated_token_fetch_done,
- cb_arg, grpc_executor_scheduler),
- GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(
+ exec_ctx,
+ GRPC_CLOSURE_CREATE(on_simulated_token_fetch_done, cb_arg,
+ grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)),
+ GRPC_ERROR_NONE);
} else {
cb(exec_ctx, user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK, NULL);
}
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index fce7f8dca1..4bdc03052f 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1116,9 +1116,11 @@ void grpc_server_start(grpc_server *server) {
server_ref(server);
server->starting = true;
- GRPC_CLOSURE_SCHED(&exec_ctx, GRPC_CLOSURE_CREATE(start_listeners, server,
- grpc_executor_scheduler),
- GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(
+ &exec_ctx,
+ GRPC_CLOSURE_CREATE(start_listeners, server,
+ grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)),
+ GRPC_ERROR_NONE);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 7281602d66..3c69279537 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -72,7 +72,8 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
cope with.
Throw this over to the executor (on a core-owned thread) and process it
there. */
- refcount->destroy.scheduler = grpc_executor_scheduler;
+ refcount->destroy.scheduler =
+ grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
}
GRPC_CLOSURE_SCHED(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE);
}