diff options
Diffstat (limited to 'src/core/lib')
58 files changed, 530 insertions, 476 deletions
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 1d0b7d4f31..cddd84fcca 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -297,7 +297,8 @@ void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); memset(op, 0, sizeof(*op)); op->cancel_error = GRPC_ERROR_CANCELLED; - op->on_complete = grpc_closure_create(destroy_op, op); + op->on_complete = + grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx); elem->filter->start_transport_stream_op(exec_ctx, elem, op); } @@ -307,7 +308,8 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx, grpc_slice *optional_message) { grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); memset(op, 0, sizeof(*op)); - op->on_complete = grpc_closure_create(destroy_op, op); + op->on_complete = + grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx); grpc_transport_stream_op_add_cancellation_with_message(op, status, optional_message); elem->filter->start_transport_stream_op(exec_ctx, elem, op); @@ -319,7 +321,8 @@ void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx, grpc_slice *optional_message) { grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); memset(op, 0, sizeof(*op)); - op->on_complete = grpc_closure_create(destroy_op, op); + op->on_complete = + grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx); grpc_transport_stream_op_add_close(op, status, optional_message); elem->filter->start_transport_stream_op(exec_ctx, elem, op); } diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 0e336dc330..35455d4908 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -269,8 +269,10 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* initialize members */ grpc_slice_buffer_init(&calld->slices); calld->has_compression_algorithm = 0; - grpc_closure_init(&calld->got_slice, got_slice, elem); - grpc_closure_init(&calld->send_done, send_done, elem); + grpc_closure_init(&calld->got_slice, got_slice, elem, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->send_done, send_done, elem, + grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 470ccfea57..902ebf1955 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -123,7 +123,8 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { static void inject_on_complete_cb(grpc_deadline_state* deadline_state, grpc_transport_stream_op* op) { deadline_state->next_on_complete = op->on_complete; - grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state); + grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state, + grpc_schedule_on_exec_ctx); op->on_complete = &deadline_state->on_complete; } @@ -172,8 +173,9 @@ void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state)); state->elem = elem; state->deadline = deadline; - grpc_closure_init(&state->closure, start_timer_after_init, state); - grpc_exec_ctx_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE, NULL); + grpc_closure_init(&state->closure, start_timer_after_init, state, + grpc_schedule_on_exec_ctx); + grpc_closure_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE); } } @@ -290,7 +292,8 @@ static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx, calld->next_recv_initial_metadata_ready = op->recv_initial_metadata_ready; calld->recv_initial_metadata = op->recv_initial_metadata; grpc_closure_init(&calld->recv_initial_metadata_ready, - recv_initial_metadata_ready, elem); + recv_initial_metadata_ready, elem, + grpc_schedule_on_exec_ctx); op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready; } // Make sure we know when the call is complete, so that we can cancel diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index 23edc826ca..ff827527b3 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -165,7 +165,7 @@ static bool call_next_handshaker_locked(grpc_exec_ctx* exec_ctx, // Cancel deadline timer, since we're invoking the on_handshake_done // callback now. grpc_timer_cancel(exec_ctx, &mgr->deadline_timer); - grpc_exec_ctx_sched(exec_ctx, &mgr->on_handshake_done, error, NULL); + grpc_closure_sched(exec_ctx, &mgr->on_handshake_done, error); mgr->shutdown = true; } else { grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->index], @@ -218,8 +218,10 @@ void grpc_handshake_manager_do_handshake( grpc_slice_buffer_init(mgr->args.read_buffer); // Initialize state needed for calling handshakers. mgr->acceptor = acceptor; - grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, mgr); - grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, &mgr->args); + grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, mgr, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, &mgr->args, + grpc_schedule_on_exec_ctx); // Start deadline timer, which owns a ref. gpr_ref(&mgr->refs); grpc_timer_init(exec_ctx, &mgr->deadline_timer, diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index 1a2d08dda5..c37cab3939 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -352,12 +352,17 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, calld->send_message_blocked = false; grpc_slice_buffer_init(&calld->slices); grpc_closure_init(&calld->hc_on_recv_initial_metadata, - hc_on_recv_initial_metadata, elem); + hc_on_recv_initial_metadata, elem, + grpc_schedule_on_exec_ctx); grpc_closure_init(&calld->hc_on_recv_trailing_metadata, - hc_on_recv_trailing_metadata, elem); - grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem); - grpc_closure_init(&calld->got_slice, got_slice, elem); - grpc_closure_init(&calld->send_done, send_done, elem); + hc_on_recv_trailing_metadata, elem, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->got_slice, got_slice, elem, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->send_done, send_done, elem, + grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index a5134ee21b..a6d720530a 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -334,9 +334,12 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, call_data *calld = elem->call_data; /* initialize members */ memset(calld, 0, sizeof(*calld)); - grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem); - grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem); - grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem); + grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem, + grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&calld->read_slice_buffer); return GRPC_ERROR_NONE; } diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index f05c789010..b5e882de52 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -124,7 +124,7 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, gpr_free(message_string); } // Invoke the next callback. - grpc_exec_ctx_sched(exec_ctx, calld->next_recv_message_ready, error, NULL); + grpc_closure_sched(exec_ctx, calld->next_recv_message_ready, error); } // Start transport stream op. @@ -160,7 +160,8 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, channel_data* chand = elem->channel_data; call_data* calld = elem->call_data; calld->next_recv_message_ready = NULL; - grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem); + grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem, + grpc_schedule_on_exec_ctx); // Get max sizes from channel data, then merge in per-method config values. // Note: Per-method config is only available on the client, so we // apply the max request size to the send limit and the max response diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 1035f31109..581a74b73f 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -103,7 +103,7 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, grpc_error *error) { grpc_polling_entity_del_from_pollset_set(exec_ctx, req->pollent, req->context->pollset_set); - grpc_exec_ctx_sched(exec_ctx, req->on_done, error, NULL); + grpc_closure_sched(exec_ctx, req->on_done, error); grpc_http_parser_destroy(&req->parser); if (req->addresses != NULL) { grpc_resolved_addresses_destroy(req->addresses); @@ -224,7 +224,8 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req, return; } addr = &req->addresses->addrs[req->next_address++]; - grpc_closure_init(&req->connected, on_connected, req); + grpc_closure_init(&req->connected, on_connected, req, + grpc_schedule_on_exec_ctx); grpc_arg arg; arg.key = GRPC_ARG_RESOURCE_QUOTA; arg.type = GRPC_ARG_POINTER; @@ -266,8 +267,9 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx, req->pollent = pollent; req->overall_error = GRPC_ERROR_NONE; req->resource_quota = grpc_resource_quota_internal_ref(resource_quota); - grpc_closure_init(&req->on_read, on_read, req); - grpc_closure_init(&req->done_write, done_write, req); + grpc_closure_init(&req->on_read, on_read, req, grpc_schedule_on_exec_ctx); + grpc_closure_init(&req->done_write, done_write, req, + grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&req->incoming); grpc_slice_buffer_init(&req->outgoing); grpc_iomgr_register_object(&req->iomgr_obj, name); @@ -277,9 +279,11 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx, GPR_ASSERT(pollent); grpc_polling_entity_add_to_pollset_set(exec_ctx, req->pollent, req->context->pollset_set); - grpc_resolve_address(exec_ctx, request->host, req->handshaker->default_port, - req->context->pollset_set, - grpc_closure_create(on_resolved, req), &req->addresses); + grpc_resolve_address( + exec_ctx, request->host, req->handshaker->default_port, + req->context->pollset_set, + grpc_closure_create(on_resolved, req, grpc_schedule_on_exec_ctx), + &req->addresses); } void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c index 14cdb1dab3..6b197c2e35 100644 --- a/src/core/lib/http/httpcli_security_connector.c +++ b/src/core/lib/http/httpcli_security_connector.c @@ -96,7 +96,7 @@ static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx, error = GRPC_ERROR_CREATE(msg); gpr_free(msg); } - grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL); + grpc_closure_sched(exec_ctx, on_peer_checked, error); tsi_peer_destruct(&peer); } diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index c6ddc76732..da0ec878a3 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -37,10 +37,13 @@ #include "src/core/lib/profiling/timers.h" -void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, - void *cb_arg) { +grpc_closure *grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, + void *cb_arg, + grpc_closure_scheduler *scheduler) { closure->cb = cb; closure->cb_arg = cb_arg; + closure->scheduler = scheduler; + return closure; } void grpc_closure_list_init(grpc_closure_list *closure_list) { @@ -105,11 +108,12 @@ static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, cb(exec_ctx, cb_arg, error); } -grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) { +grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg, + grpc_closure_scheduler *scheduler) { wrapped_closure *wc = gpr_malloc(sizeof(*wc)); wc->cb = cb; wc->cb_arg = cb_arg; - grpc_closure_init(&wc->wrapper, closure_wrapper, wc); + grpc_closure_init(&wc->wrapper, closure_wrapper, wc, scheduler); return &wc->wrapper; } @@ -117,8 +121,30 @@ void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *c, grpc_error *error) { GPR_TIMER_BEGIN("grpc_closure_run", 0); if (c != NULL) { - c->cb(exec_ctx, c->cb_arg, error); + c->scheduler->vtable->run(exec_ctx, c, error); + } else { + GRPC_ERROR_UNREF(error); } - GRPC_ERROR_UNREF(error); GPR_TIMER_END("grpc_closure_run", 0); } + +void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *c, + grpc_error *error) { + GPR_TIMER_BEGIN("grpc_closure_sched", 0); + if (c != NULL) { + c->scheduler->vtable->sched(exec_ctx, c, error); + } else { + GRPC_ERROR_UNREF(error); + } + GPR_TIMER_END("grpc_closure_sched", 0); +} + +void grpc_closure_list_sched(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) { + grpc_closure *c = list->head; + while (c != NULL) { + grpc_closure *next = c->next_data.next; + c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error); + c = next; + } + list->head = list->tail = NULL; +} diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 2b4b271eaa..1b5d9b20a0 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -59,6 +59,22 @@ typedef struct grpc_closure_list { typedef void (*grpc_iomgr_cb_func)(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); +typedef struct grpc_closure_scheduler grpc_closure_scheduler; + +typedef struct grpc_closure_scheduler_vtable { + /* NOTE: for all these functions, closure->scheduler == the scheduler that was + used to find this vtable */ + void (*run)(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error); + void (*sched)(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error); +} grpc_closure_scheduler_vtable; + +/** Abstract type that can schedule closures for execution */ +struct grpc_closure_scheduler { + const grpc_closure_scheduler_vtable *vtable; +}; + /** A closure over a grpc_iomgr_cb_func. */ struct grpc_closure { /** Once queued, next indicates the next queued closure; before then, scratch @@ -75,6 +91,10 @@ struct grpc_closure { /** Arguments to be passed to "cb". */ void *cb_arg; + /** Scheduler to schedule against: NULL to schedule against current execution + context */ + grpc_closure_scheduler *scheduler; + /** Once queued, the result of the closure. Before then: scratch space */ union { grpc_error *error; @@ -82,12 +102,14 @@ struct grpc_closure { } error_data; }; -/** Initializes \a closure with \a cb and \a cb_arg. */ -void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, - void *cb_arg); +/** Initializes \a closure with \a cb and \a cb_arg. Returns \a closure. */ +grpc_closure *grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, + void *cb_arg, + grpc_closure_scheduler *scheduler); /* Create a heap allocated closure: try to avoid except for very rare events */ -grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg); +grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg, + grpc_closure_scheduler *scheduler); #define GRPC_CLOSURE_LIST_INIT \ { NULL, NULL } @@ -115,4 +137,13 @@ bool grpc_closure_list_empty(grpc_closure_list list); void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error); +/** Schedule a closure to be run. Does not need to be run from a safe point. */ +void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error); + +/** Schedule all closures in a list to be run. Does not need to be run from a + * safe point. */ +void grpc_closure_list_sched(grpc_exec_ctx *exec_ctx, + grpc_closure_list *closure_list); + #endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */ diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index cfc67020ae..c26a73b2b7 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -56,6 +56,10 @@ int grpc_combiner_trace = 0; struct grpc_combiner { grpc_combiner *next_combiner_on_this_exec_ctx; grpc_workqueue *optional_workqueue; + grpc_closure_scheduler uncovered_scheduler; + grpc_closure_scheduler covered_scheduler; + grpc_closure_scheduler uncovered_finally_scheduler; + grpc_closure_scheduler covered_finally_scheduler; gpr_mpscq queue; // state is: // lower bit - zero if orphaned (STATE_UNORPHANED) @@ -70,6 +74,26 @@ struct grpc_combiner { grpc_closure offload; }; +static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, + grpc_closure *closure, grpc_error *error); +static void combiner_exec_covered(grpc_exec_ctx *exec_ctx, + grpc_closure *closure, grpc_error *error); +static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx, + grpc_closure *closure, + grpc_error *error); +static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx, + grpc_closure *closure, + grpc_error *error); + +static const grpc_closure_scheduler_vtable scheduler_uncovered = { + combiner_exec_uncovered, combiner_exec_uncovered}; +static const grpc_closure_scheduler_vtable scheduler_covered = { + combiner_exec_covered, combiner_exec_covered}; +static const grpc_closure_scheduler_vtable finally_scheduler_uncovered = { + combiner_finally_exec_uncovered, combiner_finally_exec_uncovered}; +static const grpc_closure_scheduler_vtable finally_scheduler_covered = { + combiner_finally_exec_covered, combiner_finally_exec_covered}; + static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); typedef struct { @@ -102,11 +126,16 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { lock->time_to_execute_final_list = false; lock->optional_workqueue = optional_workqueue; lock->final_list_covered_by_poller = false; + lock->uncovered_scheduler.vtable = &scheduler_uncovered; + lock->covered_scheduler.vtable = &scheduler_covered; + lock->uncovered_finally_scheduler.vtable = &finally_scheduler_uncovered; + lock->covered_finally_scheduler.vtable = &finally_scheduler_covered; gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); gpr_atm_no_barrier_store(&lock->elements_covered_by_poller, 0); gpr_mpscq_init(&lock->queue); grpc_closure_list_init(&lock->final_list); - grpc_closure_init(&lock->offload, offload, lock); + grpc_closure_init(&lock->offload, offload, lock, + grpc_workqueue_scheduler(lock->optional_workqueue)); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock)); return lock; } @@ -148,9 +177,9 @@ static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx, } } -void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *cl, grpc_error *error, - bool covered_by_poller) { +static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, + grpc_closure *cl, grpc_error *error, + bool covered_by_poller) { GPR_TIMER_BEGIN("combiner.execute", 0); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); GRPC_COMBINER_TRACE(gpr_log( @@ -171,6 +200,24 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, GPR_TIMER_END("combiner.execute", 0); } +#define COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler_name) \ + ((grpc_combiner *)(((char *)((closure)->scheduler)) - \ + offsetof(grpc_combiner, scheduler_name))) + +static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, grpc_closure *cl, + grpc_error *error) { + combiner_exec(exec_ctx, + COMBINER_FROM_CLOSURE_SCHEDULER(cl, uncovered_scheduler), cl, + error, false); +} + +static void combiner_exec_covered(grpc_exec_ctx *exec_ctx, grpc_closure *cl, + grpc_error *error) { + combiner_exec(exec_ctx, + COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_scheduler), cl, + error, true); +} + static void move_next(grpc_exec_ctx *exec_ctx) { exec_ctx->active_combiner = exec_ctx->active_combiner->next_combiner_on_this_exec_ctx; @@ -188,8 +235,7 @@ static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { move_next(exec_ctx); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload --> %p", lock, lock->optional_workqueue)); - grpc_workqueue_enqueue(exec_ctx, lock->optional_workqueue, &lock->offload, - GRPC_ERROR_NONE); + grpc_closure_sched(exec_ctx, &lock->offload, GRPC_ERROR_NONE); } bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { @@ -312,23 +358,22 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { } static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, - grpc_error *error) { - grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure, - GRPC_ERROR_REF(error), false); -} + grpc_error *error); -void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *closure, grpc_error *error, - bool covered_by_poller) { +static void combiner_execute_finally(grpc_exec_ctx *exec_ctx, + grpc_combiner *lock, grpc_closure *closure, + grpc_error *error, + bool covered_by_poller) { GRPC_COMBINER_TRACE(gpr_log( GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p; cov=%d", lock, closure, exec_ctx->active_combiner, covered_by_poller)); GPR_TIMER_BEGIN("combiner.execute_finally", 0); if (exec_ctx->active_combiner != lock) { GPR_TIMER_MARK("slowpath", 0); - grpc_combiner_execute(exec_ctx, lock, - grpc_closure_create(enqueue_finally, closure), error, - false); + grpc_closure_sched( + exec_ctx, grpc_closure_create(enqueue_finally, closure, + grpc_combiner_scheduler(lock, false)), + error); GPR_TIMER_END("combiner.execute_finally", 0); return; } @@ -342,3 +387,36 @@ void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, grpc_closure_list_append(&lock->final_list, closure, error); GPR_TIMER_END("combiner.execute_finally", 0); } + +static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, + grpc_error *error) { + combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure, + GRPC_ERROR_REF(error), false); +} + +static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx, + grpc_closure *cl, + grpc_error *error) { + combiner_execute_finally(exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER( + cl, uncovered_finally_scheduler), + cl, error, false); +} + +static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx, + grpc_closure *cl, grpc_error *error) { + combiner_execute_finally( + exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_finally_scheduler), + cl, error, true); +} + +grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *combiner, + bool covered_by_poller) { + return covered_by_poller ? &combiner->covered_scheduler + : &combiner->uncovered_scheduler; +} + +grpc_closure_scheduler *grpc_combiner_finally_scheduler( + grpc_combiner *combiner, bool covered_by_poller) { + return covered_by_poller ? &combiner->covered_finally_scheduler + : &combiner->uncovered_finally_scheduler; +} diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index d04eeed83a..81dff85d40 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -50,14 +50,12 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); // Destroy the lock void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); -// Execute \a action within the lock. -void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *closure, grpc_error *error, - bool covered_by_poller); -// Execute \a action within the lock just prior to unlocking. -void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *closure, grpc_error *error, - bool covered_by_poller); +// Fetch a scheduler to schedule closures against +grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock, + bool covered_by_poller); +// Scheduler to execute \a action within the lock just prior to unlocking. +grpc_closure_scheduler *grpc_combiner_finally_scheduler(grpc_combiner *lock, + bool covered_by_poller); bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx); diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 1b15e0eb4f..ac94d2e634 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -202,6 +202,8 @@ static void fd_global_shutdown(void); /* This is also used as grpc_workqueue (by directly casing it) */ typedef struct polling_island { + grpc_closure_scheduler workqueue_scheduler; + gpr_mu mu; /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement the refcount. @@ -305,6 +307,8 @@ static __thread polling_island *g_current_thread_polling_island; /* Forward declaration */ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi); +static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error); #ifdef GRPC_TSAN /* Currently TSAN may incorrectly flag data races between epoll_ctl and @@ -317,6 +321,9 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi); gpr_atm g_epoll_sync; #endif /* defined(GRPC_TSAN) */ +static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = { + workqueue_enqueue, workqueue_enqueue}; + static void pi_add_ref(polling_island *pi); static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi); @@ -529,6 +536,7 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, *error = GRPC_ERROR_NONE; pi = gpr_malloc(sizeof(*pi)); + pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable; gpr_mu_init(&pi->mu); pi->fd_cnt = 0; pi->fd_capacity = 0; @@ -800,10 +808,10 @@ static polling_island *polling_island_merge(polling_island *p, return q; } -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue, grpc_closure *closure, +static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error) { GPR_TIMER_BEGIN("workqueue.enqueue", 0); + grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler; /* take a ref to the workqueue: otherwise it can happen that whatever events * this kicks off ends up destroying the workqueue before this function * completes */ @@ -820,6 +828,11 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, GPR_TIMER_END("workqueue.enqueue", 0); } +static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) { + polling_island *pi = (polling_island *)workqueue; + return &pi->workqueue_scheduler; +} + static grpc_error *polling_island_global_init() { grpc_error *error = GRPC_ERROR_NONE; @@ -1030,8 +1043,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, fd->po.pi = NULL; } - grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error), - NULL); + grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); gpr_mu_unlock(&fd->po.mu); UNREF_BY(fd, 2, reason); /* Drop the reference */ @@ -1057,16 +1069,14 @@ static grpc_error *fd_shutdown_error(bool shutdown) { static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, grpc_closure *closure) { if (fd->shutdown) { - grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"), - NULL); + grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown")); } else if (*st == CLOSURE_NOT_READY) { /* not ready ==> switch to a waiting state by setting the closure */ *st = closure; } else if (*st == CLOSURE_READY) { /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; - grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown), - NULL); + grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown)); } else { /* upcallptr was set to a different closure. This is an error! */ gpr_log(GPR_ERROR, @@ -1088,7 +1098,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, return 0; } else { /* waiting ==> queue closure */ - grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL); + grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown)); *st = CLOSURE_NOT_READY; return 1; } @@ -1359,7 +1369,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, /* Release the ref and set pollset->po.pi to NULL */ pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown"); - grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE); } /* pollset->po.mu lock must be held by the caller before calling this */ @@ -1959,7 +1969,7 @@ static const grpc_event_engine_vtable vtable = { .workqueue_ref = workqueue_ref, .workqueue_unref = workqueue_unref, - .workqueue_enqueue = workqueue_enqueue, + .workqueue_scheduler = workqueue_scheduler, .shutdown_engine = shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 21b28e5554..5bc5621443 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -397,7 +397,7 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { if (!fd->released) { close(fd->fd); } - grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE); } static int fd_wrapped_fd(grpc_fd *fd) { @@ -457,16 +457,14 @@ static grpc_error *fd_shutdown_error(bool shutdown) { static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, grpc_closure *closure) { if (fd->shutdown) { - grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"), - NULL); + grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown")); } else if (*st == CLOSURE_NOT_READY) { /* not ready ==> switch to a waiting state by setting the closure */ *st = closure; } else if (*st == CLOSURE_READY) { /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; - grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown), - NULL); + grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown)); maybe_wake_one_watcher_locked(fd); } else { /* upcallptr was set to a different closure. This is an error! */ @@ -489,7 +487,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, return 0; } else { /* waiting ==> queue closure */ - grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL); + grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown)); *st = CLOSURE_NOT_READY; return 1; } @@ -852,7 +850,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GRPC_FD_UNREF(pollset->fds[i], "multipoller"); } pollset->fd_count = 0; - grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE); } static void work_combine_error(grpc_error **composite, grpc_error *error) { @@ -901,7 +899,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (!pollset_has_workers(pollset) && !grpc_closure_list_empty(pollset->idle_jobs)) { GPR_TIMER_MARK("pollset_work.idle_jobs", 0); - grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); + grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs); goto done; } /* If we're shutting down then we don't execute any extended work */ @@ -1081,7 +1079,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */ gpr_mu_lock(&pollset->mu); } else if (!grpc_closure_list_empty(pollset->idle_jobs)) { - grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); + grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs); gpr_mu_unlock(&pollset->mu); grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&pollset->mu); @@ -1100,7 +1098,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->shutdown_done = closure; pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); if (!pollset_has_workers(pollset)) { - grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); + grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs); } if (!pollset->called_shutdown && !pollset_has_workers(pollset)) { pollset->called_shutdown = 1; @@ -1288,10 +1286,8 @@ static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} #endif -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue, grpc_closure *closure, - grpc_error *error) { - grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); +static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) { + return grpc_schedule_on_exec_ctx; } /******************************************************************************* @@ -1534,7 +1530,7 @@ static const grpc_event_engine_vtable vtable = { .workqueue_ref = workqueue_ref, .workqueue_unref = workqueue_unref, - .workqueue_enqueue = workqueue_enqueue, + .workqueue_scheduler = workqueue_scheduler, .shutdown_engine = shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index ab139895fd..2975d619e1 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -275,9 +275,8 @@ void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { } #endif -void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - grpc_closure *closure, grpc_error *error) { - g_event_engine->workqueue_enqueue(exec_ctx, workqueue, closure, error); +grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) { + return g_event_engine->workqueue_scheduler(workqueue); } #endif // GRPC_POSIX_SOCKET diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index cb5832539d..1068a4bad5 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -106,8 +106,7 @@ typedef struct grpc_event_engine_vtable { grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue); void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); #endif - void (*workqueue_enqueue)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - grpc_closure *closure, grpc_error *error); + grpc_closure_scheduler *(*workqueue_scheduler)(grpc_workqueue *workqueue); } grpc_event_engine_vtable; void grpc_event_engine_init(void); diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index 604713e578..c243bc803f 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -57,7 +57,6 @@ bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) { return true; } -#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { bool did_something = 0; GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0); @@ -76,30 +75,6 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { } } GPR_ASSERT(exec_ctx->active_combiner == NULL); - if (exec_ctx->stealing_from_workqueue != NULL) { - if (grpc_exec_ctx_ready_to_finish(exec_ctx)) { - grpc_workqueue_enqueue(exec_ctx, exec_ctx->stealing_from_workqueue, - exec_ctx->stolen_closure, - exec_ctx->stolen_closure->error_data.error); - GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue, - "exec_ctx_sched"); - exec_ctx->stealing_from_workqueue = NULL; - exec_ctx->stolen_closure = NULL; - } else { - grpc_closure *c = exec_ctx->stolen_closure; - GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue, - "exec_ctx_sched"); - exec_ctx->stealing_from_workqueue = NULL; - exec_ctx->stolen_closure = NULL; - grpc_error *error = c->error_data.error; - GPR_TIMER_BEGIN("grpc_exec_ctx_flush.stolen_cb", 0); - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - GPR_TIMER_END("grpc_exec_ctx_flush.stolen_cb", 0); - grpc_exec_ctx_flush(exec_ctx); - did_something = true; - } - } GPR_TIMER_END("grpc_exec_ctx_flush", 0); return did_something; } @@ -109,104 +84,21 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { grpc_exec_ctx_flush(exec_ctx); } -void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error, - grpc_workqueue *offload_target_or_null) { - GPR_TIMER_BEGIN("grpc_exec_ctx_sched", 0); - if (offload_target_or_null == NULL) { - grpc_closure_list_append(&exec_ctx->closure_list, closure, error); - } else if (exec_ctx->stealing_from_workqueue == NULL) { - exec_ctx->stealing_from_workqueue = offload_target_or_null; - closure->error_data.error = error; - exec_ctx->stolen_closure = closure; - } else if (exec_ctx->stealing_from_workqueue != offload_target_or_null) { - grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error); - GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched"); - } else { /* stealing_from_workqueue == offload_target_or_null */ - grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, - exec_ctx->stolen_closure, - exec_ctx->stolen_closure->error_data.error); - closure->error_data.error = error; - exec_ctx->stolen_closure = closure; - GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched"); - } - GPR_TIMER_END("grpc_exec_ctx_sched", 0); +static void exec_ctx_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { + closure->cb(exec_ctx, closure->cb_arg, error); + GRPC_ERROR_UNREF(error); } -void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, - grpc_closure_list *list, - grpc_workqueue *offload_target_or_null) { - grpc_closure_list_move(list, &exec_ctx->closure_list); +static void exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { + grpc_closure_list_append(&exec_ctx->closure_list, closure, error); } void grpc_exec_ctx_global_init(void) {} void grpc_exec_ctx_global_shutdown(void) {} -#else -static gpr_mu g_mu; -static gpr_cv g_cv; -static int g_threads = 0; - -static void run_closure(void *arg) { - grpc_closure *closure = arg; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - closure->cb(&exec_ctx, closure->cb_arg, (closure->final_data & 1) != 0); - grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(&g_mu); - if (--g_threads == 0) { - gpr_cv_signal(&g_cv); - } - gpr_mu_unlock(&g_mu); -} - -static void start_closure(grpc_closure *closure) { - gpr_thd_id id; - gpr_mu_lock(&g_mu); - g_threads++; - gpr_mu_unlock(&g_mu); - gpr_thd_new(&id, run_closure, closure, NULL); -} - -bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { return false; } - -void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {} -void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - bool success, - grpc_workqueue *offload_target_or_null) { - GPR_ASSERT(offload_target_or_null == NULL); - if (closure == NULL) return; - closure->final_data = success; - start_closure(closure); -} - -void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, - grpc_closure_list *list, - grpc_workqueue *offload_target_or_null) { - GPR_ASSERT(offload_target_or_null == NULL); - if (list == NULL) return; - grpc_closure *p = list->head; - while (p) { - grpc_closure *start = p; - p = grpc_closure_next(start); - start_closure(start); - } - grpc_closure_list r = GRPC_CLOSURE_LIST_INIT; - *list = r; -} - -void grpc_exec_ctx_global_init(void) { - gpr_mu_init(&g_mu); - gpr_cv_init(&g_cv); -} - -void grpc_exec_ctx_global_shutdown(void) { - gpr_mu_lock(&g_mu); - while (g_threads != 0) { - gpr_cv_wait(&g_cv, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); - } - gpr_mu_unlock(&g_mu); - - gpr_mu_destroy(&g_mu); - gpr_cv_destroy(&g_cv); -} -#endif +static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = { + exec_ctx_run, exec_ctx_sched}; +static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable}; +grpc_closure_scheduler *grpc_schedule_on_exec_ctx = &exec_ctx_scheduler; diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 7e50cb9825..e566f1b3e8 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -66,17 +66,6 @@ typedef struct grpc_combiner grpc_combiner; #ifndef GRPC_EXECUTION_CONTEXT_SANITIZER struct grpc_exec_ctx { grpc_closure_list closure_list; - /** The workqueue we're stealing work from. - As items are queued to the execution context, we try to steal one - workqueue item and execute it inline (assuming the exec_ctx is not - finished) - doing so does not invalidate the workqueue's contract, and - provides a small latency win in cases where we get a hit */ - grpc_workqueue *stealing_from_workqueue; - /** The workqueue item that was stolen from the workqueue above. When new - items are scheduled to be offloaded to that workqueue, we need to update - this like a 1-deep fifo to maintain the invariant that workqueue items - queued by one thread are started in order */ - grpc_closure *stolen_closure; /** currently active combiner: updated only via combiner.c */ grpc_combiner *active_combiner; /** last active combiner in the active combiner list */ @@ -89,10 +78,7 @@ struct grpc_exec_ctx { /* initializer for grpc_exec_ctx: prefer to use GRPC_EXEC_CTX_INIT whenever possible */ #define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \ - { \ - GRPC_CLOSURE_LIST_INIT, NULL, NULL, NULL, NULL, false, finish_check_arg, \ - finish_check \ - } + { GRPC_CLOSURE_LIST_INIT, NULL, NULL, false, finish_check_arg, finish_check } #else struct grpc_exec_ctx { bool cached_ready_to_finish; @@ -108,6 +94,8 @@ struct grpc_exec_ctx { #define GRPC_EXEC_CTX_INIT \ GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_always_ready_to_finish, NULL) +extern grpc_closure_scheduler *grpc_schedule_on_exec_ctx; + /** Flush any work that has been enqueued onto this grpc_exec_ctx. * Caller must guarantee that no interfering locks are held. * Returns true if work was performed, false otherwise. */ @@ -115,14 +103,6 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); /** Finish any pending work for a grpc_exec_ctx. Must be called before * the instance is destroyed, or work may be lost. */ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx); -/** Add a closure to be executed in the future. - If \a offload_target_or_null is NULL, the closure will be executed at the - next exec_ctx.{finish,flush} point. - If \a offload_target_or_null is non-NULL, the closure will be scheduled - against the workqueue, and a reference to the workqueue will be consumed. */ -void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error, - grpc_workqueue *offload_target_or_null); /** Returns true if we'd like to leave this execution context as soon as possible: useful for deciding whether to do something more or not depending on outside context */ @@ -131,11 +111,6 @@ bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx); bool grpc_never_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored); /** A finish check that is always ready to finish */ bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored); -/** Add a list of closures to be executed at the next flush/finish point. - * Leaves \a list empty. */ -void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, - grpc_closure_list *list, - grpc_workqueue *offload_target_or_null); void grpc_exec_ctx_global_init(void); diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index 8d7535d6fe..37a7142792 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -77,7 +77,7 @@ static void closure_exec_thread_func(void *ignored) { gpr_mu_unlock(&g_executor.mu); break; } else { - grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures, NULL); + grpc_closure_list_sched(&exec_ctx, &g_executor.closures); } gpr_mu_unlock(&g_executor.mu); grpc_exec_ctx_flush(&exec_ctx); @@ -112,7 +112,8 @@ static void maybe_spawn_locked() { g_executor.pending_join = 1; } -void grpc_executor_push(grpc_closure *closure, grpc_error *error) { +static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { gpr_mu_lock(&g_executor.mu); if (g_executor.shutting_down == 0) { grpc_closure_list_append(&g_executor.closures, closure, error); @@ -133,7 +134,7 @@ void grpc_executor_shutdown() { * list below because we aren't accepting new work */ /* Execute pending callbacks, some may be performing cleanups */ - grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures, NULL); + grpc_closure_list_sched(&exec_ctx, &g_executor.closures); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(grpc_closure_list_empty(g_executor.closures)); if (pending_join) { @@ -141,3 +142,8 @@ void grpc_executor_shutdown() { } gpr_mu_destroy(&g_executor.mu); } + +static const grpc_closure_scheduler_vtable executor_vtable = {executor_push, + executor_push}; +static grpc_closure_scheduler executor_scheduler = {&executor_vtable}; +grpc_closure_scheduler *grpc_executor_scheduler = &executor_scheduler; diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index da9dcd07d0..53f3b6d441 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -43,9 +43,7 @@ * non-blocking solution available. */ void grpc_executor_init(); -/** Enqueue \a closure for its eventual execution of \a f(arg) on a separate - * thread */ -void grpc_executor_push(grpc_closure *closure, grpc_error *error); +extern grpc_closure_scheduler *grpc_executor_scheduler; /** Shutdown the executor, running all pending work as part of the call */ void grpc_executor_shutdown(); diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c index 3a74b842b6..ed3edeee94 100644 --- a/src/core/lib/iomgr/pollset_uv.c +++ b/src/core/lib/iomgr/pollset_uv.c @@ -83,7 +83,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, // Drain any pending UV callbacks without blocking uv_run(uv_default_loop(), UV_RUN_NOWAIT); } - grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); } void grpc_pollset_destroy(grpc_pollset *pollset) { diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c index 5540303e49..6714d8d51d 100644 --- a/src/core/lib/iomgr/pollset_windows.c +++ b/src/core/lib/iomgr/pollset_windows.c @@ -109,7 +109,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->shutting_down = 1; grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); if (!pollset->is_iocp_worker) { - grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); } else { pollset->on_shutdown = closure; } @@ -167,7 +167,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } if (pollset->shutting_down && pollset->on_shutdown != NULL) { - grpc_exec_ctx_sched(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE, + grpc_closure_sched(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE, NULL); pollset->on_shutdown = NULL; } diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index 821932e562..50e470d149 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -163,10 +163,9 @@ typedef struct { static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, grpc_error *error) { request *r = rp; - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, r->on_done, - grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out), - NULL); + grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out)); gpr_free(r->name); gpr_free(r->default_port); gpr_free(r); @@ -185,12 +184,13 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, grpc_closure *on_done, grpc_resolved_addresses **addrs) { request *r = gpr_malloc(sizeof(request)); - grpc_closure_init(&r->request_closure, do_request_thread, r); + grpc_closure_init(&r->request_closure, do_request_thread, r, + grpc_executor_scheduler); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; r->addrs_out = addrs; - grpc_executor_push(&r->request_closure, GRPC_ERROR_NONE); + grpc_closure_sched(exec_ctx, &r->request_closure, GRPC_ERROR_NONE); } void (*grpc_resolve_address)( diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index 3269c4f09f..9b5f3209f0 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -98,7 +98,7 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status, grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_error *error; error = handle_addrinfo_result(status, res, r->addresses); - grpc_exec_ctx_sched(&exec_ctx, r->on_done, error, NULL); + grpc_closure_sched(&exec_ctx, r->on_done, error); grpc_exec_ctx_finish(&exec_ctx); gpr_free(r->hints); @@ -193,7 +193,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, int s; err = try_split_host_port(name, default_port, &host, &port); if (err != GRPC_ERROR_NONE) { - grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL); + grpc_closure_sched(exec_ctx, on_done, err); return; } r = gpr_malloc(sizeof(request)); @@ -217,7 +217,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, *addrs = NULL; err = GRPC_ERROR_CREATE("getaddrinfo failed"); err = grpc_error_set_str(err, GRPC_ERROR_STR_OS_ERROR, uv_strerror(s)); - grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL); + grpc_closure_sched(exec_ctx, on_done, err); gpr_free(r); gpr_free(req); gpr_free(hints); diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c index fada5ecbe8..bda7f77f9c 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.c @@ -154,7 +154,7 @@ static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, } else { GRPC_ERROR_REF(error); } - grpc_exec_ctx_sched(exec_ctx, r->on_done, error, NULL); + grpc_closure_sched(exec_ctx, r->on_done, error); gpr_free(r->name); gpr_free(r->default_port); gpr_free(r); @@ -173,7 +173,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, grpc_closure *on_done, grpc_resolved_addresses **addresses) { request *r = gpr_malloc(sizeof(request)); - grpc_closure_init(&r->request_closure, do_request_thread, r); + grpc_closure_init(&r->request_closure, do_request_thread, r, grpc_schedule_on_exec_ctx); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 213d29600c..8db539edfb 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -265,9 +265,8 @@ static void rq_step_sched(grpc_exec_ctx *exec_ctx, if (resource_quota->step_scheduled) return; resource_quota->step_scheduled = true; grpc_resource_quota_internal_ref(resource_quota); - grpc_combiner_execute_finally(exec_ctx, resource_quota->combiner, - &resource_quota->rq_step_closure, - GRPC_ERROR_NONE, false); + grpc_closure_sched(exec_ctx, &resource_quota->rq_step_closure, + GRPC_ERROR_NONE); } /* returns true if all allocations are completed */ @@ -294,7 +293,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx, } if (resource_user->free_pool >= 0) { resource_user->allocating = false; - grpc_exec_ctx_enqueue_list(exec_ctx, &resource_user->on_allocated, NULL); + grpc_closure_list_sched(exec_ctx, &resource_user->on_allocated); gpr_mu_unlock(&resource_user->mu); } else { rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION); @@ -439,7 +438,7 @@ static bool ru_post_reclaimer(grpc_exec_ctx *exec_ctx, resource_user->new_reclaimers[destructive] = NULL; GPR_ASSERT(resource_user->reclaimers[destructive] == NULL); if (gpr_atm_acq_load(&resource_user->shutdown) > 0) { - grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL); + grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED); return false; } resource_user->reclaimers[destructive] = closure; @@ -480,10 +479,10 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { grpc_resource_user *resource_user = ru; - grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[0], - GRPC_ERROR_CANCELLED, NULL); - grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1], - GRPC_ERROR_CANCELLED, NULL); + grpc_closure_sched(exec_ctx, resource_user->reclaimers[0], + GRPC_ERROR_CANCELLED); + grpc_closure_sched(exec_ctx, resource_user->reclaimers[1], + GRPC_ERROR_CANCELLED); resource_user->reclaimers[0] = NULL; resource_user->reclaimers[1] = NULL; rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN); @@ -496,10 +495,10 @@ static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { for (int i = 0; i < GRPC_RULIST_COUNT; i++) { rulist_remove(resource_user, (grpc_rulist)i); } - grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[0], - GRPC_ERROR_CANCELLED, NULL); - grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1], - GRPC_ERROR_CANCELLED, NULL); + grpc_closure_sched(exec_ctx, resource_user->reclaimers[0], + GRPC_ERROR_CANCELLED); + grpc_closure_sched(exec_ctx, resource_user->reclaimers[1], + GRPC_ERROR_CANCELLED); if (resource_user->free_pool != 0) { resource_user->resource_quota->free_pool += resource_user->free_pool; rq_step_sched(exec_ctx, resource_user->resource_quota); @@ -571,9 +570,12 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) { gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR, (intptr_t)resource_quota); } - grpc_closure_init(&resource_quota->rq_step_closure, rq_step, resource_quota); + grpc_closure_init( + &resource_quota->rq_step_closure, rq_step, resource_quota, + grpc_combiner_finally_scheduler(resource_quota->combiner, true)); grpc_closure_init(&resource_quota->rq_reclamation_done_closure, - rq_reclamation_done, resource_quota); + rq_reclamation_done, resource_quota, + grpc_combiner_scheduler(resource_quota->combiner, false)); for (int i = 0; i < GRPC_RULIST_COUNT; i++) { resource_quota->roots[i] = NULL; } @@ -614,9 +616,8 @@ void grpc_resource_quota_resize(grpc_resource_quota *resource_quota, rq_resize_args *a = gpr_malloc(sizeof(*a)); a->resource_quota = grpc_resource_quota_internal_ref(resource_quota); a->size = (int64_t)size; - grpc_closure_init(&a->closure, rq_resize, a); - grpc_combiner_execute(&exec_ctx, resource_quota->combiner, &a->closure, - GRPC_ERROR_NONE, false); + grpc_closure_init(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx); + grpc_closure_sched(&exec_ctx, &a->closure, GRPC_ERROR_NONE); grpc_exec_ctx_finish(&exec_ctx); } @@ -663,15 +664,19 @@ grpc_resource_user *grpc_resource_user_create( resource_user->resource_quota = grpc_resource_quota_internal_ref(resource_quota); grpc_closure_init(&resource_user->allocate_closure, &ru_allocate, - resource_user); + resource_user, + grpc_combiner_scheduler(resource_quota->combiner, false)); grpc_closure_init(&resource_user->add_to_free_pool_closure, - &ru_add_to_free_pool, resource_user); + &ru_add_to_free_pool, resource_user, + grpc_combiner_scheduler(resource_quota->combiner, false)); grpc_closure_init(&resource_user->post_reclaimer_closure[0], - &ru_post_benign_reclaimer, resource_user); + &ru_post_benign_reclaimer, resource_user, + grpc_combiner_scheduler(resource_quota->combiner, false)); grpc_closure_init(&resource_user->post_reclaimer_closure[1], - &ru_post_destructive_reclaimer, resource_user); - grpc_closure_init(&resource_user->destroy_closure, &ru_destroy, - resource_user); + &ru_post_destructive_reclaimer, resource_user, + grpc_combiner_scheduler(resource_quota->combiner, false)); + grpc_closure_init(&resource_user->destroy_closure, &ru_destroy, resource_user, + grpc_combiner_scheduler(resource_quota->combiner, false)); gpr_mu_init(&resource_user->mu); gpr_atm_rel_store(&resource_user->refs, 1); gpr_atm_rel_store(&resource_user->shutdown, 0); @@ -706,9 +711,8 @@ static void ru_unref_by(grpc_exec_ctx *exec_ctx, gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount); GPR_ASSERT(old >= amount); if (old == amount) { - grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, - &resource_user->destroy_closure, GRPC_ERROR_NONE, - false); + grpc_closure_sched(exec_ctx, &resource_user->destroy_closure, + GRPC_ERROR_NONE); } } @@ -724,9 +728,12 @@ void grpc_resource_user_unref(grpc_exec_ctx *exec_ctx, void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user) { if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) { - grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, - grpc_closure_create(ru_shutdown, resource_user), - GRPC_ERROR_NONE, false); + grpc_closure_sched(exec_ctx, + grpc_closure_create( + ru_shutdown, resource_user, + grpc_combiner_scheduler( + resource_user->resource_quota->combiner, false)), + GRPC_ERROR_NONE); } } @@ -746,12 +753,11 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE); if (!resource_user->allocating) { resource_user->allocating = true; - grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, - &resource_user->allocate_closure, GRPC_ERROR_NONE, - false); + grpc_closure_sched(exec_ctx, &resource_user->allocate_closure, + GRPC_ERROR_NONE); } } else { - grpc_exec_ctx_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE); } gpr_mu_unlock(&resource_user->mu); } @@ -770,9 +776,8 @@ void grpc_resource_user_free(grpc_exec_ctx *exec_ctx, if (is_bigger_than_zero && was_zero_or_negative && !resource_user->added_to_free_pool) { resource_user->added_to_free_pool = true; - grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, - &resource_user->add_to_free_pool_closure, - GRPC_ERROR_NONE, false); + grpc_closure_sched(exec_ctx, &resource_user->add_to_free_pool_closure, + GRPC_ERROR_NONE); } gpr_mu_unlock(&resource_user->mu); ru_unref_by(exec_ctx, resource_user, (gpr_atm)size); @@ -784,9 +789,9 @@ void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx, grpc_closure *closure) { GPR_ASSERT(resource_user->new_reclaimers[destructive] == NULL); resource_user->new_reclaimers[destructive] = closure; - grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, - &resource_user->post_reclaimer_closure[destructive], - GRPC_ERROR_NONE, false); + grpc_closure_sched(exec_ctx, + &resource_user->post_reclaimer_closure[destructive], + GRPC_ERROR_NONE); } void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx, @@ -795,18 +800,20 @@ void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx, gpr_log(GPR_DEBUG, "RQ %s %s: reclamation complete", resource_user->resource_quota->name, resource_user->name); } - grpc_combiner_execute( - exec_ctx, resource_user->resource_quota->combiner, - &resource_user->resource_quota->rq_reclamation_done_closure, - GRPC_ERROR_NONE, false); + grpc_closure_sched( + exec_ctx, &resource_user->resource_quota->rq_reclamation_done_closure, + GRPC_ERROR_NONE); } void grpc_resource_user_slice_allocator_init( grpc_resource_user_slice_allocator *slice_allocator, grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p) { - grpc_closure_init(&slice_allocator->on_allocated, ru_allocated_slices, - slice_allocator); - grpc_closure_init(&slice_allocator->on_done, cb, p); + grpc_closure_init( + &slice_allocator->on_allocated, ru_allocated_slices, slice_allocator, + grpc_combiner_scheduler(resource_user->resource_quota->combiner, false)); + grpc_closure_init( + &slice_allocator->on_done, cb, p, + grpc_combiner_scheduler(resource_user->resource_quota->combiner, false)); slice_allocator->resource_user = resource_user; } diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c index 54911e0e31..2f2e02f715 100644 --- a/src/core/lib/iomgr/socket_windows.c +++ b/src/core/lib/iomgr/socket_windows.c @@ -131,7 +131,7 @@ static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&socket->state_mu); if (info->has_pending_iocp) { info->has_pending_iocp = 0; - grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); } else { info->closure = closure; } @@ -154,7 +154,7 @@ void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, GPR_ASSERT(!info->has_pending_iocp); gpr_mu_lock(&socket->state_mu); if (info->closure) { - grpc_exec_ctx_sched(exec_ctx, info->closure, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, info->closure, GRPC_ERROR_NONE); info->closure = NULL; } else { info->has_pending_iocp = 1; diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index a3a70a8ed7..be7b695ad6 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -265,7 +265,7 @@ finish: grpc_channel_args_destroy(ac->channel_args); gpr_free(ac); } - grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); + grpc_closure_sched(exec_ctx, closure, error); } static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, @@ -294,7 +294,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd); if (error != GRPC_ERROR_NONE) { - grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); + grpc_closure_sched(exec_ctx, closure, error); return; } if (dsmode == GRPC_DSMODE_IPV4) { @@ -303,7 +303,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, addr = &addr4_copy; } if ((error = prepare_socket(addr, fd, channel_args)) != GRPC_ERROR_NONE) { - grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); + grpc_closure_sched(exec_ctx, closure, error); return; } @@ -321,14 +321,13 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, if (err >= 0) { *ep = grpc_tcp_client_create_from_fd(exec_ctx, fdobj, channel_args, addr_str); - grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); goto done; } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error"); - grpc_exec_ctx_sched(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"), - NULL); + grpc_closure_sched(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect")); goto done; } diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index b07f9ceffa..b1664b85fd 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -110,7 +110,7 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) { if (done) { uv_tcp_connect_cleanup(&exec_ctx, connect); } - grpc_exec_ctx_sched(&exec_ctx, closure, error, NULL); + grpc_closure_sched(&exec_ctx, closure, error); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index 1127588ebc..692252bbe0 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -129,7 +129,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { async_connect_unlock_and_cleanup(exec_ctx, ac, socket); /* If the connection was aborted, the callback was already called when the deadline was met. */ - grpc_exec_ctx_sched(exec_ctx, on_done, error, NULL); + grpc_closure_sched(exec_ctx, on_done, error); } /* Tries to issue one async connection, then schedules both an IOCP @@ -227,7 +227,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, ac->addr_name = grpc_sockaddr_to_uri(addr); ac->endpoint = endpoint; ac->resource_quota = resource_quota; - grpc_closure_init(&ac->on_connect, on_connect, ac); + grpc_closure_init(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx); grpc_timer_init(exec_ctx, &ac->alarm, deadline, on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC)); @@ -247,7 +247,7 @@ failure: closesocket(sock); } grpc_resource_quota_internal_unref(exec_ctx, resource_quota); - grpc_exec_ctx_sched(exec_ctx, on_done, final_error, NULL); + grpc_closure_sched(exec_ctx, on_done, final_error); } #endif /* GRPC_WINSOCK_SOCKET */ diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 540305e4fa..1000776d61 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -316,7 +316,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->finished_edge = false; grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { - grpc_exec_ctx_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE); } } @@ -460,11 +460,10 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (buf->length == 0) { GPR_TIMER_END("tcp_write", 0); - grpc_exec_ctx_sched(exec_ctx, cb, - grpc_fd_is_shutdown(tcp->em_fd) - ? tcp_annotate_error(GRPC_ERROR_CREATE("EOF"), tcp) - : GRPC_ERROR_NONE, - NULL); + grpc_closure_sched(exec_ctx, cb, + grpc_fd_is_shutdown(tcp->em_fd) + ? tcp_annotate_error(GRPC_ERROR_CREATE("EOF"), tcp) + : GRPC_ERROR_NONE); return; } tcp->outgoing_buffer = buf; @@ -484,7 +483,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_log(GPR_DEBUG, "write: %s", str); grpc_error_free_string(str); } - grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); + grpc_closure_sched(exec_ctx, cb, error); } GPR_TIMER_END("tcp_write", 0); diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 179f47ef76..e7eae19cf3 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -208,7 +208,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { GPR_ASSERT(s->shutdown); gpr_mu_unlock(&s->mu); if (s->shutdown_complete != NULL) { - grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); } gpr_mu_destroy(&s->mu); @@ -760,7 +760,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (gpr_unref(&s->refs)) { grpc_tcp_server_shutdown_listeners(exec_ctx, s); gpr_mu_lock(&s->mu); - grpc_exec_ctx_enqueue_list(exec_ctx, &s->shutdown_starting, NULL); + grpc_closure_list_sched(exec_ctx, &s->shutdown_starting); gpr_mu_unlock(&s->mu); tcp_server_destroy(exec_ctx, s); } diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index e1a174cfa2..89624b447c 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -126,7 +126,7 @@ void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s, static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->shutdown_complete != NULL) { - grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); } while (s->head) { @@ -170,7 +170,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (gpr_unref(&s->refs)) { /* Complete shutdown_starting work before destroying. */ grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting, NULL); + grpc_closure_list_sched(&local_exec_ctx, &s->shutdown_starting, NULL); if (exec_ctx == NULL) { grpc_exec_ctx_flush(&local_exec_ctx); tcp_server_destroy(&local_exec_ctx, s); diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index b0c8586bac..2a54949354 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -162,10 +162,10 @@ static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg, static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->shutdown_complete != NULL) { - grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); } - grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(destroy_server, s), + grpc_closure_sched(exec_ctx, grpc_closure_create(destroy_server, s), GRPC_ERROR_NONE, NULL); } @@ -204,7 +204,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (gpr_unref(&s->refs)) { grpc_tcp_server_shutdown_listeners(exec_ctx, s); gpr_mu_lock(&s->mu); - grpc_exec_ctx_enqueue_list(exec_ctx, &s->shutdown_starting, NULL); + grpc_closure_list_sched(exec_ctx, &s->shutdown_starting, NULL); gpr_mu_unlock(&s->mu); tcp_server_destroy(exec_ctx, s); } @@ -465,7 +465,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, sp->new_socket = INVALID_SOCKET; sp->port = port; sp->port_index = port_index; - grpc_closure_init(&sp->on_accept, on_accept, sp); + grpc_closure_init(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx); GPR_ASSERT(sp->socket); gpr_mu_unlock(&s->mu); *listener = sp; diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 6e2ad1dbe9..f97ca885b4 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -169,7 +169,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, // nread < 0: Error error = GRPC_ERROR_CREATE("TCP Read failed"); } - grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL); + grpc_closure_sched(&exec_ctx, cb, error); grpc_exec_ctx_finish(&exec_ctx); } @@ -190,7 +190,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, error = GRPC_ERROR_CREATE("TCP Read failed at start"); error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); - grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); + grpc_closure_sched(exec_ctx, cb, error); } if (grpc_tcp_trace) { const char *str = grpc_error_string(error); @@ -217,7 +217,7 @@ static void write_callback(uv_write_t *req, int status) { gpr_free(tcp->write_buffers); grpc_resource_user_free(&exec_ctx, tcp->resource_user, sizeof(uv_buf_t) * tcp->write_slices->count); - grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL); + grpc_closure_sched(&exec_ctx, cb, error); grpc_exec_ctx_finish(&exec_ctx); } @@ -243,7 +243,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } if (tcp->shutting_down) { - grpc_exec_ctx_sched(exec_ctx, cb, + grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -254,7 +254,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (tcp->write_slices->count == 0) { // No slices means we don't have to do anything, // and libuv doesn't like empty writes - grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_NONE); return; } diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index d4613b674e..b84a448a81 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -188,7 +188,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { tcp->read_cb = NULL; TCP_UNREF(exec_ctx, tcp, "read"); - grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); + grpc_closure_sched(exec_ctx, cb, error); } static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -202,7 +202,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, WSABUF buffer; if (tcp->shutting_down) { - grpc_exec_ctx_sched(exec_ctx, cb, + grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -227,7 +227,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, /* Did we get data immediately ? Yay. */ if (info->wsa_error != WSAEWOULDBLOCK) { info->bytes_transfered = bytes_read; - grpc_exec_ctx_sched(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE); return; } @@ -240,7 +240,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { info->wsa_error = wsa_error; - grpc_exec_ctx_sched(exec_ctx, &tcp->on_read, + grpc_closure_sched(exec_ctx, &tcp->on_read, GRPC_WSA_ERROR(info->wsa_error, "WSARecv"), NULL); return; } @@ -272,7 +272,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { } TCP_UNREF(exec_ctx, tcp, "write"); - grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); + grpc_closure_sched(exec_ctx, cb, error); } /* Initiates a write. */ @@ -290,7 +290,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, size_t len; if (tcp->shutting_down) { - grpc_exec_ctx_sched(exec_ctx, cb, + grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -322,7 +322,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *error = status == 0 ? GRPC_ERROR_NONE : GRPC_WSA_ERROR(info->wsa_error, "WSASend"); - grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); + grpc_closure_sched(exec_ctx, cb, error); if (allocated) gpr_free(allocated); return; } @@ -340,7 +340,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { TCP_UNREF(exec_ctx, tcp, "write"); - grpc_exec_ctx_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"), + grpc_closure_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"), NULL); return; } @@ -424,8 +424,8 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, tcp->socket = socket; gpr_mu_init(&tcp->mu); gpr_ref_init(&tcp->refcount, 1); - grpc_closure_init(&tcp->on_read, on_read, tcp); - grpc_closure_init(&tcp->on_write, on_write, tcp); + grpc_closure_init(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx); + grpc_closure_init(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx); tcp->peer_string = gpr_strdup(peer_string); tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); /* Tell network status tracking code about the new endpoint */ diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 00058f9d86..ecd3b284dc 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -184,22 +184,22 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, shard_type *shard = &g_shards[shard_idx(timer)]; GPR_ASSERT(deadline.clock_type == g_clock_type); GPR_ASSERT(now.clock_type == g_clock_type); - grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg); + grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg, + grpc_schedule_on_exec_ctx); timer->deadline = deadline; timer->triggered = 0; if (!g_initialized) { timer->triggered = 1; - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, &timer->closure, - GRPC_ERROR_CREATE("Attempt to create timer before initialization"), - NULL); + GRPC_ERROR_CREATE("Attempt to create timer before initialization")); return; } if (gpr_time_cmp(deadline, now) <= 0) { timer->triggered = 1; - grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE); return; } @@ -251,7 +251,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { shard_type *shard = &g_shards[shard_idx(timer)]; gpr_mu_lock(&shard->mu); if (!timer->triggered) { - grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL); + grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED); timer->triggered = 1; if (timer->heap_index == INVALID_HEAP_INDEX) { list_remove(timer); @@ -317,7 +317,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, grpc_timer *timer; gpr_mu_lock(&shard->mu); while ((timer = pop_one(shard, now))) { - grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_REF(error), NULL); + grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_REF(error)); n++; } *new_min_deadline = compute_min_deadline(shard); diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index cfcb89268b..7153535a85 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -55,7 +55,7 @@ void run_expired_timer(uv_timer_t *handle) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_ASSERT(!timer->triggered); timer->triggered = 1; - grpc_exec_ctx_sched(&exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(&exec_ctx, &timer->closure, GRPC_ERROR_NONE); stop_uv_timer(handle); grpc_exec_ctx_finish(&exec_ctx); } @@ -65,10 +65,10 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, void *timer_cb_arg, gpr_timespec now) { uint64_t timeout; uv_timer_t *uv_timer; - grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg); + grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg, grpc_schedule_on_exec_ctx); if (gpr_time_cmp(deadline, now) <= 0) { timer->triggered = 1; - grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE); return; } timer->triggered = 0; @@ -83,7 +83,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { if (!timer->triggered) { timer->triggered = 1; - grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL); + grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED); stop_uv_timer((uv_timer_t *)timer->uv_timer); } } diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 3c24ea9afa..69812e2804 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -126,7 +126,7 @@ grpc_udp_server *grpc_udp_server_create(void) { static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { if (s->shutdown_complete != NULL) { - grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); } gpr_mu_destroy(&s->mu); diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index 73d9849843..371b0f55dc 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -72,17 +72,16 @@ grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue); void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); #endif -/** Add a work item to a workqueue. Items added to a work queue will be started - in approximately the order they were enqueued, on some thread that may or - may not be the current thread. Successive closures enqueued onto a workqueue - MAY be executed concurrently. +/** Fetch the workqueue closure scheduler. Items added to a work queue will be + started in approximately the order they were enqueued, on some thread that + may or may not be the current thread. Successive closures enqueued onto a + workqueue MAY be executed concurrently. It is generally more expensive to add a closure to a workqueue than to the execution context, both in terms of CPU work and in execution latency. Use work queues when it's important that other threads be given a chance to tackle some workload. */ -void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - grpc_closure *closure, grpc_error *error); +grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue); #endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */ diff --git a/src/core/lib/iomgr/workqueue_uv.c b/src/core/lib/iomgr/workqueue_uv.c index e58ca476cc..4d61b40912 100644 --- a/src/core/lib/iomgr/workqueue_uv.c +++ b/src/core/lib/iomgr/workqueue_uv.c @@ -58,9 +58,8 @@ grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) { void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} #endif -void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - grpc_closure *closure, grpc_error *error) { - grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); +grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) { + return grpc_schedule_on_exec_ctx; } #endif /* GPR_UV */ diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c index 5c93d3c59e..234b47cdf5 100644 --- a/src/core/lib/iomgr/workqueue_windows.c +++ b/src/core/lib/iomgr/workqueue_windows.c @@ -56,9 +56,8 @@ grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) { void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} #endif -void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - grpc_closure *closure, grpc_error *error) { - grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); +grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) { + return grpc_schedule_on_exec_ctx; } #endif /* GPR_WINDOWS */ diff --git a/src/core/lib/security/credentials/fake/fake_credentials.c b/src/core/lib/security/credentials/fake/fake_credentials.c index ea4cb76fb9..1cf142fa9a 100644 --- a/src/core/lib/security/credentials/fake/fake_credentials.c +++ b/src/core/lib/security/credentials/fake/fake_credentials.c @@ -113,9 +113,10 @@ static void md_only_test_get_request_metadata( if (c->is_async) { grpc_credentials_metadata_request *cb_arg = grpc_credentials_metadata_request_create(creds, cb, user_data); - grpc_executor_push( - grpc_closure_create(on_simulated_token_fetch_done, cb_arg), - GRPC_ERROR_NONE); + grpc_closure_sched(exec_ctx, + grpc_closure_create(on_simulated_token_fetch_done, + cb_arg, grpc_executor_scheduler), + GRPC_ERROR_NONE); } else { cb(exec_ctx, user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK, NULL); } diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c index afe0e3d357..caf57c856b 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.c +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c @@ -130,7 +130,8 @@ static int is_stack_running_on_compute_engine(void) { grpc_httpcli_get( &exec_ctx, &context, &detector.pollent, resource_quota, &request, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay), - grpc_closure_create(on_compute_engine_detection_http_response, &detector), + grpc_closure_create(on_compute_engine_detection_http_response, &detector, + grpc_schedule_on_exec_ctx), &detector.response); grpc_resource_quota_internal_unref(&exec_ctx, resource_quota); @@ -155,7 +156,8 @@ static int is_stack_running_on_compute_engine(void) { grpc_httpcli_context_destroy(&context); grpc_closure_init(&destroy_closure, destroy_pollset, - grpc_polling_entity_pollset(&detector.pollent)); + grpc_polling_entity_pollset(&detector.pollent), + grpc_schedule_on_exec_ctx); grpc_pollset_shutdown(&exec_ctx, grpc_polling_entity_pollset(&detector.pollent), &destroy_closure); diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c index 03097a57c0..8c75098612 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.c +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c @@ -677,7 +677,7 @@ static void on_openid_config_retrieved(grpc_exec_ctx *exec_ctx, void *user_data, grpc_httpcli_get( exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), - grpc_closure_create(on_keys_retrieved, ctx), + grpc_closure_create(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx), &ctx->responses[HTTP_RESPONSE_KEYS]); grpc_resource_quota_internal_unref(exec_ctx, resource_quota); grpc_json_destroy(json); @@ -778,7 +778,8 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx, *(path_prefix++) = '\0'; gpr_asprintf(&req.http.path, "/%s/%s", path_prefix, iss); } - http_cb = grpc_closure_create(on_keys_retrieved, ctx); + http_cb = + grpc_closure_create(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx); rsp_idx = HTTP_RESPONSE_KEYS; } else { req.host = gpr_strdup(strstr(iss, "https://") == iss ? iss + 8 : iss); @@ -790,7 +791,8 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx, gpr_asprintf(&req.http.path, "/%s%s", path_prefix, GRPC_OPENID_CONFIG_URL_SUFFIX); } - http_cb = grpc_closure_create(on_openid_config_retrieved, ctx); + http_cb = grpc_closure_create(on_openid_config_retrieved, ctx, + grpc_schedule_on_exec_ctx); rsp_idx = HTTP_RESPONSE_OPENID; } diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c index b3625b22c0..9aa7863977 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c @@ -312,9 +312,10 @@ static void compute_engine_fetch_oauth2( extreme memory pressure. */ grpc_resource_quota *resource_quota = grpc_resource_quota_create("oauth2_credentials"); - grpc_httpcli_get(exec_ctx, httpcli_context, pollent, resource_quota, &request, - deadline, grpc_closure_create(response_cb, metadata_req), - &metadata_req->response); + grpc_httpcli_get( + exec_ctx, httpcli_context, pollent, resource_quota, &request, deadline, + grpc_closure_create(response_cb, metadata_req, grpc_schedule_on_exec_ctx), + &metadata_req->response); grpc_resource_quota_internal_unref(exec_ctx, resource_quota); } @@ -368,10 +369,11 @@ static void refresh_token_fetch_oauth2( extreme memory pressure. */ grpc_resource_quota *resource_quota = grpc_resource_quota_create("oauth2_credentials_refresh"); - grpc_httpcli_post(exec_ctx, httpcli_context, pollent, resource_quota, - &request, body, strlen(body), deadline, - grpc_closure_create(response_cb, metadata_req), - &metadata_req->response); + grpc_httpcli_post( + exec_ctx, httpcli_context, pollent, resource_quota, &request, body, + strlen(body), deadline, + grpc_closure_create(response_cb, metadata_req, grpc_schedule_on_exec_ctx), + &metadata_req->response); grpc_resource_quota_internal_unref(exec_ctx, resource_quota); gpr_free(body); } diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 331a8f1835..750c3675b1 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -146,7 +146,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep, } } ep->read_buffer = NULL; - grpc_exec_ctx_sched(exec_ctx, ep->read_cb, error, NULL); + grpc_closure_sched(exec_ctx, ep->read_cb, error); SECURE_ENDPOINT_UNREF(exec_ctx, ep, "read"); } @@ -329,10 +329,9 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, if (result != TSI_OK) { /* TODO(yangg) do different things according to the error type? */ grpc_slice_buffer_reset_and_unref(&ep->output_buffer); - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, cb, - grpc_set_tsi_error_result(GRPC_ERROR_CREATE("Wrap failed"), result), - NULL); + grpc_set_tsi_error_result(GRPC_ERROR_CREATE("Wrap failed"), result)); GPR_TIMER_END("secure_endpoint.endpoint_write", 0); return; } @@ -417,7 +416,7 @@ grpc_endpoint *grpc_secure_endpoint_create( grpc_slice_buffer_init(&ep->output_buffer); grpc_slice_buffer_init(&ep->source_buffer); ep->read_buffer = NULL; - grpc_closure_init(&ep->on_read, on_read, ep); + grpc_closure_init(&ep->on_read, on_read, ep, grpc_schedule_on_exec_ctx); gpr_mu_init(&ep->protector_mu); gpr_ref_init(&ep->ref, 1); return &ep->base; diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c index 5b088aa58d..17ad681c82 100644 --- a/src/core/lib/security/transport/security_connector.c +++ b/src/core/lib/security/transport/security_connector.c @@ -134,9 +134,9 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx, grpc_auth_context **auth_context, grpc_closure *on_peer_checked) { if (sc == NULL) { - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, on_peer_checked, - GRPC_ERROR_CREATE("cannot check peer -- no security connector"), NULL); + GRPC_ERROR_CREATE("cannot check peer -- no security connector")); tsi_peer_destruct(&peer); } else { sc->vtable->check_peer(exec_ctx, sc, peer, auth_context, on_peer_checked); @@ -273,7 +273,7 @@ static void fake_check_peer(grpc_exec_ctx *exec_ctx, GRPC_FAKE_TRANSPORT_SECURITY_TYPE); end: - grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL); + grpc_closure_sched(exec_ctx, on_peer_checked, error); tsi_peer_destruct(&peer); } @@ -508,7 +508,7 @@ static void ssl_channel_check_peer(grpc_exec_ctx *exec_ctx, ? c->overridden_target_name : c->target_name, &peer, auth_context); - grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL); + grpc_closure_sched(exec_ctx, on_peer_checked, error); tsi_peer_destruct(&peer); } @@ -518,7 +518,7 @@ static void ssl_server_check_peer(grpc_exec_ctx *exec_ctx, grpc_closure *on_peer_checked) { grpc_error *error = ssl_check_peer(sc, NULL, &peer, auth_context); tsi_peer_destruct(&peer); - grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL); + grpc_closure_sched(exec_ctx, on_peer_checked, error); } static void add_shallow_auth_property_to_peer(tsi_peer *peer, diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index 41a775db85..748bf4a432 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -136,7 +136,7 @@ static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx, h->shutdown = true; } // Invoke callback. - grpc_exec_ctx_sched(exec_ctx, h->on_handshake_done, error, NULL); + grpc_closure_sched(exec_ctx, h->on_handshake_done, error); } static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, @@ -173,7 +173,7 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, grpc_channel_args_copy_and_add(tmp_args, &auth_context_arg, 1); grpc_channel_args_destroy(tmp_args); // Invoke callback. - grpc_exec_ctx_sched(exec_ctx, h->on_handshake_done, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, h->on_handshake_done, GRPC_ERROR_NONE); // Set shutdown to true so that subsequent calls to // security_handshaker_shutdown() do nothing. h->shutdown = true; @@ -392,10 +392,13 @@ static grpc_handshaker *security_handshaker_create( h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE; h->handshake_buffer = gpr_malloc(h->handshake_buffer_size); grpc_closure_init(&h->on_handshake_data_sent_to_peer, - on_handshake_data_sent_to_peer, h); + on_handshake_data_sent_to_peer, h, + grpc_schedule_on_exec_ctx); grpc_closure_init(&h->on_handshake_data_received_from_peer, - on_handshake_data_received_from_peer, h); - grpc_closure_init(&h->on_peer_checked, on_peer_checked, h); + on_handshake_data_received_from_peer, h, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&h->on_peer_checked, on_peer_checked, h, + grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&h->left_overs); grpc_slice_buffer_init(&h->outgoing); return &h->base; @@ -418,9 +421,8 @@ static void fail_handshaker_do_handshake(grpc_exec_ctx *exec_ctx, grpc_tcp_server_acceptor *acceptor, grpc_closure *on_handshake_done, grpc_handshaker_args *args) { - grpc_exec_ctx_sched(exec_ctx, on_handshake_done, - GRPC_ERROR_CREATE("Failed to create security handshaker"), - NULL); + grpc_closure_sched(exec_ctx, on_handshake_done, + GRPC_ERROR_CREATE("Failed to create security handshaker")); } static const grpc_handshaker_vtable fail_handshaker_vtable = { diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index e6a242e68f..5b4adc4661 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -132,7 +132,7 @@ static void on_md_processing_done( grpc_metadata_batch_filter(calld->recv_initial_metadata, remove_consumed_md, elem); grpc_metadata_array_destroy(&calld->md); - grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE); } else { grpc_slice message; grpc_transport_stream_op *close_op = gpr_malloc(sizeof(*close_op)); @@ -148,13 +148,13 @@ static void on_md_processing_done( calld->transport_op->send_message = NULL; } calld->transport_op->send_trailing_metadata = NULL; - close_op->on_complete = grpc_closure_create(destroy_op, close_op); + close_op->on_complete = + grpc_closure_create(destroy_op, close_op, grpc_schedule_on_exec_ctx); grpc_transport_stream_op_add_close(close_op, status, &message); grpc_call_next_op(&exec_ctx, elem, close_op); - grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv, - grpc_error_set_int(GRPC_ERROR_CREATE(error_details), - GRPC_ERROR_INT_GRPC_STATUS, status), - NULL); + grpc_closure_sched(&exec_ctx, calld->on_done_recv, + grpc_error_set_int(GRPC_ERROR_CREATE(error_details), + GRPC_ERROR_INT_GRPC_STATUS, status)); } grpc_exec_ctx_finish(&exec_ctx); @@ -174,8 +174,7 @@ static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, return; } } - grpc_exec_ctx_sched(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error), - NULL); + grpc_closure_sched(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error)); } static void set_recv_ops_md_callbacks(grpc_call_element *elem, @@ -214,7 +213,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* initialize members */ memset(calld, 0, sizeof(*calld)); - grpc_closure_init(&calld->auth_on_recv, auth_on_recv, elem); + grpc_closure_init(&calld->auth_on_recv, auth_on_recv, elem, + grpc_schedule_on_exec_ctx); if (args->context[GRPC_CONTEXT_SECURITY].value != NULL) { args->context[GRPC_CONTEXT_SECURITY].destroy( diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 8ca3cab9d5..b20801005a 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -794,7 +794,8 @@ static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { memset(&tc->op, 0, sizeof(tc->op)); tc->op.cancel_error = tc->error; /* reuse closure to catch completion */ - grpc_closure_init(&tc->closure, done_termination, tc); + grpc_closure_init(&tc->closure, done_termination, tc, + grpc_schedule_on_exec_ctx); tc->op.on_complete = &tc->closure; execute_op(exec_ctx, tc->call, &tc->op); } @@ -804,7 +805,8 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { memset(&tc->op, 0, sizeof(tc->op)); tc->op.close_error = tc->error; /* reuse closure to catch completion */ - grpc_closure_init(&tc->closure, done_termination, tc); + grpc_closure_init(&tc->closure, done_termination, tc, + grpc_schedule_on_exec_ctx); tc->op.on_complete = &tc->closure; execute_op(exec_ctx, tc->call, &tc->op); } @@ -814,13 +816,13 @@ static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, set_status_from_error(tc->call, STATUS_FROM_API_OVERRIDE, tc->error); if (tc->type == TC_CANCEL) { - grpc_closure_init(&tc->closure, send_cancel, tc); + grpc_closure_init(&tc->closure, send_cancel, tc, grpc_schedule_on_exec_ctx); GRPC_CALL_INTERNAL_REF(tc->call, "cancel"); } else if (tc->type == TC_CLOSE) { - grpc_closure_init(&tc->closure, send_close, tc); + grpc_closure_init(&tc->closure, send_close, tc, grpc_schedule_on_exec_ctx); GRPC_CALL_INTERNAL_REF(tc->call, "close"); } - grpc_exec_ctx_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE); return GRPC_CALL_OK; } @@ -1138,8 +1140,8 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, } else { *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); } - grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, - bctl); + grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, bctl, + grpc_schedule_on_exec_ctx); continue_receiving_slices(exec_ctx, bctl); } } @@ -1251,9 +1253,10 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, call->has_initial_md_been_received = true; if (call->saved_receiving_stream_ready_bctlp != NULL) { grpc_closure *saved_rsr_closure = grpc_closure_create( - receiving_stream_ready, call->saved_receiving_stream_ready_bctlp); + receiving_stream_ready, call->saved_receiving_stream_ready_bctlp, + grpc_schedule_on_exec_ctx); call->saved_receiving_stream_ready_bctlp = NULL; - grpc_exec_ctx_sched(exec_ctx, saved_rsr_closure, error, NULL); + grpc_closure_sched(exec_ctx, saved_rsr_closure, error); } gpr_mu_unlock(&call->mu); @@ -1558,7 +1561,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->received_initial_metadata = 1; call->buffered_metadata[0] = op->data.recv_initial_metadata; grpc_closure_init(&call->receiving_initial_metadata_ready, - receiving_initial_metadata_ready, bctl); + receiving_initial_metadata_ready, bctl, + grpc_schedule_on_exec_ctx); bctl->recv_initial_metadata = 1; stream_op->recv_initial_metadata = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; @@ -1581,7 +1585,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->receiving_buffer = op->data.recv_message; stream_op->recv_message = &call->receiving_stream; grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready, - bctl); + bctl, grpc_schedule_on_exec_ctx); stream_op->recv_message_ready = &call->receiving_stream_ready; num_completion_callbacks_needed++; break; @@ -1646,7 +1650,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); stream_op->context = call->context; - grpc_closure_init(&bctl->finish_batch, finish_batch, bctl); + grpc_closure_init(&bctl->finish_batch, finish_batch, bctl, + grpc_schedule_on_exec_ctx); stream_op->on_complete = &bctl->finish_batch; gpr_mu_unlock(&call->mu); diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c index 0d2f01a649..e68febdddf 100644 --- a/src/core/lib/surface/channel_ping.c +++ b/src/core/lib/surface/channel_ping.c @@ -71,7 +71,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, GPR_ASSERT(reserved == NULL); pr->tag = tag; pr->cq = cq; - grpc_closure_init(&pr->closure, ping_done, pr); + grpc_closure_init(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx); op->send_ping = &pr->closure; op->bind_pollset = grpc_cq_pollset(cq); grpc_cq_begin_op(cq, tag); diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 184c1a1a16..aefdd39547 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -168,7 +168,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { #ifndef NDEBUG cc->outstanding_tag_count = 0; #endif - grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc); + grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc, grpc_schedule_on_exec_ctx); GPR_TIMER_END("grpc_completion_queue_create", 0); diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 57da94ac1e..f1ad13711a 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -98,16 +98,16 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->on_connectivity_state_change) { GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_SHUTDOWN); *op->connectivity_state = GRPC_CHANNEL_SHUTDOWN; - grpc_exec_ctx_sched(exec_ctx, op->on_connectivity_state_change, - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, op->on_connectivity_state_change, + GRPC_ERROR_NONE); } if (op->send_ping != NULL) { - grpc_exec_ctx_sched(exec_ctx, op->send_ping, - GRPC_ERROR_CREATE("lame client channel"), NULL); + grpc_closure_sched(exec_ctx, op->send_ping, + GRPC_ERROR_CREATE("lame client channel")); } GRPC_ERROR_UNREF(op->disconnect_with_error); if (op->on_consumed != NULL) { - grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); } } diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 62d7afc8da..d143ad9607 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -278,7 +278,8 @@ static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg, static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, int send_goaway, grpc_error *send_disconnect) { struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc)); - grpc_closure_init(&sc->closure, shutdown_cleanup, sc); + grpc_closure_init(&sc->closure, shutdown_cleanup, sc, + grpc_schedule_on_exec_ctx); grpc_transport_op *op = grpc_make_transport_op(&sc->closure); grpc_channel_element *elem; @@ -346,9 +347,9 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&calld->mu_state); grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, - NULL); + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), + grpc_schedule_on_exec_ctx); + grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); } } @@ -545,8 +546,9 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&calld->mu_state); grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, error, NULL); + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), + grpc_schedule_on_exec_ctx); + grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, error); return; } @@ -590,9 +592,9 @@ static void finish_start_new_rpc( gpr_mu_lock(&calld->mu_state); calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); - grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, - NULL); + grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem, + grpc_schedule_on_exec_ctx); + grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); return; } @@ -607,7 +609,8 @@ static void finish_start_new_rpc( memset(&op, 0, sizeof(op)); op.op = GRPC_OP_RECV_MESSAGE; op.data.recv_message = &calld->payload; - grpc_closure_init(&calld->publish, publish_new_rpc, elem); + grpc_closure_init(&calld->publish, publish_new_rpc, elem, + grpc_schedule_on_exec_ctx); grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1, &calld->publish); break; @@ -813,9 +816,10 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, if (calld->state == NOT_STARTED) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); - grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, - GRPC_ERROR_NONE, NULL); + grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem, + grpc_schedule_on_exec_ctx); + grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE); } else if (calld->state == PENDING) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); @@ -851,7 +855,8 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, memset(&op, 0, sizeof(op)); op.op = GRPC_OP_RECV_INITIAL_METADATA; op.data.recv_initial_metadata = &calld->initial_metadata; - grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem); + grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem, + grpc_schedule_on_exec_ctx); grpc_call_start_batch_and_execute(exec_ctx, call, &op, 1, &calld->got_initial_metadata); } @@ -887,7 +892,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, gpr_mu_init(&calld->mu_state); grpc_closure_init(&calld->server_on_recv_initial_metadata, - server_on_recv_initial_metadata, elem); + server_on_recv_initial_metadata, elem, + grpc_schedule_on_exec_ctx); server_ref(chand->server); return GRPC_ERROR_NONE; @@ -926,7 +932,8 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, chand->registered_methods = NULL; chand->connectivity_state = GRPC_CHANNEL_IDLE; grpc_closure_init(&chand->channel_connectivity_changed, - channel_connectivity_changed, chand); + channel_connectivity_changed, chand, + grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } @@ -1278,7 +1285,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { - grpc_closure_init(&l->destroy_done, listener_destroy_done, server); + grpc_closure_init(&l->destroy_done, listener_destroy_done, server, + grpc_schedule_on_exec_ctx); l->destroy(&exec_ctx, server, l->arg, &l->destroy_done); } @@ -1384,9 +1392,10 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&calld->mu_state); grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, - GRPC_ERROR_NONE, NULL); + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), + grpc_schedule_on_exec_ctx); + grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE); } else { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index 4f49d7cf7d..c656d93740 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -81,7 +81,7 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx, } else { error = GRPC_ERROR_CREATE("Shutdown connectivity owner"); } - grpc_exec_ctx_sched(exec_ctx, w->notify, error, NULL); + grpc_closure_sched(exec_ctx, w->notify, error); gpr_free(w); } GRPC_ERROR_UNREF(tracker->current_error); @@ -121,7 +121,7 @@ bool grpc_connectivity_state_notify_on_state_change( if (current == NULL) { grpc_connectivity_state_watcher *w = tracker->watchers; if (w != NULL && w->notify == notify) { - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); + grpc_closure_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED); tracker->watchers = w->next; gpr_free(w); return false; @@ -129,7 +129,7 @@ bool grpc_connectivity_state_notify_on_state_change( while (w != NULL) { grpc_connectivity_state_watcher *rm_candidate = w->next; if (rm_candidate != NULL && rm_candidate->notify == notify) { - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); + grpc_closure_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED); w->next = w->next->next; gpr_free(rm_candidate); return false; @@ -140,8 +140,8 @@ bool grpc_connectivity_state_notify_on_state_change( } else { if (tracker->current_state != *current) { *current = tracker->current_state; - grpc_exec_ctx_sched(exec_ctx, notify, - GRPC_ERROR_REF(tracker->current_error), NULL); + grpc_closure_sched(exec_ctx, notify, + GRPC_ERROR_REF(tracker->current_error)); } else { grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); w->current = current; @@ -191,8 +191,8 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, gpr_log(GPR_DEBUG, "NOTIFY: %p %s: %p", tracker, tracker->name, w->notify); } - grpc_exec_ctx_sched(exec_ctx, w->notify, - GRPC_ERROR_REF(tracker->current_error), NULL); + grpc_closure_sched(exec_ctx, w->notify, + GRPC_ERROR_REF(tracker->current_error)); gpr_free(w); } } diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index b448126da8..0d24062c1e 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -68,7 +68,7 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount) { #endif if (gpr_unref(&refcount->refs)) { - grpc_exec_ctx_sched(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE); } } @@ -82,7 +82,7 @@ void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, grpc_iomgr_cb_func cb, void *cb_arg) { #endif gpr_ref_init(&refcount->refs, initial_refs); - grpc_closure_init(&refcount->destroy, cb, cb_arg); + grpc_closure_init(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx); } static void move64(uint64_t *from, uint64_t *to) { @@ -168,11 +168,10 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx, void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op, grpc_error *error) { - grpc_exec_ctx_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error), - NULL); - grpc_exec_ctx_sched(exec_ctx, op->recv_initial_metadata_ready, - GRPC_ERROR_REF(error), NULL); - grpc_exec_ctx_sched(exec_ctx, op->on_complete, error, NULL); + grpc_closure_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error)); + grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); + grpc_closure_sched(exec_ctx, op->on_complete, error); } typedef struct { @@ -196,7 +195,8 @@ static void add_error(grpc_transport_stream_op *op, grpc_error **which, cmd = gpr_malloc(sizeof(*cmd)); cmd->error = error; cmd->then_call = op->on_complete; - grpc_closure_init(&cmd->closure, free_message, cmd); + grpc_closure_init(&cmd->closure, free_message, cmd, + grpc_schedule_on_exec_ctx); op->on_complete = &cmd->closure; *which = error; } @@ -269,14 +269,14 @@ typedef struct { static void destroy_made_transport_op(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { made_transport_op *op = arg; - grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error), - NULL); + grpc_closure_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error)); gpr_free(op); } grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) { made_transport_op *op = gpr_malloc(sizeof(*op)); - grpc_closure_init(&op->outer_on_complete, destroy_made_transport_op, op); + grpc_closure_init(&op->outer_on_complete, destroy_made_transport_op, op, + grpc_schedule_on_exec_ctx); op->inner_on_complete = on_complete; memset(&op->op, 0, sizeof(op->op)); op->op.on_consumed = &op->outer_on_complete; @@ -292,8 +292,7 @@ typedef struct { static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { made_transport_stream_op *op = arg; - grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error), - NULL); + grpc_closure_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error)); gpr_free(op); } @@ -301,7 +300,7 @@ grpc_transport_stream_op *grpc_make_transport_stream_op( grpc_closure *on_complete) { made_transport_stream_op *op = gpr_malloc(sizeof(*op)); grpc_closure_init(&op->outer_on_complete, destroy_made_transport_stream_op, - op); + op, grpc_schedule_on_exec_ctx); op->inner_on_complete = on_complete; memset(&op->op, 0, sizeof(op->op)); op->op.on_complete = &op->outer_on_complete; |