diff options
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 9 | ||||
-rw-r--r-- | src/core/lib/iomgr/combiner.c | 3 | ||||
-rw-r--r-- | src/core/lib/iomgr/executor.c | 30 | ||||
-rw-r--r-- | src/core/lib/iomgr/executor.h | 7 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_posix.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.c | 7 | ||||
-rw-r--r-- | src/core/lib/security/credentials/fake/fake_credentials.c | 9 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 8 | ||||
-rw-r--r-- | src/core/lib/transport/transport.c | 3 |
9 files changed, 55 insertions, 23 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 63bc786f1d..dcc87397e3 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -878,10 +878,11 @@ static grpc_closure_scheduler *write_scheduler(grpc_chttp2_transport *t, bool early_results_scheduled) { switch (t->opt_target) { case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY: - return grpc_executor_scheduler; + return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT: - return early_results_scheduled ? grpc_executor_scheduler - : grpc_schedule_on_exec_ctx; + return early_results_scheduled + ? grpc_executor_scheduler(GRPC_EXECUTOR_SHORT) + : grpc_schedule_on_exec_ctx; } GPR_UNREACHABLE_CODE(return NULL); } @@ -919,8 +920,6 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, exec_ctx, t, r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE : GRPC_CHTTP2_WRITE_STATE_WRITING, begin_writing_desc(r.partial, scheduler == grpc_schedule_on_exec_ctx)); - GPR_ASSERT(scheduler == grpc_schedule_on_exec_ctx || - scheduler == grpc_executor_scheduler); GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_INIT(&t->write_action, write_action, t, scheduler), GRPC_ERROR_NONE); diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index c72c37e2b5..518024fcfd 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -82,7 +82,8 @@ grpc_combiner *grpc_combiner_create(void) { gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); gpr_mpscq_init(&lock->queue); grpc_closure_list_init(&lock->final_list); - GRPC_CLOSURE_INIT(&lock->offload, offload, lock, grpc_executor_scheduler); + GRPC_CLOSURE_INIT(&lock->offload, offload, lock, + grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock)); return lock; } diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index 757bb6fe1b..504aa8600b 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -181,7 +181,7 @@ static void executor_thread(void *arg) { } static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { + grpc_error *error, bool is_short) { size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); if (cur_thread_count == 0) { if (GRPC_TRACER_ON(executor_trace)) { @@ -221,7 +221,27 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, } } -static const grpc_closure_scheduler_vtable executor_vtable = { - executor_push, executor_push, "executor"}; -static grpc_closure_scheduler executor_scheduler = {&executor_vtable}; -grpc_closure_scheduler *grpc_executor_scheduler = &executor_scheduler; +static void executor_push_short(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { + executor_push(exec_ctx, closure, error, true); +} + +static void executor_push_long(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { + executor_push(exec_ctx, closure, error, false); +} + +static const grpc_closure_scheduler_vtable executor_vtable_short = { + executor_push_short, executor_push_short, "executor"}; +static grpc_closure_scheduler executor_scheduler_short = { + &executor_vtable_short}; + +static const grpc_closure_scheduler_vtable executor_vtable_long = { + executor_push_long, executor_push_long, "executor"}; +static grpc_closure_scheduler executor_scheduler_long = {&executor_vtable_long}; + +grpc_closure_scheduler *grpc_executor_scheduler( + grpc_executor_job_length length) { + return length == GRPC_EXECUTOR_SHORT ? &executor_scheduler_short + : &executor_scheduler_long; +} diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index c3382a0a12..0412c02790 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -21,6 +21,11 @@ #include "src/core/lib/iomgr/closure.h" +typedef enum { + GRPC_EXECUTOR_SHORT, + GRPC_EXECUTOR_LONG +} grpc_executor_job_length; + /** Initialize the global executor. * * This mechanism is meant to outsource work (grpc_closure instances) to a @@ -28,7 +33,7 @@ * non-blocking solution available. */ void grpc_executor_init(grpc_exec_ctx *exec_ctx); -extern grpc_closure_scheduler *grpc_executor_scheduler; +grpc_closure_scheduler *grpc_executor_scheduler(grpc_executor_job_length); /** Shutdown the executor, running all pending work as part of the call */ void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx); diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index 35dedc23de..c314ef7b5c 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -176,7 +176,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, grpc_resolved_addresses **addrs) { request *r = gpr_malloc(sizeof(request)); GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r, - grpc_executor_scheduler); + grpc_executor_scheduler(false)); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 8aa32ee51f..9a53f1958d 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -150,9 +150,12 @@ static void cover_self(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { if (old_count == 0) { p = (backup_poller *)gpr_malloc(sizeof(*p) + grpc_pollset_size()); grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu); - GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, grpc_executor_scheduler); gpr_atm_no_barrier_store(&g_backup_poller, (gpr_atm)p); - GRPC_CLOSURE_SCHED(exec_ctx, &p->run_poller, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + exec_ctx, + GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, + grpc_executor_scheduler(GRPC_EXECUTOR_LONG)), + GRPC_ERROR_NONE); } else { p = (backup_poller *)gpr_atm_no_barrier_load(&g_backup_poller); GPR_ASSERT(p != NULL); diff --git a/src/core/lib/security/credentials/fake/fake_credentials.c b/src/core/lib/security/credentials/fake/fake_credentials.c index 67e74f7b92..a01dd379e4 100644 --- a/src/core/lib/security/credentials/fake/fake_credentials.c +++ b/src/core/lib/security/credentials/fake/fake_credentials.c @@ -120,10 +120,11 @@ static void md_only_test_get_request_metadata( if (c->is_async) { grpc_credentials_metadata_request *cb_arg = grpc_credentials_metadata_request_create(creds, cb, user_data); - GRPC_CLOSURE_SCHED(exec_ctx, - GRPC_CLOSURE_CREATE(on_simulated_token_fetch_done, - cb_arg, grpc_executor_scheduler), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + exec_ctx, + GRPC_CLOSURE_CREATE(on_simulated_token_fetch_done, cb_arg, + grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)), + GRPC_ERROR_NONE); } else { cb(exec_ctx, user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK, NULL); } diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index fce7f8dca1..4bdc03052f 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1116,9 +1116,11 @@ void grpc_server_start(grpc_server *server) { server_ref(server); server->starting = true; - GRPC_CLOSURE_SCHED(&exec_ctx, GRPC_CLOSURE_CREATE(start_listeners, server, - grpc_executor_scheduler), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + &exec_ctx, + GRPC_CLOSURE_CREATE(start_listeners, server, + grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)), + GRPC_ERROR_NONE); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 7281602d66..3c69279537 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -72,7 +72,8 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, cope with. Throw this over to the executor (on a core-owned thread) and process it there. */ - refcount->destroy.scheduler = grpc_executor_scheduler; + refcount->destroy.scheduler = + grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); } GRPC_CLOSURE_SCHED(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE); } |