aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-01-03 09:49:07 -0800
committerGravatar Craig Tiller <ctiller@google.com>2017-01-03 09:49:07 -0800
commit0b09341461451caaee6bc65bcc41c4a829bfa065 (patch)
tree5db2bc41669d37d330e3076c82c678e3e297124c /src/core/lib
parent36d374681ae254a4269fec3aa7e5dc7dbb43a768 (diff)
parentaef521c6f9df8d36ae927a4504d055e1d376bfa6 (diff)
Merge branch 'cleanup_closures' into slice_with_exec_ctx_and_new_closures
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/channel/channel_stack.c9
-rw-r--r--src/core/lib/channel/compress_filter.c6
-rw-r--r--src/core/lib/channel/deadline_filter.c11
-rw-r--r--src/core/lib/channel/handshaker.c8
-rw-r--r--src/core/lib/channel/http_client_filter.c15
-rw-r--r--src/core/lib/channel/http_server_filter.c9
-rw-r--r--src/core/lib/channel/message_size_filter.c5
-rw-r--r--src/core/lib/http/httpcli.c18
-rw-r--r--src/core/lib/http/httpcli_security_connector.c2
-rw-r--r--src/core/lib/iomgr/closure.c38
-rw-r--r--src/core/lib/iomgr/closure.h39
-rw-r--r--src/core/lib/iomgr/combiner.c110
-rw-r--r--src/core/lib/iomgr/combiner.h14
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c37
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c26
-rw-r--r--src/core/lib/iomgr/ev_posix.c5
-rw-r--r--src/core/lib/iomgr/ev_posix.h3
-rw-r--r--src/core/lib/iomgr/exec_ctx.c134
-rw-r--r--src/core/lib/iomgr/exec_ctx.h31
-rw-r--r--src/core/lib/iomgr/executor.c33
-rw-r--r--src/core/lib/iomgr/executor.h4
-rw-r--r--src/core/lib/iomgr/pollset_uv.c2
-rw-r--r--src/core/lib/iomgr/pollset_windows.c5
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.c10
-rw-r--r--src/core/lib/iomgr/resolve_address_uv.c6
-rw-r--r--src/core/lib/iomgr/resolve_address_windows.c7
-rw-r--r--src/core/lib/iomgr/resource_quota.c101
-rw-r--r--src/core/lib/iomgr/socket_windows.c4
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c15
-rw-r--r--src/core/lib/iomgr/tcp_client_uv.c2
-rw-r--r--src/core/lib/iomgr/tcp_client_windows.c6
-rw-r--r--src/core/lib/iomgr/tcp_posix.c21
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c16
-rw-r--r--src/core/lib/iomgr/tcp_server_uv.c4
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c11
-rw-r--r--src/core/lib/iomgr/tcp_uv.c12
-rw-r--r--src/core/lib/iomgr/tcp_windows.c27
-rw-r--r--src/core/lib/iomgr/timer_generic.c14
-rw-r--r--src/core/lib/iomgr/timer_uv.c9
-rw-r--r--src/core/lib/iomgr/udp_server.c10
-rw-r--r--src/core/lib/iomgr/workqueue.h11
-rw-r--r--src/core/lib/iomgr/workqueue_uv.c5
-rw-r--r--src/core/lib/iomgr/workqueue_windows.c5
-rw-r--r--src/core/lib/security/credentials/fake/fake_credentials.c7
-rw-r--r--src/core/lib/security/credentials/google_default/google_default_credentials.c6
-rw-r--r--src/core/lib/security/credentials/jwt/jwt_verifier.c8
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.c17
-rw-r--r--src/core/lib/security/transport/secure_endpoint.c9
-rw-r--r--src/core/lib/security/transport/security_connector.c10
-rw-r--r--src/core/lib/security/transport/security_handshaker.c18
-rw-r--r--src/core/lib/security/transport/server_auth_filter.c18
-rw-r--r--src/core/lib/surface/call.c29
-rw-r--r--src/core/lib/surface/channel_ping.c2
-rw-r--r--src/core/lib/surface/completion_queue.c3
-rw-r--r--src/core/lib/surface/lame_client.c10
-rw-r--r--src/core/lib/surface/server.c53
-rw-r--r--src/core/lib/transport/connectivity_state.c14
-rw-r--r--src/core/lib/transport/transport.c27
58 files changed, 585 insertions, 506 deletions
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 8093141741..8f08b427fb 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -297,7 +297,8 @@ void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
memset(op, 0, sizeof(*op));
op->cancel_error = GRPC_ERROR_CANCELLED;
- op->on_complete = grpc_closure_create(destroy_op, op);
+ op->on_complete =
+ grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx);
elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}
@@ -307,7 +308,8 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
grpc_slice *optional_message) {
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
memset(op, 0, sizeof(*op));
- op->on_complete = grpc_closure_create(destroy_op, op);
+ op->on_complete =
+ grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx);
grpc_transport_stream_op_add_cancellation_with_message(exec_ctx, op, status,
optional_message);
elem->filter->start_transport_stream_op(exec_ctx, elem, op);
@@ -319,7 +321,8 @@ void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx,
grpc_slice *optional_message) {
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
memset(op, 0, sizeof(*op));
- op->on_complete = grpc_closure_create(destroy_op, op);
+ op->on_complete =
+ grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx);
grpc_transport_stream_op_add_close(exec_ctx, op, status, optional_message);
elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index dbeb423994..337c194b79 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -273,8 +273,10 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* initialize members */
grpc_slice_buffer_init(&calld->slices);
calld->has_compression_algorithm = 0;
- grpc_closure_init(&calld->got_slice, got_slice, elem);
- grpc_closure_init(&calld->send_done, send_done, elem);
+ grpc_closure_init(&calld->got_slice, got_slice, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->send_done, send_done, elem,
+ grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c
index b7579b0b14..8dd6d099e1 100644
--- a/src/core/lib/channel/deadline_filter.c
+++ b/src/core/lib/channel/deadline_filter.c
@@ -124,7 +124,8 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
grpc_transport_stream_op* op) {
deadline_state->next_on_complete = op->on_complete;
- grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state);
+ grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state,
+ grpc_schedule_on_exec_ctx);
op->on_complete = &deadline_state->on_complete;
}
@@ -173,8 +174,9 @@ void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state));
state->elem = elem;
state->deadline = deadline;
- grpc_closure_init(&state->closure, start_timer_after_init, state);
- grpc_exec_ctx_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_init(&state->closure, start_timer_after_init, state,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE);
}
}
@@ -291,7 +293,8 @@ static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
calld->next_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
calld->recv_initial_metadata = op->recv_initial_metadata;
grpc_closure_init(&calld->recv_initial_metadata_ready,
- recv_initial_metadata_ready, elem);
+ recv_initial_metadata_ready, elem,
+ grpc_schedule_on_exec_ctx);
op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready;
}
// Make sure we know when the call is complete, so that we can cancel
diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c
index 23edc826ca..ff827527b3 100644
--- a/src/core/lib/channel/handshaker.c
+++ b/src/core/lib/channel/handshaker.c
@@ -165,7 +165,7 @@ static bool call_next_handshaker_locked(grpc_exec_ctx* exec_ctx,
// Cancel deadline timer, since we're invoking the on_handshake_done
// callback now.
grpc_timer_cancel(exec_ctx, &mgr->deadline_timer);
- grpc_exec_ctx_sched(exec_ctx, &mgr->on_handshake_done, error, NULL);
+ grpc_closure_sched(exec_ctx, &mgr->on_handshake_done, error);
mgr->shutdown = true;
} else {
grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->index],
@@ -218,8 +218,10 @@ void grpc_handshake_manager_do_handshake(
grpc_slice_buffer_init(mgr->args.read_buffer);
// Initialize state needed for calling handshakers.
mgr->acceptor = acceptor;
- grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, mgr);
- grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, &mgr->args);
+ grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, mgr,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, &mgr->args,
+ grpc_schedule_on_exec_ctx);
// Start deadline timer, which owns a ref.
gpr_ref(&mgr->refs);
grpc_timer_init(exec_ctx, &mgr->deadline_timer,
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index 1a87a6a961..d154450988 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -345,12 +345,17 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
calld->send_message_blocked = false;
grpc_slice_buffer_init(&calld->slices);
grpc_closure_init(&calld->hc_on_recv_initial_metadata,
- hc_on_recv_initial_metadata, elem);
+ hc_on_recv_initial_metadata, elem,
+ grpc_schedule_on_exec_ctx);
grpc_closure_init(&calld->hc_on_recv_trailing_metadata,
- hc_on_recv_trailing_metadata, elem);
- grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem);
- grpc_closure_init(&calld->got_slice, got_slice, elem);
- grpc_closure_init(&calld->send_done, send_done, elem);
+ hc_on_recv_trailing_metadata, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->got_slice, got_slice, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->send_done, send_done, elem,
+ grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index 1bb57f8f32..f508231238 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -329,9 +329,12 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data;
/* initialize members */
memset(calld, 0, sizeof(*calld));
- grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
- grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem);
- grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem);
+ grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem,
+ grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&calld->read_slice_buffer);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index 43bbb5e704..862090b371 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -128,7 +128,7 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
gpr_free(message_string);
}
// Invoke the next callback.
- grpc_exec_ctx_sched(exec_ctx, calld->next_recv_message_ready, error, NULL);
+ grpc_closure_sched(exec_ctx, calld->next_recv_message_ready, error);
}
// Start transport stream op.
@@ -164,7 +164,8 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
channel_data* chand = elem->channel_data;
call_data* calld = elem->call_data;
calld->next_recv_message_ready = NULL;
- grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem);
+ grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem,
+ grpc_schedule_on_exec_ctx);
// Get max sizes from channel data, then merge in per-method config values.
// Note: Per-method config is only available on the client, so we
// apply the max request size to the send limit and the max response
diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c
index 625b83697c..fb2108987b 100644
--- a/src/core/lib/http/httpcli.c
+++ b/src/core/lib/http/httpcli.c
@@ -104,7 +104,7 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req,
grpc_error *error) {
grpc_polling_entity_del_from_pollset_set(exec_ctx, req->pollent,
req->context->pollset_set);
- grpc_exec_ctx_sched(exec_ctx, req->on_done, error, NULL);
+ grpc_closure_sched(exec_ctx, req->on_done, error);
grpc_http_parser_destroy(&req->parser);
if (req->addresses != NULL) {
grpc_resolved_addresses_destroy(req->addresses);
@@ -225,7 +225,8 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req,
return;
}
addr = &req->addresses->addrs[req->next_address++];
- grpc_closure_init(&req->connected, on_connected, req);
+ grpc_closure_init(&req->connected, on_connected, req,
+ grpc_schedule_on_exec_ctx);
grpc_arg arg;
arg.key = GRPC_ARG_RESOURCE_QUOTA;
arg.type = GRPC_ARG_POINTER;
@@ -267,8 +268,9 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx,
req->pollent = pollent;
req->overall_error = GRPC_ERROR_NONE;
req->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
- grpc_closure_init(&req->on_read, on_read, req);
- grpc_closure_init(&req->done_write, done_write, req);
+ grpc_closure_init(&req->on_read, on_read, req, grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&req->done_write, done_write, req,
+ grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&req->incoming);
grpc_slice_buffer_init(&req->outgoing);
grpc_iomgr_register_object(&req->iomgr_obj, name);
@@ -278,9 +280,11 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(pollent);
grpc_polling_entity_add_to_pollset_set(exec_ctx, req->pollent,
req->context->pollset_set);
- grpc_resolve_address(exec_ctx, request->host, req->handshaker->default_port,
- req->context->pollset_set,
- grpc_closure_create(on_resolved, req), &req->addresses);
+ grpc_resolve_address(
+ exec_ctx, request->host, req->handshaker->default_port,
+ req->context->pollset_set,
+ grpc_closure_create(on_resolved, req, grpc_schedule_on_exec_ctx),
+ &req->addresses);
}
void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c
index 7dd50b7dab..440817c5a6 100644
--- a/src/core/lib/http/httpcli_security_connector.c
+++ b/src/core/lib/http/httpcli_security_connector.c
@@ -98,7 +98,7 @@ static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx,
error = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
}
- grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL);
+ grpc_closure_sched(exec_ctx, on_peer_checked, error);
tsi_peer_destruct(&peer);
}
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c
index c6ddc76732..da0ec878a3 100644
--- a/src/core/lib/iomgr/closure.c
+++ b/src/core/lib/iomgr/closure.c
@@ -37,10 +37,13 @@
#include "src/core/lib/profiling/timers.h"
-void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
- void *cb_arg) {
+grpc_closure *grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg,
+ grpc_closure_scheduler *scheduler) {
closure->cb = cb;
closure->cb_arg = cb_arg;
+ closure->scheduler = scheduler;
+ return closure;
}
void grpc_closure_list_init(grpc_closure_list *closure_list) {
@@ -105,11 +108,12 @@ static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg,
cb(exec_ctx, cb_arg, error);
}
-grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) {
+grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg,
+ grpc_closure_scheduler *scheduler) {
wrapped_closure *wc = gpr_malloc(sizeof(*wc));
wc->cb = cb;
wc->cb_arg = cb_arg;
- grpc_closure_init(&wc->wrapper, closure_wrapper, wc);
+ grpc_closure_init(&wc->wrapper, closure_wrapper, wc, scheduler);
return &wc->wrapper;
}
@@ -117,8 +121,30 @@ void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *c,
grpc_error *error) {
GPR_TIMER_BEGIN("grpc_closure_run", 0);
if (c != NULL) {
- c->cb(exec_ctx, c->cb_arg, error);
+ c->scheduler->vtable->run(exec_ctx, c, error);
+ } else {
+ GRPC_ERROR_UNREF(error);
}
- GRPC_ERROR_UNREF(error);
GPR_TIMER_END("grpc_closure_run", 0);
}
+
+void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *c,
+ grpc_error *error) {
+ GPR_TIMER_BEGIN("grpc_closure_sched", 0);
+ if (c != NULL) {
+ c->scheduler->vtable->sched(exec_ctx, c, error);
+ } else {
+ GRPC_ERROR_UNREF(error);
+ }
+ GPR_TIMER_END("grpc_closure_sched", 0);
+}
+
+void grpc_closure_list_sched(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) {
+ grpc_closure *c = list->head;
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error);
+ c = next;
+ }
+ list->head = list->tail = NULL;
+}
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index ec0114a2a4..ee386fbc76 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -57,6 +57,22 @@ typedef struct grpc_closure_list {
typedef void (*grpc_iomgr_cb_func)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
+typedef struct grpc_closure_scheduler grpc_closure_scheduler;
+
+typedef struct grpc_closure_scheduler_vtable {
+ /* NOTE: for all these functions, closure->scheduler == the scheduler that was
+ used to find this vtable */
+ void (*run)(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error);
+ void (*sched)(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error);
+} grpc_closure_scheduler_vtable;
+
+/** Abstract type that can schedule closures for execution */
+struct grpc_closure_scheduler {
+ const grpc_closure_scheduler_vtable *vtable;
+};
+
/** A closure over a grpc_iomgr_cb_func. */
struct grpc_closure {
/** Once queued, next indicates the next queued closure; before then, scratch
@@ -73,6 +89,10 @@ struct grpc_closure {
/** Arguments to be passed to "cb". */
void *cb_arg;
+ /** Scheduler to schedule against: NULL to schedule against current execution
+ context */
+ grpc_closure_scheduler *scheduler;
+
/** Once queued, the result of the closure. Before then: scratch space */
union {
grpc_error *error;
@@ -80,12 +100,14 @@ struct grpc_closure {
} error_data;
};
-/** Initializes \a closure with \a cb and \a cb_arg. */
-void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
- void *cb_arg);
+/** Initializes \a closure with \a cb and \a cb_arg. Returns \a closure. */
+grpc_closure *grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg,
+ grpc_closure_scheduler *scheduler);
/* Create a heap allocated closure: try to avoid except for very rare events */
-grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg);
+grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg,
+ grpc_closure_scheduler *scheduler);
#define GRPC_CLOSURE_LIST_INIT \
{ NULL, NULL }
@@ -113,4 +135,13 @@ bool grpc_closure_list_empty(grpc_closure_list list);
void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error);
+/** Schedule a closure to be run. Does not need to be run from a safe point. */
+void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error);
+
+/** Schedule all closures in a list to be run. Does not need to be run from a
+ * safe point. */
+void grpc_closure_list_sched(grpc_exec_ctx *exec_ctx,
+ grpc_closure_list *closure_list);
+
#endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index cfc67020ae..c26a73b2b7 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -56,6 +56,10 @@ int grpc_combiner_trace = 0;
struct grpc_combiner {
grpc_combiner *next_combiner_on_this_exec_ctx;
grpc_workqueue *optional_workqueue;
+ grpc_closure_scheduler uncovered_scheduler;
+ grpc_closure_scheduler covered_scheduler;
+ grpc_closure_scheduler uncovered_finally_scheduler;
+ grpc_closure_scheduler covered_finally_scheduler;
gpr_mpscq queue;
// state is:
// lower bit - zero if orphaned (STATE_UNORPHANED)
@@ -70,6 +74,26 @@ struct grpc_combiner {
grpc_closure offload;
};
+static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure, grpc_error *error);
+static void combiner_exec_covered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure, grpc_error *error);
+static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure,
+ grpc_error *error);
+static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure,
+ grpc_error *error);
+
+static const grpc_closure_scheduler_vtable scheduler_uncovered = {
+ combiner_exec_uncovered, combiner_exec_uncovered};
+static const grpc_closure_scheduler_vtable scheduler_covered = {
+ combiner_exec_covered, combiner_exec_covered};
+static const grpc_closure_scheduler_vtable finally_scheduler_uncovered = {
+ combiner_finally_exec_uncovered, combiner_finally_exec_uncovered};
+static const grpc_closure_scheduler_vtable finally_scheduler_covered = {
+ combiner_finally_exec_covered, combiner_finally_exec_covered};
+
static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
typedef struct {
@@ -102,11 +126,16 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
lock->time_to_execute_final_list = false;
lock->optional_workqueue = optional_workqueue;
lock->final_list_covered_by_poller = false;
+ lock->uncovered_scheduler.vtable = &scheduler_uncovered;
+ lock->covered_scheduler.vtable = &scheduler_covered;
+ lock->uncovered_finally_scheduler.vtable = &finally_scheduler_uncovered;
+ lock->covered_finally_scheduler.vtable = &finally_scheduler_covered;
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
gpr_atm_no_barrier_store(&lock->elements_covered_by_poller, 0);
gpr_mpscq_init(&lock->queue);
grpc_closure_list_init(&lock->final_list);
- grpc_closure_init(&lock->offload, offload, lock);
+ grpc_closure_init(&lock->offload, offload, lock,
+ grpc_workqueue_scheduler(lock->optional_workqueue));
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock));
return lock;
}
@@ -148,9 +177,9 @@ static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx,
}
}
-void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *cl, grpc_error *error,
- bool covered_by_poller) {
+static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
+ grpc_closure *cl, grpc_error *error,
+ bool covered_by_poller) {
GPR_TIMER_BEGIN("combiner.execute", 0);
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
GRPC_COMBINER_TRACE(gpr_log(
@@ -171,6 +200,24 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
GPR_TIMER_END("combiner.execute", 0);
}
+#define COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler_name) \
+ ((grpc_combiner *)(((char *)((closure)->scheduler)) - \
+ offsetof(grpc_combiner, scheduler_name)))
+
+static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
+ grpc_error *error) {
+ combiner_exec(exec_ctx,
+ COMBINER_FROM_CLOSURE_SCHEDULER(cl, uncovered_scheduler), cl,
+ error, false);
+}
+
+static void combiner_exec_covered(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
+ grpc_error *error) {
+ combiner_exec(exec_ctx,
+ COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_scheduler), cl,
+ error, true);
+}
+
static void move_next(grpc_exec_ctx *exec_ctx) {
exec_ctx->active_combiner =
exec_ctx->active_combiner->next_combiner_on_this_exec_ctx;
@@ -188,8 +235,7 @@ static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
move_next(exec_ctx);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload --> %p", lock,
lock->optional_workqueue));
- grpc_workqueue_enqueue(exec_ctx, lock->optional_workqueue, &lock->offload,
- GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, &lock->offload, GRPC_ERROR_NONE);
}
bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
@@ -312,23 +358,22 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
}
static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
- grpc_error *error) {
- grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
- GRPC_ERROR_REF(error), false);
-}
+ grpc_error *error);
-void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *closure, grpc_error *error,
- bool covered_by_poller) {
+static void combiner_execute_finally(grpc_exec_ctx *exec_ctx,
+ grpc_combiner *lock, grpc_closure *closure,
+ grpc_error *error,
+ bool covered_by_poller) {
GRPC_COMBINER_TRACE(gpr_log(
GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p; cov=%d", lock,
closure, exec_ctx->active_combiner, covered_by_poller));
GPR_TIMER_BEGIN("combiner.execute_finally", 0);
if (exec_ctx->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0);
- grpc_combiner_execute(exec_ctx, lock,
- grpc_closure_create(enqueue_finally, closure), error,
- false);
+ grpc_closure_sched(
+ exec_ctx, grpc_closure_create(enqueue_finally, closure,
+ grpc_combiner_scheduler(lock, false)),
+ error);
GPR_TIMER_END("combiner.execute_finally", 0);
return;
}
@@ -342,3 +387,36 @@ void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure_list_append(&lock->final_list, closure, error);
GPR_TIMER_END("combiner.execute_finally", 0);
}
+
+static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
+ grpc_error *error) {
+ combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
+ GRPC_ERROR_REF(error), false);
+}
+
+static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *cl,
+ grpc_error *error) {
+ combiner_execute_finally(exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(
+ cl, uncovered_finally_scheduler),
+ cl, error, false);
+}
+
+static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *cl, grpc_error *error) {
+ combiner_execute_finally(
+ exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_finally_scheduler),
+ cl, error, true);
+}
+
+grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *combiner,
+ bool covered_by_poller) {
+ return covered_by_poller ? &combiner->covered_scheduler
+ : &combiner->uncovered_scheduler;
+}
+
+grpc_closure_scheduler *grpc_combiner_finally_scheduler(
+ grpc_combiner *combiner, bool covered_by_poller) {
+ return covered_by_poller ? &combiner->covered_finally_scheduler
+ : &combiner->uncovered_finally_scheduler;
+}
diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h
index d04eeed83a..81dff85d40 100644
--- a/src/core/lib/iomgr/combiner.h
+++ b/src/core/lib/iomgr/combiner.h
@@ -50,14 +50,12 @@
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue);
// Destroy the lock
void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
-// Execute \a action within the lock.
-void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *closure, grpc_error *error,
- bool covered_by_poller);
-// Execute \a action within the lock just prior to unlocking.
-void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *closure, grpc_error *error,
- bool covered_by_poller);
+// Fetch a scheduler to schedule closures against
+grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock,
+ bool covered_by_poller);
+// Scheduler to execute \a action within the lock just prior to unlocking.
+grpc_closure_scheduler *grpc_combiner_finally_scheduler(grpc_combiner *lock,
+ bool covered_by_poller);
bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx);
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 59b1fe896f..d6664aead2 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -201,6 +201,8 @@ static void fd_global_shutdown(void);
/* This is also used as grpc_workqueue (by directly casing it) */
typedef struct polling_island {
+ grpc_closure_scheduler workqueue_scheduler;
+
gpr_mu mu;
/* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
the refcount.
@@ -304,6 +306,8 @@ static __thread polling_island *g_current_thread_polling_island;
/* Forward declaration */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
+static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error);
#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
@@ -316,6 +320,9 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
+static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
+ workqueue_enqueue, workqueue_enqueue};
+
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
@@ -528,6 +535,7 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
*error = GRPC_ERROR_NONE;
pi = gpr_malloc(sizeof(*pi));
+ pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
gpr_mu_init(&pi->mu);
pi->fd_cnt = 0;
pi->fd_capacity = 0;
@@ -799,10 +807,10 @@ static polling_island *polling_island_merge(polling_island *p,
return q;
}
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue, grpc_closure *closure,
+static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error) {
GPR_TIMER_BEGIN("workqueue.enqueue", 0);
+ grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
/* take a ref to the workqueue: otherwise it can happen that whatever events
* this kicks off ends up destroying the workqueue before this function
* completes */
@@ -819,6 +827,12 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END("workqueue.enqueue", 0);
}
+static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
+ polling_island *pi = (polling_island *)workqueue;
+ return workqueue == NULL ? grpc_schedule_on_exec_ctx
+ : &pi->workqueue_scheduler;
+}
+
static grpc_error *polling_island_global_init() {
grpc_error *error = GRPC_ERROR_NONE;
@@ -1029,8 +1043,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
fd->po.pi = NULL;
}
- grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error),
- NULL);
+ grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
gpr_mu_unlock(&fd->po.mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
@@ -1056,16 +1069,14 @@ static grpc_error *fd_shutdown_error(bool shutdown) {
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st, grpc_closure *closure) {
if (fd->shutdown) {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"));
} else if (*st == CLOSURE_NOT_READY) {
/* not ready ==> switch to a waiting state by setting the closure */
*st = closure;
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
- grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown));
} else {
/* upcallptr was set to a different closure. This is an error! */
gpr_log(GPR_ERROR,
@@ -1087,7 +1098,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
- grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
+ grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown));
*st = CLOSURE_NOT_READY;
return 1;
}
@@ -1358,7 +1369,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
/* Release the ref and set pollset->po.pi to NULL */
pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
- grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
/* pollset->po.mu lock must be held by the caller before calling this */
@@ -1409,7 +1420,9 @@ static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
workqueue_maybe_wakeup(pi);
}
grpc_closure *c = (grpc_closure *)n;
- grpc_closure_run(exec_ctx, c, c->error_data.error);
+ grpc_error *error = c->error_data.error;
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
return true;
} else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
/* n == NULL might mean there's work but it's not available to be popped
@@ -1958,7 +1971,7 @@ static const grpc_event_engine_vtable vtable = {
.workqueue_ref = workqueue_ref,
.workqueue_unref = workqueue_unref,
- .workqueue_enqueue = workqueue_enqueue,
+ .workqueue_scheduler = workqueue_scheduler,
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 21b28e5554..5bc5621443 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -397,7 +397,7 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
if (!fd->released) {
close(fd->fd);
}
- grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE);
}
static int fd_wrapped_fd(grpc_fd *fd) {
@@ -457,16 +457,14 @@ static grpc_error *fd_shutdown_error(bool shutdown) {
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st, grpc_closure *closure) {
if (fd->shutdown) {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"));
} else if (*st == CLOSURE_NOT_READY) {
/* not ready ==> switch to a waiting state by setting the closure */
*st = closure;
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
- grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown));
maybe_wake_one_watcher_locked(fd);
} else {
/* upcallptr was set to a different closure. This is an error! */
@@ -489,7 +487,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
- grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
+ grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown));
*st = CLOSURE_NOT_READY;
return 1;
}
@@ -852,7 +850,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
GRPC_FD_UNREF(pollset->fds[i], "multipoller");
}
pollset->fd_count = 0;
- grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
static void work_combine_error(grpc_error **composite, grpc_error *error) {
@@ -901,7 +899,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (!pollset_has_workers(pollset) &&
!grpc_closure_list_empty(pollset->idle_jobs)) {
GPR_TIMER_MARK("pollset_work.idle_jobs", 0);
- grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
goto done;
}
/* If we're shutting down then we don't execute any extended work */
@@ -1081,7 +1079,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
* TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
gpr_mu_lock(&pollset->mu);
} else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
gpr_mu_unlock(&pollset->mu);
grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu);
@@ -1100,7 +1098,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->shutdown_done = closure;
pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset_has_workers(pollset)) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
}
if (!pollset->called_shutdown && !pollset_has_workers(pollset)) {
pollset->called_shutdown = 1;
@@ -1288,10 +1286,8 @@ static void workqueue_unref(grpc_exec_ctx *exec_ctx,
grpc_workqueue *workqueue) {}
#endif
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue, grpc_closure *closure,
- grpc_error *error) {
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
+ return grpc_schedule_on_exec_ctx;
}
/*******************************************************************************
@@ -1534,7 +1530,7 @@ static const grpc_event_engine_vtable vtable = {
.workqueue_ref = workqueue_ref,
.workqueue_unref = workqueue_unref,
- .workqueue_enqueue = workqueue_enqueue,
+ .workqueue_scheduler = workqueue_scheduler,
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index ab139895fd..2975d619e1 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -275,9 +275,8 @@ void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
}
#endif
-void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- grpc_closure *closure, grpc_error *error) {
- g_event_engine->workqueue_enqueue(exec_ctx, workqueue, closure, error);
+grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) {
+ return g_event_engine->workqueue_scheduler(workqueue);
}
#endif // GRPC_POSIX_SOCKET
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index cb5832539d..1068a4bad5 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -106,8 +106,7 @@ typedef struct grpc_event_engine_vtable {
grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue);
void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
#endif
- void (*workqueue_enqueue)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- grpc_closure *closure, grpc_error *error);
+ grpc_closure_scheduler *(*workqueue_scheduler)(grpc_workqueue *workqueue);
} grpc_event_engine_vtable;
void grpc_event_engine_init(void);
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index 604713e578..6aa788f8e5 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -57,7 +57,6 @@ bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) {
return true;
}
-#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
bool did_something = 0;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0);
@@ -67,8 +66,10 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
while (c != NULL) {
grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
did_something = true;
- grpc_closure_run(exec_ctx, c, c->error_data.error);
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
c = next;
}
} else if (!grpc_combiner_continue_exec_ctx(exec_ctx)) {
@@ -76,30 +77,6 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
}
}
GPR_ASSERT(exec_ctx->active_combiner == NULL);
- if (exec_ctx->stealing_from_workqueue != NULL) {
- if (grpc_exec_ctx_ready_to_finish(exec_ctx)) {
- grpc_workqueue_enqueue(exec_ctx, exec_ctx->stealing_from_workqueue,
- exec_ctx->stolen_closure,
- exec_ctx->stolen_closure->error_data.error);
- GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue,
- "exec_ctx_sched");
- exec_ctx->stealing_from_workqueue = NULL;
- exec_ctx->stolen_closure = NULL;
- } else {
- grpc_closure *c = exec_ctx->stolen_closure;
- GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue,
- "exec_ctx_sched");
- exec_ctx->stealing_from_workqueue = NULL;
- exec_ctx->stolen_closure = NULL;
- grpc_error *error = c->error_data.error;
- GPR_TIMER_BEGIN("grpc_exec_ctx_flush.stolen_cb", 0);
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- GPR_TIMER_END("grpc_exec_ctx_flush.stolen_cb", 0);
- grpc_exec_ctx_flush(exec_ctx);
- did_something = true;
- }
- }
GPR_TIMER_END("grpc_exec_ctx_flush", 0);
return did_something;
}
@@ -109,104 +86,21 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
grpc_exec_ctx_flush(exec_ctx);
}
-void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error,
- grpc_workqueue *offload_target_or_null) {
- GPR_TIMER_BEGIN("grpc_exec_ctx_sched", 0);
- if (offload_target_or_null == NULL) {
- grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
- } else if (exec_ctx->stealing_from_workqueue == NULL) {
- exec_ctx->stealing_from_workqueue = offload_target_or_null;
- closure->error_data.error = error;
- exec_ctx->stolen_closure = closure;
- } else if (exec_ctx->stealing_from_workqueue != offload_target_or_null) {
- grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error);
- GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched");
- } else { /* stealing_from_workqueue == offload_target_or_null */
- grpc_workqueue_enqueue(exec_ctx, offload_target_or_null,
- exec_ctx->stolen_closure,
- exec_ctx->stolen_closure->error_data.error);
- closure->error_data.error = error;
- exec_ctx->stolen_closure = closure;
- GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched");
- }
- GPR_TIMER_END("grpc_exec_ctx_sched", 0);
+static void exec_ctx_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error) {
+ closure->cb(exec_ctx, closure->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
}
-void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
- grpc_closure_list *list,
- grpc_workqueue *offload_target_or_null) {
- grpc_closure_list_move(list, &exec_ctx->closure_list);
+static void exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error) {
+ grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
}
void grpc_exec_ctx_global_init(void) {}
void grpc_exec_ctx_global_shutdown(void) {}
-#else
-static gpr_mu g_mu;
-static gpr_cv g_cv;
-static int g_threads = 0;
-
-static void run_closure(void *arg) {
- grpc_closure *closure = arg;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- closure->cb(&exec_ctx, closure->cb_arg, (closure->final_data & 1) != 0);
- grpc_exec_ctx_finish(&exec_ctx);
- gpr_mu_lock(&g_mu);
- if (--g_threads == 0) {
- gpr_cv_signal(&g_cv);
- }
- gpr_mu_unlock(&g_mu);
-}
-
-static void start_closure(grpc_closure *closure) {
- gpr_thd_id id;
- gpr_mu_lock(&g_mu);
- g_threads++;
- gpr_mu_unlock(&g_mu);
- gpr_thd_new(&id, run_closure, closure, NULL);
-}
-
-bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { return false; }
-
-void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {}
-void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- bool success,
- grpc_workqueue *offload_target_or_null) {
- GPR_ASSERT(offload_target_or_null == NULL);
- if (closure == NULL) return;
- closure->final_data = success;
- start_closure(closure);
-}
-
-void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
- grpc_closure_list *list,
- grpc_workqueue *offload_target_or_null) {
- GPR_ASSERT(offload_target_or_null == NULL);
- if (list == NULL) return;
- grpc_closure *p = list->head;
- while (p) {
- grpc_closure *start = p;
- p = grpc_closure_next(start);
- start_closure(start);
- }
- grpc_closure_list r = GRPC_CLOSURE_LIST_INIT;
- *list = r;
-}
-
-void grpc_exec_ctx_global_init(void) {
- gpr_mu_init(&g_mu);
- gpr_cv_init(&g_cv);
-}
-
-void grpc_exec_ctx_global_shutdown(void) {
- gpr_mu_lock(&g_mu);
- while (g_threads != 0) {
- gpr_cv_wait(&g_cv, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
- }
- gpr_mu_unlock(&g_mu);
-
- gpr_mu_destroy(&g_mu);
- gpr_cv_destroy(&g_cv);
-}
-#endif
+static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = {
+ exec_ctx_run, exec_ctx_sched};
+static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable};
+grpc_closure_scheduler *grpc_schedule_on_exec_ctx = &exec_ctx_scheduler;
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 7e50cb9825..e566f1b3e8 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -66,17 +66,6 @@ typedef struct grpc_combiner grpc_combiner;
#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
struct grpc_exec_ctx {
grpc_closure_list closure_list;
- /** The workqueue we're stealing work from.
- As items are queued to the execution context, we try to steal one
- workqueue item and execute it inline (assuming the exec_ctx is not
- finished) - doing so does not invalidate the workqueue's contract, and
- provides a small latency win in cases where we get a hit */
- grpc_workqueue *stealing_from_workqueue;
- /** The workqueue item that was stolen from the workqueue above. When new
- items are scheduled to be offloaded to that workqueue, we need to update
- this like a 1-deep fifo to maintain the invariant that workqueue items
- queued by one thread are started in order */
- grpc_closure *stolen_closure;
/** currently active combiner: updated only via combiner.c */
grpc_combiner *active_combiner;
/** last active combiner in the active combiner list */
@@ -89,10 +78,7 @@ struct grpc_exec_ctx {
/* initializer for grpc_exec_ctx:
prefer to use GRPC_EXEC_CTX_INIT whenever possible */
#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
- { \
- GRPC_CLOSURE_LIST_INIT, NULL, NULL, NULL, NULL, false, finish_check_arg, \
- finish_check \
- }
+ { GRPC_CLOSURE_LIST_INIT, NULL, NULL, false, finish_check_arg, finish_check }
#else
struct grpc_exec_ctx {
bool cached_ready_to_finish;
@@ -108,6 +94,8 @@ struct grpc_exec_ctx {
#define GRPC_EXEC_CTX_INIT \
GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_always_ready_to_finish, NULL)
+extern grpc_closure_scheduler *grpc_schedule_on_exec_ctx;
+
/** Flush any work that has been enqueued onto this grpc_exec_ctx.
* Caller must guarantee that no interfering locks are held.
* Returns true if work was performed, false otherwise. */
@@ -115,14 +103,6 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx);
/** Finish any pending work for a grpc_exec_ctx. Must be called before
* the instance is destroyed, or work may be lost. */
void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx);
-/** Add a closure to be executed in the future.
- If \a offload_target_or_null is NULL, the closure will be executed at the
- next exec_ctx.{finish,flush} point.
- If \a offload_target_or_null is non-NULL, the closure will be scheduled
- against the workqueue, and a reference to the workqueue will be consumed. */
-void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error,
- grpc_workqueue *offload_target_or_null);
/** Returns true if we'd like to leave this execution context as soon as
possible: useful for deciding whether to do something more or not depending
on outside context */
@@ -131,11 +111,6 @@ bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx);
bool grpc_never_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored);
/** A finish check that is always ready to finish */
bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored);
-/** Add a list of closures to be executed at the next flush/finish point.
- * Leaves \a list empty. */
-void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
- grpc_closure_list *list,
- grpc_workqueue *offload_target_or_null);
void grpc_exec_ctx_global_init(void);
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index 4e78b619fb..852775564f 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -77,10 +77,18 @@ static void closure_exec_thread_func(void *ignored) {
gpr_mu_unlock(&g_executor.mu);
break;
} else {
- grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures, NULL);
+ grpc_closure *c = g_executor.closures.head;
+ grpc_closure_list_init(&g_executor.closures);
+ gpr_mu_unlock(&g_executor.mu);
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
+ c->cb(&exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
+ }
+ grpc_exec_ctx_flush(&exec_ctx);
}
- gpr_mu_unlock(&g_executor.mu);
- grpc_exec_ctx_flush(&exec_ctx);
}
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -112,7 +120,8 @@ static void maybe_spawn_locked() {
g_executor.pending_join = 1;
}
-void grpc_executor_push(grpc_closure *closure, grpc_error *error) {
+static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error) {
gpr_mu_lock(&g_executor.mu);
if (g_executor.shutting_down == 0) {
grpc_closure_list_append(&g_executor.closures, closure, error);
@@ -132,10 +141,24 @@ void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) {
* list below because we aren't accepting new work */
/* Execute pending callbacks, some may be performing cleanups */
- grpc_exec_ctx_enqueue_list(exec_ctx, &g_executor.closures, NULL);
+ grpc_closure *c = g_executor.closures.head;
+ grpc_closure_list_init(&g_executor.closures);
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
+ }
+ grpc_exec_ctx_flush(exec_ctx);
GPR_ASSERT(grpc_closure_list_empty(g_executor.closures));
if (pending_join) {
gpr_thd_join(g_executor.tid);
}
gpr_mu_destroy(&g_executor.mu);
}
+
+static const grpc_closure_scheduler_vtable executor_vtable = {executor_push,
+ executor_push};
+static grpc_closure_scheduler executor_scheduler = {&executor_vtable};
+grpc_closure_scheduler *grpc_executor_scheduler = &executor_scheduler;
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index 7bf8f21940..1213016383 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -43,9 +43,7 @@
* non-blocking solution available. */
void grpc_executor_init();
-/** Enqueue \a closure for its eventual execution of \a f(arg) on a separate
- * thread */
-void grpc_executor_push(grpc_closure *closure, grpc_error *error);
+extern grpc_closure_scheduler *grpc_executor_scheduler;
/** Shutdown the executor, running all pending work as part of the call */
void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx);
diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c
index 3a74b842b6..ed3edeee94 100644
--- a/src/core/lib/iomgr/pollset_uv.c
+++ b/src/core/lib/iomgr/pollset_uv.c
@@ -83,7 +83,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
// Drain any pending UV callbacks without blocking
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
}
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
}
void grpc_pollset_destroy(grpc_pollset *pollset) {
diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c
index 5540303e49..2a45e708df 100644
--- a/src/core/lib/iomgr/pollset_windows.c
+++ b/src/core/lib/iomgr/pollset_windows.c
@@ -109,7 +109,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->shutting_down = 1;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset->is_iocp_worker) {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
} else {
pollset->on_shutdown = closure;
}
@@ -167,8 +167,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
if (pollset->shutting_down && pollset->on_shutdown != NULL) {
- grpc_exec_ctx_sched(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE,
- NULL);
+ grpc_closure_sched(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE);
pollset->on_shutdown = NULL;
}
goto done;
diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c
index 821932e562..50e470d149 100644
--- a/src/core/lib/iomgr/resolve_address_posix.c
+++ b/src/core/lib/iomgr/resolve_address_posix.c
@@ -163,10 +163,9 @@ typedef struct {
static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp,
grpc_error *error) {
request *r = rp;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, r->on_done,
- grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out),
- NULL);
+ grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out));
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r);
@@ -185,12 +184,13 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
grpc_closure *on_done,
grpc_resolved_addresses **addrs) {
request *r = gpr_malloc(sizeof(request));
- grpc_closure_init(&r->request_closure, do_request_thread, r);
+ grpc_closure_init(&r->request_closure, do_request_thread, r,
+ grpc_executor_scheduler);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
r->addrs_out = addrs;
- grpc_executor_push(&r->request_closure, GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, &r->request_closure, GRPC_ERROR_NONE);
}
void (*grpc_resolve_address)(
diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c
index 3269c4f09f..9b5f3209f0 100644
--- a/src/core/lib/iomgr/resolve_address_uv.c
+++ b/src/core/lib/iomgr/resolve_address_uv.c
@@ -98,7 +98,7 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status,
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_error *error;
error = handle_addrinfo_result(status, res, r->addresses);
- grpc_exec_ctx_sched(&exec_ctx, r->on_done, error, NULL);
+ grpc_closure_sched(&exec_ctx, r->on_done, error);
grpc_exec_ctx_finish(&exec_ctx);
gpr_free(r->hints);
@@ -193,7 +193,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
int s;
err = try_split_host_port(name, default_port, &host, &port);
if (err != GRPC_ERROR_NONE) {
- grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL);
+ grpc_closure_sched(exec_ctx, on_done, err);
return;
}
r = gpr_malloc(sizeof(request));
@@ -217,7 +217,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
*addrs = NULL;
err = GRPC_ERROR_CREATE("getaddrinfo failed");
err = grpc_error_set_str(err, GRPC_ERROR_STR_OS_ERROR, uv_strerror(s));
- grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL);
+ grpc_closure_sched(exec_ctx, on_done, err);
gpr_free(r);
gpr_free(req);
gpr_free(hints);
diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c
index fada5ecbe8..2439ce3cb7 100644
--- a/src/core/lib/iomgr/resolve_address_windows.c
+++ b/src/core/lib/iomgr/resolve_address_windows.c
@@ -154,7 +154,7 @@ static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp,
} else {
GRPC_ERROR_REF(error);
}
- grpc_exec_ctx_sched(exec_ctx, r->on_done, error, NULL);
+ grpc_closure_sched(exec_ctx, r->on_done, error);
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r);
@@ -173,12 +173,13 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
grpc_closure *on_done,
grpc_resolved_addresses **addresses) {
request *r = gpr_malloc(sizeof(request));
- grpc_closure_init(&r->request_closure, do_request_thread, r);
+ grpc_closure_init(&r->request_closure, do_request_thread, r,
+ grpc_executor_scheduler);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
r->addresses = addresses;
- grpc_executor_push(&r->request_closure, GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, &r->request_closure, GRPC_ERROR_NONE);
}
void (*grpc_resolve_address)(
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index c5c536d2b4..42a044df77 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -265,9 +265,8 @@ static void rq_step_sched(grpc_exec_ctx *exec_ctx,
if (resource_quota->step_scheduled) return;
resource_quota->step_scheduled = true;
grpc_resource_quota_ref_internal(resource_quota);
- grpc_combiner_execute_finally(exec_ctx, resource_quota->combiner,
- &resource_quota->rq_step_closure,
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(exec_ctx, &resource_quota->rq_step_closure,
+ GRPC_ERROR_NONE);
}
/* returns true if all allocations are completed */
@@ -294,7 +293,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
}
if (resource_user->free_pool >= 0) {
resource_user->allocating = false;
- grpc_exec_ctx_enqueue_list(exec_ctx, &resource_user->on_allocated, NULL);
+ grpc_closure_list_sched(exec_ctx, &resource_user->on_allocated);
gpr_mu_unlock(&resource_user->mu);
} else {
rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
@@ -428,7 +427,7 @@ static bool ru_post_reclaimer(grpc_exec_ctx *exec_ctx,
resource_user->new_reclaimers[destructive] = NULL;
GPR_ASSERT(resource_user->reclaimers[destructive] == NULL);
if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED);
return false;
}
resource_user->reclaimers[destructive] = closure;
@@ -469,10 +468,10 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
grpc_resource_user *resource_user = ru;
- grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[0],
- GRPC_ERROR_CANCELLED, NULL);
- grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1],
- GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, resource_user->reclaimers[0],
+ GRPC_ERROR_CANCELLED);
+ grpc_closure_sched(exec_ctx, resource_user->reclaimers[1],
+ GRPC_ERROR_CANCELLED);
resource_user->reclaimers[0] = NULL;
resource_user->reclaimers[1] = NULL;
rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
@@ -485,10 +484,10 @@ static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
rulist_remove(resource_user, (grpc_rulist)i);
}
- grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[0],
- GRPC_ERROR_CANCELLED, NULL);
- grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1],
- GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, resource_user->reclaimers[0],
+ GRPC_ERROR_CANCELLED);
+ grpc_closure_sched(exec_ctx, resource_user->reclaimers[1],
+ GRPC_ERROR_CANCELLED);
if (resource_user->free_pool != 0) {
resource_user->resource_quota->free_pool += resource_user->free_pool;
rq_step_sched(exec_ctx, resource_user->resource_quota);
@@ -560,9 +559,12 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) {
gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR,
(intptr_t)resource_quota);
}
- grpc_closure_init(&resource_quota->rq_step_closure, rq_step, resource_quota);
+ grpc_closure_init(
+ &resource_quota->rq_step_closure, rq_step, resource_quota,
+ grpc_combiner_finally_scheduler(resource_quota->combiner, true));
grpc_closure_init(&resource_quota->rq_reclamation_done_closure,
- rq_reclamation_done, resource_quota);
+ rq_reclamation_done, resource_quota,
+ grpc_combiner_scheduler(resource_quota->combiner, false));
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
resource_quota->roots[i] = NULL;
}
@@ -603,9 +605,8 @@ void grpc_resource_quota_resize(grpc_resource_quota *resource_quota,
rq_resize_args *a = gpr_malloc(sizeof(*a));
a->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
a->size = (int64_t)size;
- grpc_closure_init(&a->closure, rq_resize, a);
- grpc_combiner_execute(&exec_ctx, resource_quota->combiner, &a->closure,
- GRPC_ERROR_NONE, false);
+ grpc_closure_init(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
+ grpc_closure_sched(&exec_ctx, &a->closure, GRPC_ERROR_NONE);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -654,15 +655,19 @@ grpc_resource_user *grpc_resource_user_create(
resource_user->resource_quota =
grpc_resource_quota_ref_internal(resource_quota);
grpc_closure_init(&resource_user->allocate_closure, &ru_allocate,
- resource_user);
+ resource_user,
+ grpc_combiner_scheduler(resource_quota->combiner, false));
grpc_closure_init(&resource_user->add_to_free_pool_closure,
- &ru_add_to_free_pool, resource_user);
+ &ru_add_to_free_pool, resource_user,
+ grpc_combiner_scheduler(resource_quota->combiner, false));
grpc_closure_init(&resource_user->post_reclaimer_closure[0],
- &ru_post_benign_reclaimer, resource_user);
+ &ru_post_benign_reclaimer, resource_user,
+ grpc_combiner_scheduler(resource_quota->combiner, false));
grpc_closure_init(&resource_user->post_reclaimer_closure[1],
- &ru_post_destructive_reclaimer, resource_user);
- grpc_closure_init(&resource_user->destroy_closure, &ru_destroy,
- resource_user);
+ &ru_post_destructive_reclaimer, resource_user,
+ grpc_combiner_scheduler(resource_quota->combiner, false));
+ grpc_closure_init(&resource_user->destroy_closure, &ru_destroy, resource_user,
+ grpc_combiner_scheduler(resource_quota->combiner, false));
gpr_mu_init(&resource_user->mu);
gpr_atm_rel_store(&resource_user->refs, 1);
gpr_atm_rel_store(&resource_user->shutdown, 0);
@@ -697,9 +702,8 @@ static void ru_unref_by(grpc_exec_ctx *exec_ctx,
gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
GPR_ASSERT(old >= amount);
if (old == amount) {
- grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
- &resource_user->destroy_closure, GRPC_ERROR_NONE,
- false);
+ grpc_closure_sched(exec_ctx, &resource_user->destroy_closure,
+ GRPC_ERROR_NONE);
}
}
@@ -715,9 +719,12 @@ void grpc_resource_user_unref(grpc_exec_ctx *exec_ctx,
void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user) {
if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
- grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
- grpc_closure_create(ru_shutdown, resource_user),
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(exec_ctx,
+ grpc_closure_create(
+ ru_shutdown, resource_user,
+ grpc_combiner_scheduler(
+ resource_user->resource_quota->combiner, false)),
+ GRPC_ERROR_NONE);
}
}
@@ -737,12 +744,11 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE);
if (!resource_user->allocating) {
resource_user->allocating = true;
- grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
- &resource_user->allocate_closure, GRPC_ERROR_NONE,
- false);
+ grpc_closure_sched(exec_ctx, &resource_user->allocate_closure,
+ GRPC_ERROR_NONE);
}
} else {
- grpc_exec_ctx_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE);
}
gpr_mu_unlock(&resource_user->mu);
}
@@ -761,9 +767,8 @@ void grpc_resource_user_free(grpc_exec_ctx *exec_ctx,
if (is_bigger_than_zero && was_zero_or_negative &&
!resource_user->added_to_free_pool) {
resource_user->added_to_free_pool = true;
- grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
- &resource_user->add_to_free_pool_closure,
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(exec_ctx, &resource_user->add_to_free_pool_closure,
+ GRPC_ERROR_NONE);
}
gpr_mu_unlock(&resource_user->mu);
ru_unref_by(exec_ctx, resource_user, (gpr_atm)size);
@@ -775,9 +780,9 @@ void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
grpc_closure *closure) {
GPR_ASSERT(resource_user->new_reclaimers[destructive] == NULL);
resource_user->new_reclaimers[destructive] = closure;
- grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
- &resource_user->post_reclaimer_closure[destructive],
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(exec_ctx,
+ &resource_user->post_reclaimer_closure[destructive],
+ GRPC_ERROR_NONE);
}
void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx,
@@ -786,18 +791,20 @@ void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_DEBUG, "RQ %s %s: reclamation complete",
resource_user->resource_quota->name, resource_user->name);
}
- grpc_combiner_execute(
- exec_ctx, resource_user->resource_quota->combiner,
- &resource_user->resource_quota->rq_reclamation_done_closure,
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(
+ exec_ctx, &resource_user->resource_quota->rq_reclamation_done_closure,
+ GRPC_ERROR_NONE);
}
void grpc_resource_user_slice_allocator_init(
grpc_resource_user_slice_allocator *slice_allocator,
grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p) {
- grpc_closure_init(&slice_allocator->on_allocated, ru_allocated_slices,
- slice_allocator);
- grpc_closure_init(&slice_allocator->on_done, cb, p);
+ grpc_closure_init(
+ &slice_allocator->on_allocated, ru_allocated_slices, slice_allocator,
+ grpc_combiner_scheduler(resource_user->resource_quota->combiner, false));
+ grpc_closure_init(
+ &slice_allocator->on_done, cb, p,
+ grpc_combiner_scheduler(resource_user->resource_quota->combiner, false));
slice_allocator->resource_user = resource_user;
}
diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c
index 54911e0e31..2f2e02f715 100644
--- a/src/core/lib/iomgr/socket_windows.c
+++ b/src/core/lib/iomgr/socket_windows.c
@@ -131,7 +131,7 @@ static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&socket->state_mu);
if (info->has_pending_iocp) {
info->has_pending_iocp = 0;
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
} else {
info->closure = closure;
}
@@ -154,7 +154,7 @@ void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
GPR_ASSERT(!info->has_pending_iocp);
gpr_mu_lock(&socket->state_mu);
if (info->closure) {
- grpc_exec_ctx_sched(exec_ctx, info->closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, info->closure, GRPC_ERROR_NONE);
info->closure = NULL;
} else {
info->has_pending_iocp = 1;
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index 2db56b47ee..c8237dc38f 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -265,7 +265,7 @@ finish:
grpc_channel_args_destroy(exec_ctx, ac->channel_args);
gpr_free(ac);
}
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+ grpc_closure_sched(exec_ctx, closure, error);
}
static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
@@ -294,7 +294,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd);
if (error != GRPC_ERROR_NONE) {
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+ grpc_closure_sched(exec_ctx, closure, error);
return;
}
if (dsmode == GRPC_DSMODE_IPV4) {
@@ -303,7 +303,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
addr = &addr4_copy;
}
if ((error = prepare_socket(addr, fd, channel_args)) != GRPC_ERROR_NONE) {
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+ grpc_closure_sched(exec_ctx, closure, error);
return;
}
@@ -321,14 +321,13 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
if (err >= 0) {
*ep =
grpc_tcp_client_create_from_fd(exec_ctx, fdobj, channel_args, addr_str);
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
goto done;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error");
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"));
goto done;
}
@@ -343,8 +342,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
addr_str = NULL;
gpr_mu_init(&ac->mu);
ac->refs = 2;
- ac->write_closure.cb = on_writable;
- ac->write_closure.cb_arg = ac;
+ grpc_closure_init(&ac->write_closure, on_writable, ac,
+ grpc_schedule_on_exec_ctx);
ac->channel_args = grpc_channel_args_copy(channel_args);
if (grpc_tcp_trace) {
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
index 5f2ccfee76..ed0de50fc1 100644
--- a/src/core/lib/iomgr/tcp_client_uv.c
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -110,7 +110,7 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) {
if (done) {
uv_tcp_connect_cleanup(&exec_ctx, connect);
}
- grpc_exec_ctx_sched(&exec_ctx, closure, error, NULL);
+ grpc_closure_sched(&exec_ctx, closure, error);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c
index cc10060b9b..275258ebb5 100644
--- a/src/core/lib/iomgr/tcp_client_windows.c
+++ b/src/core/lib/iomgr/tcp_client_windows.c
@@ -129,7 +129,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
async_connect_unlock_and_cleanup(exec_ctx, ac, socket);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
- grpc_exec_ctx_sched(exec_ctx, on_done, error, NULL);
+ grpc_closure_sched(exec_ctx, on_done, error);
}
/* Tries to issue one async connection, then schedules both an IOCP
@@ -227,7 +227,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
ac->addr_name = grpc_sockaddr_to_uri(addr);
ac->endpoint = endpoint;
ac->resource_quota = resource_quota;
- grpc_closure_init(&ac->on_connect, on_connect, ac);
+ grpc_closure_init(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &ac->alarm, deadline, on_alarm, ac,
gpr_now(GPR_CLOCK_MONOTONIC));
@@ -247,7 +247,7 @@ failure:
closesocket(sock);
}
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
- grpc_exec_ctx_sched(exec_ctx, on_done, final_error, NULL);
+ grpc_closure_sched(exec_ctx, on_done, final_error);
}
#endif /* GRPC_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 2b46544f82..ece44978b0 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -320,7 +320,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
tcp->finished_edge = false;
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
- grpc_exec_ctx_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE);
}
}
@@ -464,11 +464,10 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (buf->length == 0) {
GPR_TIMER_END("tcp_write", 0);
- grpc_exec_ctx_sched(exec_ctx, cb,
- grpc_fd_is_shutdown(tcp->em_fd)
- ? tcp_annotate_error(GRPC_ERROR_CREATE("EOF"), tcp)
- : GRPC_ERROR_NONE,
- NULL);
+ grpc_closure_sched(exec_ctx, cb,
+ grpc_fd_is_shutdown(tcp->em_fd)
+ ? tcp_annotate_error(GRPC_ERROR_CREATE("EOF"), tcp)
+ : GRPC_ERROR_NONE);
return;
}
tcp->outgoing_buffer = buf;
@@ -488,7 +487,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
gpr_log(GPR_DEBUG, "write: %s", str);
grpc_error_free_string(str);
}
- grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ grpc_closure_sched(exec_ctx, cb, error);
}
GPR_TIMER_END("tcp_write", 0);
@@ -556,10 +555,10 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
gpr_ref_init(&tcp->refcount, 1);
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
tcp->em_fd = em_fd;
- tcp->read_closure.cb = tcp_handle_read;
- tcp->read_closure.cb_arg = tcp;
- tcp->write_closure.cb = tcp_handle_write;
- tcp->write_closure.cb_arg = tcp;
+ grpc_closure_init(&tcp->read_closure, tcp_handle_read, tcp,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&tcp->write_closure, tcp_handle_write, tcp,
+ grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&tcp->last_read_buffer);
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
grpc_resource_user_slice_allocator_init(
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 1e63142e0e..20efb678b2 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -208,7 +208,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
GPR_ASSERT(s->shutdown);
gpr_mu_unlock(&s->mu);
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
gpr_mu_destroy(&s->mu);
@@ -254,8 +254,8 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
grpc_tcp_listener *sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_unlink_if_unix_domain_socket(&sp->addr);
- sp->destroyed_closure.cb = destroyed_port;
- sp->destroyed_closure.cb_arg = s;
+ grpc_closure_init(&sp->destroyed_closure, destroyed_port, s,
+ grpc_schedule_on_exec_ctx);
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"tcp_listener_shutdown");
}
@@ -723,8 +723,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
"clone_port", clone_port(sp, (unsigned)(pollset_count - 1))));
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
- sp->read_closure.cb = on_read;
- sp->read_closure.cb_arg = sp;
+ grpc_closure_init(&sp->read_closure, on_read, sp,
+ grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
s->active_ports++;
sp = sp->next;
@@ -733,8 +733,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
}
- sp->read_closure.cb = on_read;
- sp->read_closure.cb_arg = sp;
+ grpc_closure_init(&sp->read_closure, on_read, sp,
+ grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
s->active_ports++;
sp = sp->next;
@@ -760,7 +760,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
grpc_tcp_server_shutdown_listeners(exec_ctx, s);
gpr_mu_lock(&s->mu);
- grpc_exec_ctx_enqueue_list(exec_ctx, &s->shutdown_starting, NULL);
+ grpc_closure_list_sched(exec_ctx, &s->shutdown_starting);
gpr_mu_unlock(&s->mu);
tcp_server_destroy(exec_ctx, s);
}
diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c
index 4ccc80c9f0..eed2773f8a 100644
--- a/src/core/lib/iomgr/tcp_server_uv.c
+++ b/src/core/lib/iomgr/tcp_server_uv.c
@@ -126,7 +126,7 @@ void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
while (s->head) {
@@ -170,7 +170,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
/* Complete shutdown_starting work before destroying. */
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting, NULL);
+ grpc_closure_list_sched(&local_exec_ctx, &s->shutdown_starting);
if (exec_ctx == NULL) {
grpc_exec_ctx_flush(&local_exec_ctx);
tcp_server_destroy(&local_exec_ctx, s);
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index 053bb35f6f..97d7827461 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -162,11 +162,12 @@ static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg,
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
- grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(destroy_server, s),
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, grpc_closure_create(destroy_server, s,
+ grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_NONE);
}
grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
@@ -204,7 +205,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
grpc_tcp_server_shutdown_listeners(exec_ctx, s);
gpr_mu_lock(&s->mu);
- grpc_exec_ctx_enqueue_list(exec_ctx, &s->shutdown_starting, NULL);
+ grpc_closure_list_sched(exec_ctx, &s->shutdown_starting);
gpr_mu_unlock(&s->mu);
tcp_server_destroy(exec_ctx, s);
}
@@ -465,7 +466,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
sp->new_socket = INVALID_SOCKET;
sp->port = port;
sp->port_index = port_index;
- grpc_closure_init(&sp->on_accept, on_accept, sp);
+ grpc_closure_init(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
GPR_ASSERT(sp->socket);
gpr_mu_unlock(&s->mu);
*listener = sp;
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index f3510bc780..7328f94fe0 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -169,7 +169,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
// nread < 0: Error
error = GRPC_ERROR_CREATE("TCP Read failed");
}
- grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
+ grpc_closure_sched(&exec_ctx, cb, error);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -190,7 +190,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
error = GRPC_ERROR_CREATE("TCP Read failed at start");
error =
grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status));
- grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ grpc_closure_sched(exec_ctx, cb, error);
}
if (grpc_tcp_trace) {
const char *str = grpc_error_string(error);
@@ -217,7 +217,7 @@ static void write_callback(uv_write_t *req, int status) {
gpr_free(tcp->write_buffers);
grpc_resource_user_free(&exec_ctx, tcp->resource_user,
sizeof(uv_buf_t) * tcp->write_slices->count);
- grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
+ grpc_closure_sched(&exec_ctx, cb, error);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -243,8 +243,8 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
}
if (tcp->shutting_down) {
- grpc_exec_ctx_sched(exec_ctx, cb,
- GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL);
+ grpc_closure_sched(exec_ctx, cb,
+ GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL);
return;
}
@@ -254,7 +254,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (tcp->write_slices->count == 0) {
// No slices means we don't have to do anything,
// and libuv doesn't like empty writes
- grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_NONE);
return;
}
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 8b3b44aaaa..84f791ba07 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -189,7 +189,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
tcp->read_cb = NULL;
TCP_UNREF(exec_ctx, tcp, "read");
- grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ grpc_closure_sched(exec_ctx, cb, error);
}
static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
@@ -203,8 +203,8 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
WSABUF buffer;
if (tcp->shutting_down) {
- grpc_exec_ctx_sched(exec_ctx, cb,
- GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL);
+ grpc_closure_sched(exec_ctx, cb,
+ GRPC_ERROR_CREATE("TCP socket is shutting down"));
return;
}
@@ -228,7 +228,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) {
info->bytes_transfered = bytes_read;
- grpc_exec_ctx_sched(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE);
return;
}
@@ -241,8 +241,8 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
info->wsa_error = wsa_error;
- grpc_exec_ctx_sched(exec_ctx, &tcp->on_read,
- GRPC_WSA_ERROR(info->wsa_error, "WSARecv"), NULL);
+ grpc_closure_sched(exec_ctx, &tcp->on_read,
+ GRPC_WSA_ERROR(info->wsa_error, "WSARecv"));
return;
}
}
@@ -273,7 +273,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
}
TCP_UNREF(exec_ctx, tcp, "write");
- grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ grpc_closure_sched(exec_ctx, cb, error);
}
/* Initiates a write. */
@@ -291,8 +291,8 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
size_t len;
if (tcp->shutting_down) {
- grpc_exec_ctx_sched(exec_ctx, cb,
- GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL);
+ grpc_closure_sched(exec_ctx, cb,
+ GRPC_ERROR_CREATE("TCP socket is shutting down"));
return;
}
@@ -323,7 +323,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *error = status == 0
? GRPC_ERROR_NONE
: GRPC_WSA_ERROR(info->wsa_error, "WSASend");
- grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ grpc_closure_sched(exec_ctx, cb, error);
if (allocated) gpr_free(allocated);
return;
}
@@ -341,8 +341,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
TCP_UNREF(exec_ctx, tcp, "write");
- grpc_exec_ctx_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"),
- NULL);
+ grpc_closure_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"));
return;
}
}
@@ -425,8 +424,8 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
tcp->socket = socket;
gpr_mu_init(&tcp->mu);
gpr_ref_init(&tcp->refcount, 1);
- grpc_closure_init(&tcp->on_read, on_read, tcp);
- grpc_closure_init(&tcp->on_write, on_write, tcp);
+ grpc_closure_init(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx);
tcp->peer_string = gpr_strdup(peer_string);
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
/* Tell network status tracking code about the new endpoint */
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index 00058f9d86..ecd3b284dc 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -184,22 +184,22 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
shard_type *shard = &g_shards[shard_idx(timer)];
GPR_ASSERT(deadline.clock_type == g_clock_type);
GPR_ASSERT(now.clock_type == g_clock_type);
- grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg);
+ grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg,
+ grpc_schedule_on_exec_ctx);
timer->deadline = deadline;
timer->triggered = 0;
if (!g_initialized) {
timer->triggered = 1;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, &timer->closure,
- GRPC_ERROR_CREATE("Attempt to create timer before initialization"),
- NULL);
+ GRPC_ERROR_CREATE("Attempt to create timer before initialization"));
return;
}
if (gpr_time_cmp(deadline, now) <= 0) {
timer->triggered = 1;
- grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE);
return;
}
@@ -251,7 +251,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
shard_type *shard = &g_shards[shard_idx(timer)];
gpr_mu_lock(&shard->mu);
if (!timer->triggered) {
- grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED);
timer->triggered = 1;
if (timer->heap_index == INVALID_HEAP_INDEX) {
list_remove(timer);
@@ -317,7 +317,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
grpc_timer *timer;
gpr_mu_lock(&shard->mu);
while ((timer = pop_one(shard, now))) {
- grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_REF(error), NULL);
+ grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_REF(error));
n++;
}
*new_min_deadline = compute_min_deadline(shard);
diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c
index cfcb89268b..00b835ffb8 100644
--- a/src/core/lib/iomgr/timer_uv.c
+++ b/src/core/lib/iomgr/timer_uv.c
@@ -55,7 +55,7 @@ void run_expired_timer(uv_timer_t *handle) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_ASSERT(!timer->triggered);
timer->triggered = 1;
- grpc_exec_ctx_sched(&exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(&exec_ctx, &timer->closure, GRPC_ERROR_NONE);
stop_uv_timer(handle);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -65,10 +65,11 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
void *timer_cb_arg, gpr_timespec now) {
uint64_t timeout;
uv_timer_t *uv_timer;
- grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg);
+ grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg,
+ grpc_schedule_on_exec_ctx);
if (gpr_time_cmp(deadline, now) <= 0) {
timer->triggered = 1;
- grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE);
return;
}
timer->triggered = 0;
@@ -83,7 +84,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
if (!timer->triggered) {
timer->triggered = 1;
- grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED);
stop_uv_timer((uv_timer_t *)timer->uv_timer);
}
}
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 3c24ea9afa..dfbd295c91 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -126,7 +126,7 @@ grpc_udp_server *grpc_udp_server_create(void) {
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
gpr_mu_destroy(&s->mu);
@@ -170,8 +170,8 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
for (sp = s->head; sp; sp = sp->next) {
grpc_unlink_if_unix_domain_socket(&sp->addr);
- sp->destroyed_closure.cb = destroyed_port;
- sp->destroyed_closure.cb_arg = s;
+ grpc_closure_init(&sp->destroyed_closure, destroyed_port, s,
+ grpc_schedule_on_exec_ctx);
/* Call the orphan_cb to signal that the FD is about to be closed and
* should no longer be used. */
@@ -446,8 +446,8 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
}
- sp->read_closure.cb = on_read;
- sp->read_closure.cb_arg = sp;
+ grpc_closure_init(&sp->read_closure, on_read, sp,
+ grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
s->active_ports++;
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
index 73d9849843..371b0f55dc 100644
--- a/src/core/lib/iomgr/workqueue.h
+++ b/src/core/lib/iomgr/workqueue.h
@@ -72,17 +72,16 @@ grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue);
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
#endif
-/** Add a work item to a workqueue. Items added to a work queue will be started
- in approximately the order they were enqueued, on some thread that may or
- may not be the current thread. Successive closures enqueued onto a workqueue
- MAY be executed concurrently.
+/** Fetch the workqueue closure scheduler. Items added to a work queue will be
+ started in approximately the order they were enqueued, on some thread that
+ may or may not be the current thread. Successive closures enqueued onto a
+ workqueue MAY be executed concurrently.
It is generally more expensive to add a closure to a workqueue than to the
execution context, both in terms of CPU work and in execution latency.
Use work queues when it's important that other threads be given a chance to
tackle some workload. */
-void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- grpc_closure *closure, grpc_error *error);
+grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue);
#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */
diff --git a/src/core/lib/iomgr/workqueue_uv.c b/src/core/lib/iomgr/workqueue_uv.c
index e58ca476cc..4d61b40912 100644
--- a/src/core/lib/iomgr/workqueue_uv.c
+++ b/src/core/lib/iomgr/workqueue_uv.c
@@ -58,9 +58,8 @@ grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
#endif
-void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- grpc_closure *closure, grpc_error *error) {
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) {
+ return grpc_schedule_on_exec_ctx;
}
#endif /* GPR_UV */
diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c
index 5c93d3c59e..234b47cdf5 100644
--- a/src/core/lib/iomgr/workqueue_windows.c
+++ b/src/core/lib/iomgr/workqueue_windows.c
@@ -56,9 +56,8 @@ grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
#endif
-void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- grpc_closure *closure, grpc_error *error) {
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) {
+ return grpc_schedule_on_exec_ctx;
}
#endif /* GPR_WINDOWS */
diff --git a/src/core/lib/security/credentials/fake/fake_credentials.c b/src/core/lib/security/credentials/fake/fake_credentials.c
index f41184f9f9..a8679d097d 100644
--- a/src/core/lib/security/credentials/fake/fake_credentials.c
+++ b/src/core/lib/security/credentials/fake/fake_credentials.c
@@ -116,9 +116,10 @@ static void md_only_test_get_request_metadata(
if (c->is_async) {
grpc_credentials_metadata_request *cb_arg =
grpc_credentials_metadata_request_create(creds, cb, user_data);
- grpc_executor_push(
- grpc_closure_create(on_simulated_token_fetch_done, cb_arg),
- GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx,
+ grpc_closure_create(on_simulated_token_fetch_done,
+ cb_arg, grpc_executor_scheduler),
+ GRPC_ERROR_NONE);
} else {
cb(exec_ctx, user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK, NULL);
}
diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c
index 4afeb4a3d2..d6e1fe3dcf 100644
--- a/src/core/lib/security/credentials/google_default/google_default_credentials.c
+++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c
@@ -130,7 +130,8 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) {
grpc_httpcli_get(
exec_ctx, &context, &detector.pollent, resource_quota, &request,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay),
- grpc_closure_create(on_compute_engine_detection_http_response, &detector),
+ grpc_closure_create(on_compute_engine_detection_http_response, &detector,
+ grpc_schedule_on_exec_ctx),
&detector.response);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
@@ -155,7 +156,8 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) {
grpc_httpcli_context_destroy(&context);
grpc_closure_init(&destroy_closure, destroy_pollset,
- grpc_polling_entity_pollset(&detector.pollent));
+ grpc_polling_entity_pollset(&detector.pollent),
+ grpc_schedule_on_exec_ctx);
grpc_pollset_shutdown(exec_ctx,
grpc_polling_entity_pollset(&detector.pollent),
&destroy_closure);
diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c
index 044c89c54d..2270be8f44 100644
--- a/src/core/lib/security/credentials/jwt/jwt_verifier.c
+++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c
@@ -685,7 +685,7 @@ static void on_openid_config_retrieved(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_httpcli_get(
exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),
- grpc_closure_create(on_keys_retrieved, ctx),
+ grpc_closure_create(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx),
&ctx->responses[HTTP_RESPONSE_KEYS]);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
grpc_json_destroy(json);
@@ -787,7 +787,8 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx,
*(path_prefix++) = '\0';
gpr_asprintf(&req.http.path, "/%s/%s", path_prefix, iss);
}
- http_cb = grpc_closure_create(on_keys_retrieved, ctx);
+ http_cb =
+ grpc_closure_create(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx);
rsp_idx = HTTP_RESPONSE_KEYS;
} else {
req.host = gpr_strdup(strstr(iss, "https://") == iss ? iss + 8 : iss);
@@ -799,7 +800,8 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx,
gpr_asprintf(&req.http.path, "/%s%s", path_prefix,
GRPC_OPENID_CONFIG_URL_SUFFIX);
}
- http_cb = grpc_closure_create(on_openid_config_retrieved, ctx);
+ http_cb = grpc_closure_create(on_openid_config_retrieved, ctx,
+ grpc_schedule_on_exec_ctx);
rsp_idx = HTTP_RESPONSE_OPENID;
}
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
index cbcd74958c..1b0e43a1e4 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
@@ -313,9 +313,11 @@ static void compute_engine_fetch_oauth2(
extreme memory pressure. */
grpc_resource_quota *resource_quota =
grpc_resource_quota_create("oauth2_credentials");
- grpc_httpcli_get(exec_ctx, httpcli_context, pollent, resource_quota, &request,
- deadline, grpc_closure_create(response_cb, metadata_req),
- &metadata_req->response);
+ grpc_httpcli_get(
+ exec_ctx, httpcli_context, pollent, resource_quota, &request, deadline,
+ grpc_closure_create(response_cb, metadata_req, grpc_schedule_on_exec_ctx),
+
+ &metadata_req->response);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
}
@@ -370,10 +372,11 @@ static void refresh_token_fetch_oauth2(
extreme memory pressure. */
grpc_resource_quota *resource_quota =
grpc_resource_quota_create("oauth2_credentials_refresh");
- grpc_httpcli_post(exec_ctx, httpcli_context, pollent, resource_quota,
- &request, body, strlen(body), deadline,
- grpc_closure_create(response_cb, metadata_req),
- &metadata_req->response);
+ grpc_httpcli_post(
+ exec_ctx, httpcli_context, pollent, resource_quota, &request, body,
+ strlen(body), deadline,
+ grpc_closure_create(response_cb, metadata_req, grpc_schedule_on_exec_ctx),
+ &metadata_req->response);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
gpr_free(body);
}
diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c
index 2906621a19..18a7a6f7e7 100644
--- a/src/core/lib/security/transport/secure_endpoint.c
+++ b/src/core/lib/security/transport/secure_endpoint.c
@@ -147,7 +147,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep,
}
}
ep->read_buffer = NULL;
- grpc_exec_ctx_sched(exec_ctx, ep->read_cb, error, NULL);
+ grpc_closure_sched(exec_ctx, ep->read_cb, error);
SECURE_ENDPOINT_UNREF(exec_ctx, ep, "read");
}
@@ -330,10 +330,9 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
if (result != TSI_OK) {
/* TODO(yangg) do different things according to the error type? */
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &ep->output_buffer);
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, cb,
- grpc_set_tsi_error_result(GRPC_ERROR_CREATE("Wrap failed"), result),
- NULL);
+ grpc_set_tsi_error_result(GRPC_ERROR_CREATE("Wrap failed"), result));
GPR_TIMER_END("secure_endpoint.endpoint_write", 0);
return;
}
@@ -418,7 +417,7 @@ grpc_endpoint *grpc_secure_endpoint_create(
grpc_slice_buffer_init(&ep->output_buffer);
grpc_slice_buffer_init(&ep->source_buffer);
ep->read_buffer = NULL;
- grpc_closure_init(&ep->on_read, on_read, ep);
+ grpc_closure_init(&ep->on_read, on_read, ep, grpc_schedule_on_exec_ctx);
gpr_mu_init(&ep->protector_mu);
gpr_ref_init(&ep->ref, 1);
return &ep->base;
diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c
index c0fdb6e0d6..5aa26e0577 100644
--- a/src/core/lib/security/transport/security_connector.c
+++ b/src/core/lib/security/transport/security_connector.c
@@ -134,9 +134,9 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx,
grpc_auth_context **auth_context,
grpc_closure *on_peer_checked) {
if (sc == NULL) {
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, on_peer_checked,
- GRPC_ERROR_CREATE("cannot check peer -- no security connector"), NULL);
+ GRPC_ERROR_CREATE("cannot check peer -- no security connector"));
tsi_peer_destruct(&peer);
} else {
sc->vtable->check_peer(exec_ctx, sc, peer, auth_context, on_peer_checked);
@@ -279,7 +279,7 @@ static void fake_check_peer(grpc_exec_ctx *exec_ctx,
GRPC_FAKE_TRANSPORT_SECURITY_TYPE);
end:
- grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL);
+ grpc_closure_sched(exec_ctx, on_peer_checked, error);
tsi_peer_destruct(&peer);
}
@@ -516,7 +516,7 @@ static void ssl_channel_check_peer(grpc_exec_ctx *exec_ctx,
? c->overridden_target_name
: c->target_name,
&peer, auth_context);
- grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL);
+ grpc_closure_sched(exec_ctx, on_peer_checked, error);
tsi_peer_destruct(&peer);
}
@@ -526,7 +526,7 @@ static void ssl_server_check_peer(grpc_exec_ctx *exec_ctx,
grpc_closure *on_peer_checked) {
grpc_error *error = ssl_check_peer(sc, NULL, &peer, auth_context);
tsi_peer_destruct(&peer);
- grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL);
+ grpc_closure_sched(exec_ctx, on_peer_checked, error);
}
static void add_shallow_auth_property_to_peer(tsi_peer *peer,
diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c
index 7c2f63d1cc..e886cc59a0 100644
--- a/src/core/lib/security/transport/security_handshaker.c
+++ b/src/core/lib/security/transport/security_handshaker.c
@@ -138,7 +138,7 @@ static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx,
h->shutdown = true;
}
// Invoke callback.
- grpc_exec_ctx_sched(exec_ctx, h->on_handshake_done, error, NULL);
+ grpc_closure_sched(exec_ctx, h->on_handshake_done, error);
}
static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -175,7 +175,7 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_channel_args_copy_and_add(tmp_args, &auth_context_arg, 1);
grpc_channel_args_destroy(exec_ctx, tmp_args);
// Invoke callback.
- grpc_exec_ctx_sched(exec_ctx, h->on_handshake_done, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, h->on_handshake_done, GRPC_ERROR_NONE);
// Set shutdown to true so that subsequent calls to
// security_handshaker_shutdown() do nothing.
h->shutdown = true;
@@ -394,10 +394,13 @@ static grpc_handshaker *security_handshaker_create(
h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE;
h->handshake_buffer = gpr_malloc(h->handshake_buffer_size);
grpc_closure_init(&h->on_handshake_data_sent_to_peer,
- on_handshake_data_sent_to_peer, h);
+ on_handshake_data_sent_to_peer, h,
+ grpc_schedule_on_exec_ctx);
grpc_closure_init(&h->on_handshake_data_received_from_peer,
- on_handshake_data_received_from_peer, h);
- grpc_closure_init(&h->on_peer_checked, on_peer_checked, h);
+ on_handshake_data_received_from_peer, h,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&h->on_peer_checked, on_peer_checked, h,
+ grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&h->left_overs);
grpc_slice_buffer_init(&h->outgoing);
return &h->base;
@@ -420,9 +423,8 @@ static void fail_handshaker_do_handshake(grpc_exec_ctx *exec_ctx,
grpc_tcp_server_acceptor *acceptor,
grpc_closure *on_handshake_done,
grpc_handshaker_args *args) {
- grpc_exec_ctx_sched(exec_ctx, on_handshake_done,
- GRPC_ERROR_CREATE("Failed to create security handshaker"),
- NULL);
+ grpc_closure_sched(exec_ctx, on_handshake_done,
+ GRPC_ERROR_CREATE("Failed to create security handshaker"));
}
static const grpc_handshaker_vtable fail_handshaker_vtable = {
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index c1a5cba64a..5e98ba895d 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -133,7 +133,7 @@ static void on_md_processing_done(
grpc_metadata_batch_filter(&exec_ctx, calld->recv_initial_metadata,
remove_consumed_md, elem);
grpc_metadata_array_destroy(&calld->md);
- grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE);
} else {
grpc_slice message;
grpc_transport_stream_op *close_op = gpr_malloc(sizeof(*close_op));
@@ -149,13 +149,13 @@ static void on_md_processing_done(
calld->transport_op->send_message = NULL;
}
calld->transport_op->send_trailing_metadata = NULL;
- close_op->on_complete = grpc_closure_create(destroy_op, close_op);
+ close_op->on_complete =
+ grpc_closure_create(destroy_op, close_op, grpc_schedule_on_exec_ctx);
grpc_transport_stream_op_add_close(&exec_ctx, close_op, status, &message);
grpc_call_next_op(&exec_ctx, elem, close_op);
- grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv,
- grpc_error_set_int(GRPC_ERROR_CREATE(error_details),
- GRPC_ERROR_INT_GRPC_STATUS, status),
- NULL);
+ grpc_closure_sched(&exec_ctx, calld->on_done_recv,
+ grpc_error_set_int(GRPC_ERROR_CREATE(error_details),
+ GRPC_ERROR_INT_GRPC_STATUS, status));
}
grpc_exec_ctx_finish(&exec_ctx);
@@ -175,8 +175,7 @@ static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
return;
}
}
- grpc_exec_ctx_sched(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error),
- NULL);
+ grpc_closure_sched(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error));
}
static void set_recv_ops_md_callbacks(grpc_call_element *elem,
@@ -215,7 +214,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* initialize members */
memset(calld, 0, sizeof(*calld));
- grpc_closure_init(&calld->auth_on_recv, auth_on_recv, elem);
+ grpc_closure_init(&calld->auth_on_recv, auth_on_recv, elem,
+ grpc_schedule_on_exec_ctx);
if (args->context[GRPC_CONTEXT_SECURITY].value != NULL) {
args->context[GRPC_CONTEXT_SECURITY].destroy(
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 5c985003e5..899e8fab3f 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -793,7 +793,8 @@ static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
memset(&tc->op, 0, sizeof(tc->op));
tc->op.cancel_error = tc->error;
/* reuse closure to catch completion */
- grpc_closure_init(&tc->closure, done_termination, tc);
+ grpc_closure_init(&tc->closure, done_termination, tc,
+ grpc_schedule_on_exec_ctx);
tc->op.on_complete = &tc->closure;
execute_op(exec_ctx, tc->call, &tc->op);
}
@@ -803,7 +804,8 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
memset(&tc->op, 0, sizeof(tc->op));
tc->op.close_error = tc->error;
/* reuse closure to catch completion */
- grpc_closure_init(&tc->closure, done_termination, tc);
+ grpc_closure_init(&tc->closure, done_termination, tc,
+ grpc_schedule_on_exec_ctx);
tc->op.on_complete = &tc->closure;
execute_op(exec_ctx, tc->call, &tc->op);
}
@@ -814,13 +816,13 @@ static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx,
tc->error);
if (tc->type == TC_CANCEL) {
- grpc_closure_init(&tc->closure, send_cancel, tc);
+ grpc_closure_init(&tc->closure, send_cancel, tc, grpc_schedule_on_exec_ctx);
GRPC_CALL_INTERNAL_REF(tc->call, "cancel");
} else if (tc->type == TC_CLOSE) {
- grpc_closure_init(&tc->closure, send_close, tc);
+ grpc_closure_init(&tc->closure, send_close, tc, grpc_schedule_on_exec_ctx);
GRPC_CALL_INTERNAL_REF(tc->call, "close");
}
- grpc_exec_ctx_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE);
return GRPC_CALL_OK;
}
@@ -1142,8 +1144,8 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx,
} else {
*call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
}
- grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
- bctl);
+ grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, bctl,
+ grpc_schedule_on_exec_ctx);
continue_receiving_slices(exec_ctx, bctl);
}
}
@@ -1255,9 +1257,10 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
call->has_initial_md_been_received = true;
if (call->saved_receiving_stream_ready_bctlp != NULL) {
grpc_closure *saved_rsr_closure = grpc_closure_create(
- receiving_stream_ready, call->saved_receiving_stream_ready_bctlp);
+ receiving_stream_ready, call->saved_receiving_stream_ready_bctlp,
+ grpc_schedule_on_exec_ctx);
call->saved_receiving_stream_ready_bctlp = NULL;
- grpc_exec_ctx_sched(exec_ctx, saved_rsr_closure, error, NULL);
+ grpc_closure_sched(exec_ctx, saved_rsr_closure, error);
}
gpr_mu_unlock(&call->mu);
@@ -1564,7 +1567,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->received_initial_metadata = 1;
call->buffered_metadata[0] = op->data.recv_initial_metadata;
grpc_closure_init(&call->receiving_initial_metadata_ready,
- receiving_initial_metadata_ready, bctl);
+ receiving_initial_metadata_ready, bctl,
+ grpc_schedule_on_exec_ctx);
bctl->recv_initial_metadata = 1;
stream_op->recv_initial_metadata =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
@@ -1587,7 +1591,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->receiving_buffer = op->data.recv_message;
stream_op->recv_message = &call->receiving_stream;
grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready,
- bctl);
+ bctl, grpc_schedule_on_exec_ctx);
stream_op->recv_message_ready = &call->receiving_stream_ready;
num_completion_callbacks_needed++;
break;
@@ -1652,7 +1656,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
stream_op->context = call->context;
- grpc_closure_init(&bctl->finish_batch, finish_batch, bctl);
+ grpc_closure_init(&bctl->finish_batch, finish_batch, bctl,
+ grpc_schedule_on_exec_ctx);
stream_op->on_complete = &bctl->finish_batch;
gpr_mu_unlock(&call->mu);
diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c
index 0d2f01a649..e68febdddf 100644
--- a/src/core/lib/surface/channel_ping.c
+++ b/src/core/lib/surface/channel_ping.c
@@ -71,7 +71,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
GPR_ASSERT(reserved == NULL);
pr->tag = tag;
pr->cq = cq;
- grpc_closure_init(&pr->closure, ping_done, pr);
+ grpc_closure_init(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx);
op->send_ping = &pr->closure;
op->bind_pollset = grpc_cq_pollset(cq);
grpc_cq_begin_op(cq, tag);
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 184c1a1a16..4613c9021e 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -168,7 +168,8 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
#ifndef NDEBUG
cc->outstanding_tag_count = 0;
#endif
- grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc);
+ grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc,
+ grpc_schedule_on_exec_ctx);
GPR_TIMER_END("grpc_completion_queue_create", 0);
diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c
index 2d56211716..ae1eac09a9 100644
--- a/src/core/lib/surface/lame_client.c
+++ b/src/core/lib/surface/lame_client.c
@@ -99,16 +99,16 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
if (op->on_connectivity_state_change) {
GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_SHUTDOWN);
*op->connectivity_state = GRPC_CHANNEL_SHUTDOWN;
- grpc_exec_ctx_sched(exec_ctx, op->on_connectivity_state_change,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, op->on_connectivity_state_change,
+ GRPC_ERROR_NONE);
}
if (op->send_ping != NULL) {
- grpc_exec_ctx_sched(exec_ctx, op->send_ping,
- GRPC_ERROR_CREATE("lame client channel"), NULL);
+ grpc_closure_sched(exec_ctx, op->send_ping,
+ GRPC_ERROR_CREATE("lame client channel"));
}
GRPC_ERROR_UNREF(op->disconnect_with_error);
if (op->on_consumed != NULL) {
- grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
}
}
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 332098dc1f..addb7c4fbc 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -279,7 +279,8 @@ static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
int send_goaway, grpc_error *send_disconnect) {
struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc));
- grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
+ grpc_closure_init(&sc->closure, shutdown_cleanup, sc,
+ grpc_schedule_on_exec_ctx);
grpc_transport_op *op = grpc_make_transport_op(&sc->closure);
grpc_channel_element *elem;
@@ -347,9 +348,9 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
- grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE,
- NULL);
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE);
}
}
@@ -441,8 +442,8 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,
orphan_channel(chand);
server_ref(chand->server);
maybe_finish_shutdown(exec_ctx, chand->server);
- chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
- chand->finish_destroy_channel_closure.cb_arg = chand;
+ grpc_closure_init(&chand->finish_destroy_channel_closure,
+ finish_destroy_channel, chand, grpc_schedule_on_exec_ctx);
if (grpc_server_channel_trace && error != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(error);
@@ -546,8 +547,9 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
- grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, error, NULL);
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, error);
return;
}
@@ -591,9 +593,9 @@ static void finish_start_new_rpc(
gpr_mu_lock(&calld->mu_state);
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
- grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE,
- NULL);
+ grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE);
return;
}
@@ -608,7 +610,8 @@ static void finish_start_new_rpc(
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_MESSAGE;
op.data.recv_message = &calld->payload;
- grpc_closure_init(&calld->publish, publish_new_rpc, elem);
+ grpc_closure_init(&calld->publish, publish_new_rpc, elem,
+ grpc_schedule_on_exec_ctx);
grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1,
&calld->publish);
break;
@@ -816,9 +819,10 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
- grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure,
+ GRPC_ERROR_NONE);
} else if (calld->state == PENDING) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
@@ -854,7 +858,8 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_INITIAL_METADATA;
op.data.recv_initial_metadata = &calld->initial_metadata;
- grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem);
+ grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem,
+ grpc_schedule_on_exec_ctx);
grpc_call_start_batch_and_execute(exec_ctx, call, &op, 1,
&calld->got_initial_metadata);
}
@@ -890,7 +895,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
gpr_mu_init(&calld->mu_state);
grpc_closure_init(&calld->server_on_recv_initial_metadata,
- server_on_recv_initial_metadata, elem);
+ server_on_recv_initial_metadata, elem,
+ grpc_schedule_on_exec_ctx);
server_ref(chand->server);
return GRPC_ERROR_NONE;
@@ -929,7 +935,8 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
chand->registered_methods = NULL;
chand->connectivity_state = GRPC_CHANNEL_IDLE;
grpc_closure_init(&chand->channel_connectivity_changed,
- channel_connectivity_changed, chand);
+ channel_connectivity_changed, chand,
+ grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
@@ -1281,7 +1288,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
- grpc_closure_init(&l->destroy_done, listener_destroy_done, server);
+ grpc_closure_init(&l->destroy_done, listener_destroy_done, server,
+ grpc_schedule_on_exec_ctx);
l->destroy(&exec_ctx, server, l->arg, &l->destroy_done);
}
@@ -1387,9 +1395,10 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
- grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure,
- GRPC_ERROR_NONE, NULL);
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure,
+ GRPC_ERROR_NONE);
} else {
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c
index 4f49d7cf7d..c656d93740 100644
--- a/src/core/lib/transport/connectivity_state.c
+++ b/src/core/lib/transport/connectivity_state.c
@@ -81,7 +81,7 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx,
} else {
error = GRPC_ERROR_CREATE("Shutdown connectivity owner");
}
- grpc_exec_ctx_sched(exec_ctx, w->notify, error, NULL);
+ grpc_closure_sched(exec_ctx, w->notify, error);
gpr_free(w);
}
GRPC_ERROR_UNREF(tracker->current_error);
@@ -121,7 +121,7 @@ bool grpc_connectivity_state_notify_on_state_change(
if (current == NULL) {
grpc_connectivity_state_watcher *w = tracker->watchers;
if (w != NULL && w->notify == notify) {
- grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED);
tracker->watchers = w->next;
gpr_free(w);
return false;
@@ -129,7 +129,7 @@ bool grpc_connectivity_state_notify_on_state_change(
while (w != NULL) {
grpc_connectivity_state_watcher *rm_candidate = w->next;
if (rm_candidate != NULL && rm_candidate->notify == notify) {
- grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED);
w->next = w->next->next;
gpr_free(rm_candidate);
return false;
@@ -140,8 +140,8 @@ bool grpc_connectivity_state_notify_on_state_change(
} else {
if (tracker->current_state != *current) {
*current = tracker->current_state;
- grpc_exec_ctx_sched(exec_ctx, notify,
- GRPC_ERROR_REF(tracker->current_error), NULL);
+ grpc_closure_sched(exec_ctx, notify,
+ GRPC_ERROR_REF(tracker->current_error));
} else {
grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
w->current = current;
@@ -191,8 +191,8 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_DEBUG, "NOTIFY: %p %s: %p", tracker, tracker->name,
w->notify);
}
- grpc_exec_ctx_sched(exec_ctx, w->notify,
- GRPC_ERROR_REF(tracker->current_error), NULL);
+ grpc_closure_sched(exec_ctx, w->notify,
+ GRPC_ERROR_REF(tracker->current_error));
gpr_free(w);
}
}
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 9bc278c133..055edbb39f 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -69,7 +69,7 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_stream_refcount *refcount) {
#endif
if (gpr_unref(&refcount->refs)) {
- grpc_exec_ctx_sched(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE);
}
}
@@ -83,7 +83,7 @@ void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
grpc_iomgr_cb_func cb, void *cb_arg) {
#endif
gpr_ref_init(&refcount->refs, initial_refs);
- grpc_closure_init(&refcount->destroy, cb, cb_arg);
+ grpc_closure_init(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);
}
static void move64(uint64_t *from, uint64_t *to) {
@@ -169,11 +169,10 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op,
grpc_error *error) {
- grpc_exec_ctx_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error),
- NULL);
- grpc_exec_ctx_sched(exec_ctx, op->recv_initial_metadata_ready,
- GRPC_ERROR_REF(error), NULL);
- grpc_exec_ctx_sched(exec_ctx, op->on_complete, error, NULL);
+ grpc_closure_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error));
+ grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready,
+ GRPC_ERROR_REF(error));
+ grpc_closure_sched(exec_ctx, op->on_complete, error);
}
typedef struct {
@@ -197,7 +196,8 @@ static void add_error(grpc_transport_stream_op *op, grpc_error **which,
cmd = gpr_malloc(sizeof(*cmd));
cmd->error = error;
cmd->then_call = op->on_complete;
- grpc_closure_init(&cmd->closure, free_message, cmd);
+ grpc_closure_init(&cmd->closure, free_message, cmd,
+ grpc_schedule_on_exec_ctx);
op->on_complete = &cmd->closure;
*which = error;
}
@@ -271,14 +271,14 @@ typedef struct {
static void destroy_made_transport_op(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
made_transport_op *op = arg;
- grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error),
- NULL);
+ grpc_closure_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error));
gpr_free(op);
}
grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) {
made_transport_op *op = gpr_malloc(sizeof(*op));
- grpc_closure_init(&op->outer_on_complete, destroy_made_transport_op, op);
+ grpc_closure_init(&op->outer_on_complete, destroy_made_transport_op, op,
+ grpc_schedule_on_exec_ctx);
op->inner_on_complete = on_complete;
memset(&op->op, 0, sizeof(op->op));
op->op.on_consumed = &op->outer_on_complete;
@@ -294,8 +294,7 @@ typedef struct {
static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
made_transport_stream_op *op = arg;
- grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error),
- NULL);
+ grpc_closure_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error));
gpr_free(op);
}
@@ -303,7 +302,7 @@ grpc_transport_stream_op *grpc_make_transport_stream_op(
grpc_closure *on_complete) {
made_transport_stream_op *op = gpr_malloc(sizeof(*op));
grpc_closure_init(&op->outer_on_complete, destroy_made_transport_stream_op,
- op);
+ op, grpc_schedule_on_exec_ctx);
op->inner_on_complete = on_complete;
memset(&op->op, 0, sizeof(op->op));
op->op.on_complete = &op->outer_on_complete;