aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-01-04 10:17:31 -0800
committerGravatar Craig Tiller <ctiller@google.com>2017-01-04 10:17:31 -0800
commit17f6904baded526404e1bb4c023ab1eb91054813 (patch)
tree09f4edd558cf7094a60fe1970c94377304f09fc5 /src
parent00a2d18234269e901deb7da22063c7fda210d7ee (diff)
parent33dfad1fbc14f9e0157b31e3bc33eda1a0cd4e1c (diff)
Merge branch 'slice_with_exec_ctx_and_new_closures' into metadata_filter_and_new_closures
Diffstat (limited to 'src')
-rw-r--r--src/compiler/python_generator.cc3
-rw-r--r--src/core/ext/census/grpc_filter.c3
-rw-r--r--src/core/ext/client_channel/channel_connectivity.c3
-rw-r--r--src/core/ext/client_channel/client_channel.c50
-rw-r--r--src/core/ext/client_channel/http_connect_handshaker.c8
-rw-r--r--src/core/ext/client_channel/subchannel.c13
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c41
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c28
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c22
-rw-r--r--src/core/ext/load_reporting/load_reporting_filter.c3
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c13
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c4
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.c10
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.c3
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c189
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.c9
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h4
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c43
-rw-r--r--src/core/lib/channel/channel_stack.c9
-rw-r--r--src/core/lib/channel/compress_filter.c6
-rw-r--r--src/core/lib/channel/deadline_filter.c11
-rw-r--r--src/core/lib/channel/handshaker.c8
-rw-r--r--src/core/lib/channel/http_client_filter.c15
-rw-r--r--src/core/lib/channel/http_server_filter.c9
-rw-r--r--src/core/lib/channel/message_size_filter.c5
-rw-r--r--src/core/lib/http/httpcli.c18
-rw-r--r--src/core/lib/http/httpcli_security_connector.c2
-rw-r--r--src/core/lib/iomgr/closure.c38
-rw-r--r--src/core/lib/iomgr/closure.h39
-rw-r--r--src/core/lib/iomgr/combiner.c110
-rw-r--r--src/core/lib/iomgr/combiner.h14
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c37
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c26
-rw-r--r--src/core/lib/iomgr/ev_posix.c5
-rw-r--r--src/core/lib/iomgr/ev_posix.h3
-rw-r--r--src/core/lib/iomgr/exec_ctx.c134
-rw-r--r--src/core/lib/iomgr/exec_ctx.h31
-rw-r--r--src/core/lib/iomgr/executor.c33
-rw-r--r--src/core/lib/iomgr/executor.h4
-rw-r--r--src/core/lib/iomgr/pollset_uv.c2
-rw-r--r--src/core/lib/iomgr/pollset_windows.c5
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.c10
-rw-r--r--src/core/lib/iomgr/resolve_address_uv.c6
-rw-r--r--src/core/lib/iomgr/resolve_address_windows.c7
-rw-r--r--src/core/lib/iomgr/resource_quota.c101
-rw-r--r--src/core/lib/iomgr/socket_windows.c4
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c15
-rw-r--r--src/core/lib/iomgr/tcp_client_uv.c2
-rw-r--r--src/core/lib/iomgr/tcp_client_windows.c6
-rw-r--r--src/core/lib/iomgr/tcp_posix.c21
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c16
-rw-r--r--src/core/lib/iomgr/tcp_server_uv.c4
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c11
-rw-r--r--src/core/lib/iomgr/tcp_uv.c12
-rw-r--r--src/core/lib/iomgr/tcp_windows.c27
-rw-r--r--src/core/lib/iomgr/timer_generic.c14
-rw-r--r--src/core/lib/iomgr/timer_uv.c9
-rw-r--r--src/core/lib/iomgr/udp_server.c10
-rw-r--r--src/core/lib/iomgr/workqueue.h11
-rw-r--r--src/core/lib/iomgr/workqueue_uv.c5
-rw-r--r--src/core/lib/iomgr/workqueue_windows.c5
-rw-r--r--src/core/lib/security/credentials/fake/fake_credentials.c7
-rw-r--r--src/core/lib/security/credentials/google_default/google_default_credentials.c6
-rw-r--r--src/core/lib/security/credentials/jwt/jwt_verifier.c46
-rw-r--r--src/core/lib/security/credentials/jwt/jwt_verifier.h5
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.c17
-rw-r--r--src/core/lib/security/transport/secure_endpoint.c9
-rw-r--r--src/core/lib/security/transport/security_connector.c10
-rw-r--r--src/core/lib/security/transport/security_handshaker.c18
-rw-r--r--src/core/lib/security/transport/server_auth_filter.c18
-rw-r--r--src/core/lib/support/string.c12
-rw-r--r--src/core/lib/support/string.h2
-rw-r--r--src/core/lib/surface/call.c29
-rw-r--r--src/core/lib/surface/channel_ping.c2
-rw-r--r--src/core/lib/surface/completion_queue.c3
-rw-r--r--src/core/lib/surface/lame_client.c10
-rw-r--r--src/core/lib/surface/server.c53
-rw-r--r--src/core/lib/surface/validate_metadata.c5
-rw-r--r--src/core/lib/transport/connectivity_state.c14
-rw-r--r--src/core/lib/transport/transport.c27
-rw-r--r--src/php/lib/Grpc/AbstractCall.php11
-rw-r--r--src/php/lib/Grpc/BaseStub.php54
-rw-r--r--src/php/lib/Grpc/BidiStreamingCall.php15
-rw-r--r--src/php/lib/Grpc/ClientStreamingCall.php11
-rw-r--r--src/php/lib/Grpc/ServerStreamingCall.php15
-rw-r--r--src/php/lib/Grpc/UnaryCall.php11
-rw-r--r--src/python/grpcio/commands.py1
-rw-r--r--src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py (renamed from src/python/grpcio_tests/tests/interop/_insecure_interop_test.py)6
-rw-r--r--src/python/grpcio_tests/tests/interop/_intraop_test_case.py (renamed from src/python/grpcio_tests/tests/interop/_interop_test_case.py)2
-rw-r--r--src/python/grpcio_tests/tests/interop/_secure_intraop_test.py (renamed from src/python/grpcio_tests/tests/interop/_secure_interop_test.py)6
-rw-r--r--src/python/grpcio_tests/tests/tests.json4
91 files changed, 928 insertions, 810 deletions
diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc
index 0fac1b88cd..4841da8da8 100644
--- a/src/compiler/python_generator.cc
+++ b/src/compiler/python_generator.cc
@@ -724,6 +724,9 @@ pair<bool, grpc::string> PrivateGenerator::GetGrpcServices() {
out = &out_printer;
if (generate_in_pb2_grpc) {
+ out->Print(
+ "# Generated by the gRPC Python protocol compiler plugin. "
+ "DO NOT EDIT!\n");
if (!PrintPreamble()) {
return make_pair(false, "");
}
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index 75b0dc1ec3..65cfe1fa90 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -152,7 +152,8 @@ static grpc_error *server_init_call_elem(grpc_exec_ctx *exec_ctx,
memset(d, 0, sizeof(*d));
d->start_ts = args->start_time;
/* TODO(hongyu): call census_tracing_start_op here. */
- grpc_closure_init(&d->finish_recv, server_on_done_recv, elem);
+ grpc_closure_init(&d->finish_recv, server_on_done_recv, elem,
+ grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/client_channel/channel_connectivity.c b/src/core/ext/client_channel/channel_connectivity.c
index 9797e66564..b10f444b63 100644
--- a/src/core/ext/client_channel/channel_connectivity.c
+++ b/src/core/ext/client_channel/channel_connectivity.c
@@ -198,7 +198,8 @@ void grpc_channel_watch_connectivity_state(
grpc_cq_begin_op(cq, tag);
gpr_mu_init(&w->mu);
- grpc_closure_init(&w->on_complete, watch_complete, w);
+ grpc_closure_init(&w->on_complete, watch_complete, w,
+ grpc_schedule_on_exec_ctx);
w->phase = WAITING;
w->state = last_observed_state;
w->cq = cq;
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index 16de03d003..74350d9fee 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -254,7 +254,8 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
w->chand = chand;
- grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
+ grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w,
+ grpc_schedule_on_exec_ctx);
w->state = current_state;
w->lb_policy = lb_policy;
grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state,
@@ -366,14 +367,12 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
chand->method_params_table = method_params_table;
if (lb_policy != NULL) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
- NULL);
+ grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
} else if (chand->resolver == NULL /* disconnected */) {
grpc_closure_list_fail_all(
&chand->waiting_for_config_closures,
GRPC_ERROR_CREATE_REFERENCING("Channel disconnected", &error, 1));
- grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
- NULL);
+ grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
}
if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
@@ -430,7 +429,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_transport_op *op) {
channel_data *chand = elem->channel_data;
- grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
GPR_ASSERT(op->set_accept_stream == false);
if (op->bind_pollset != NULL) {
@@ -449,9 +448,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
if (op->send_ping != NULL) {
if (chand->lb_policy == NULL) {
- grpc_exec_ctx_sched(exec_ctx, op->send_ping,
- GRPC_ERROR_CREATE("Ping with no load balancing"),
- NULL);
+ grpc_closure_sched(exec_ctx, op->send_ping,
+ GRPC_ERROR_CREATE("Ping with no load balancing"));
} else {
grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping);
op->bind_pollset = NULL;
@@ -470,8 +468,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
if (!chand->started_resolving) {
grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
GRPC_ERROR_REF(op->disconnect_with_error));
- grpc_exec_ctx_enqueue_list(exec_ctx,
- &chand->waiting_for_config_closures, NULL);
+ grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
}
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx,
@@ -516,7 +513,8 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
gpr_mu_init(&chand->mu);
chand->owning_stack = args->channel_stack;
grpc_closure_init(&chand->on_resolver_result_changed,
- on_resolver_result_changed, chand);
+ on_resolver_result_changed, chand,
+ grpc_schedule_on_exec_ctx);
chand->interested_parties = grpc_pollset_set_create();
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
@@ -683,8 +681,9 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
calld->waiting_ops_count = 0;
calld->waiting_ops_capacity = 0;
GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops");
- grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(retry_ops, a),
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(
+ exec_ctx, grpc_closure_create(retry_ops, a, grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_NONE);
}
static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
@@ -766,14 +765,14 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
} else if (error != GRPC_ERROR_NONE) {
- grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL);
+ grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error));
} else {
call_data *calld = cpa->elem->call_data;
gpr_mu_lock(&calld->mu);
if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags, cpa->connected_subchannel,
cpa->on_ready, GRPC_ERROR_NONE)) {
- grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE);
}
gpr_mu_unlock(&calld->mu);
}
@@ -805,9 +804,9 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
cpa = closure->cb_arg;
if (cpa->connected_subchannel == connected_subchannel) {
cpa->connected_subchannel = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, cpa->on_ready,
- GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1));
}
}
gpr_mu_unlock(&chand->mu);
@@ -858,12 +857,12 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
cpa->connected_subchannel = connected_subchannel;
cpa->on_ready = on_ready;
cpa->elem = elem;
- grpc_closure_init(&cpa->closure, continue_picking, cpa);
+ grpc_closure_init(&cpa->closure, continue_picking, cpa,
+ grpc_schedule_on_exec_ctx);
grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure,
GRPC_ERROR_NONE);
} else {
- grpc_exec_ctx_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"),
- NULL);
+ grpc_closure_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"));
}
gpr_mu_unlock(&chand->mu);
@@ -948,7 +947,8 @@ retry:
calld->connected_subchannel == NULL &&
op->send_initial_metadata != NULL) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
- grpc_closure_init(&calld->next_step, subchannel_ready, elem);
+ grpc_closure_init(&calld->next_step, subchannel_ready, elem,
+ grpc_schedule_on_exec_ctx);
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
/* If a subchannel is not available immediately, the polling entity from
call_data should be provided to channel_data's interested_parties, so
@@ -1094,7 +1094,8 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
// get the service config data once the resolver returns.
// Take a reference to the call stack to be owned by the callback.
GRPC_CALL_STACK_REF(calld->owning_call, "read_service_config");
- grpc_closure_init(&calld->read_service_config, read_service_config, elem);
+ grpc_closure_init(&calld->read_service_config, read_service_config, elem,
+ grpc_schedule_on_exec_ctx);
grpc_closure_list_append(&chand->waiting_for_config_closures,
&calld->read_service_config, GRPC_ERROR_NONE);
gpr_mu_unlock(&chand->mu);
@@ -1207,7 +1208,8 @@ void grpc_client_channel_watch_connectivity_state(
w->pollset = pollset;
w->on_complete = on_complete;
grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
- grpc_closure_init(&w->my_closure, on_external_watch_complete, w);
+ grpc_closure_init(&w->my_closure, on_external_watch_complete, w,
+ grpc_schedule_on_exec_ctx);
GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
"external_connectivity_watcher");
gpr_mu_lock(&chand->mu);
diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c
index 5ead2ff73f..27b117af84 100644
--- a/src/core/ext/client_channel/http_connect_handshaker.c
+++ b/src/core/ext/client_channel/http_connect_handshaker.c
@@ -133,7 +133,7 @@ static void handshake_failed_locked(grpc_exec_ctx* exec_ctx,
handshaker->shutdown = true;
}
// Invoke callback.
- grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL);
+ grpc_closure_sched(exec_ctx, handshaker->on_handshake_done, error);
}
// Callback invoked when finished writing HTTP CONNECT request.
@@ -232,7 +232,7 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
goto done;
}
// Success. Invoke handshake-done callback.
- grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL);
+ grpc_closure_sched(exec_ctx, handshaker->on_handshake_done, error);
done:
// Set shutdown to true so that subsequent calls to
// http_connect_handshaker_shutdown() do nothing.
@@ -316,9 +316,9 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server) {
handshaker->proxy_server = gpr_strdup(proxy_server);
grpc_slice_buffer_init(&handshaker->write_buffer);
grpc_closure_init(&handshaker->request_done_closure, on_write_done,
- handshaker);
+ handshaker, grpc_schedule_on_exec_ctx);
grpc_closure_init(&handshaker->response_read_closure, on_read_done,
- handshaker);
+ handshaker, grpc_schedule_on_exec_ctx);
grpc_http_parser_init(&handshaker->http_parser, GRPC_HTTP_RESPONSE,
&handshaker->http_response);
return &handshaker->base;
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index 4ac896923a..f8f48f2894 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -294,8 +294,9 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
gpr_atm old_refs;
old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
if (old_refs == 1) {
- grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c),
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c,
+ grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_NONE);
}
}
@@ -331,7 +332,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
c->args = grpc_channel_args_copy(args->args);
c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
&c->root_external_state_watcher;
- grpc_closure_init(&c->connected, subchannel_connected, c);
+ grpc_closure_init(&c->connected, subchannel_connected, c,
+ grpc_schedule_on_exec_ctx);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel");
int initial_backoff_ms =
@@ -506,7 +508,8 @@ void grpc_subchannel_notify_on_state_change(
w->subchannel = c;
w->pollset_set = interested_parties;
w->notify = notify;
- grpc_closure_init(&w->closure, on_external_state_watcher_done, w);
+ grpc_closure_init(&w->closure, on_external_state_watcher_done, w,
+ grpc_schedule_on_exec_ctx);
if (interested_parties != NULL) {
grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set,
interested_parties);
@@ -627,7 +630,7 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
sw_subchannel->subchannel = c;
sw_subchannel->connectivity_state = GRPC_CHANNEL_READY;
grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed,
- sw_subchannel);
+ sw_subchannel, grpc_schedule_on_exec_ctx);
if (c->disconnected) {
gpr_free(sw_subchannel);
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index ea6c62480f..390c6ff817 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -181,8 +181,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
wrapped_rr_closure_arg *wc_arg = arg;
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
- grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error),
- NULL);
+ grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
if (wc_arg->rr_policy != NULL) {
/* if *target is NULL, no pick has been made by the RR policy (eg, all
@@ -249,7 +248,8 @@ static void add_pending_pick(pending_pick **root,
pick_args->lb_token_mdelem_storage;
pp->wrapped_on_complete_arg.free_when_done = pp;
grpc_closure_init(&pp->wrapped_on_complete_arg.wrapper_closure,
- wrapped_rr_closure, &pp->wrapped_on_complete_arg);
+ wrapped_rr_closure, &pp->wrapped_on_complete_arg,
+ grpc_schedule_on_exec_ctx);
*root = pp;
}
@@ -269,7 +269,8 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
pping->wrapped_notify_arg.free_when_done = pping;
pping->next = *root;
grpc_closure_init(&pping->wrapped_notify_arg.wrapper_closure,
- wrapped_rr_closure, &pping->wrapped_notify_arg);
+ wrapped_rr_closure, &pping->wrapped_notify_arg,
+ grpc_schedule_on_exec_ctx);
*root = pping;
}
@@ -673,7 +674,7 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
gpr_malloc(sizeof(rr_connectivity_data));
memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
- rr_connectivity);
+ rr_connectivity, grpc_schedule_on_exec_ctx);
rr_connectivity->glb_policy = glb_policy;
rr_connectivity->state = new_rr_state;
@@ -914,15 +915,15 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_ERROR_NONE);
pp = next;
}
while (pping != NULL) {
pending_ping *next = pping->next;
- grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
+ GRPC_ERROR_NONE);
pping = next;
}
}
@@ -938,9 +939,9 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if (pp->target == target) {
*target = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
- GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1));
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -963,9 +964,9 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
- GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1));
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -1000,11 +1001,10 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *on_complete) {
if (pick_args->lb_token_mdelem_storage == NULL) {
*target = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, on_complete,
GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting "
- "won't work without it. Failing"),
- NULL);
+ "won't work without it. Failing"));
return 0;
}
@@ -1023,7 +1023,8 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
wrapped_rr_closure_arg *wc_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg));
memset(wc_arg, 0, sizeof(wrapped_rr_closure_arg));
- grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg);
+ grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg,
+ grpc_schedule_on_exec_ctx);
wc_arg->rr_policy = glb_policy->rr_policy;
wc_arg->target = target;
wc_arg->wrapped_closure = on_complete;
@@ -1122,9 +1123,11 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
grpc_grpclb_request_destroy(request);
grpc_closure_init(&glb_policy->lb_on_server_status_received,
- lb_on_server_status_received, glb_policy);
+ lb_on_server_status_received, glb_policy,
+ grpc_schedule_on_exec_ctx);
grpc_closure_init(&glb_policy->lb_on_response_received,
- lb_on_response_received, glb_policy);
+ lb_on_response_received, glb_policy,
+ grpc_schedule_on_exec_ctx);
gpr_backoff_init(&glb_policy->lb_call_backoff_state,
GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS,
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index b9cfe6b5c0..821becff69 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -120,7 +120,7 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
pp = next;
}
@@ -138,9 +138,9 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if (pp->target == target) {
*target = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1));
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -165,9 +165,9 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1));
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -306,14 +306,15 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
gpr_atm_rel_store(&p->selected, (gpr_atm)selected);
- grpc_exec_ctx_sched(exec_ctx,
- grpc_closure_create(destroy_subchannels, p),
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx,
+ grpc_closure_create(destroy_subchannels, p,
+ grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_NONE);
/* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
grpc_connected_subchannel_notify_on_state_change(
@@ -366,8 +367,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE,
- NULL);
+ grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
@@ -419,8 +419,7 @@ static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (selected) {
grpc_connected_subchannel_ping(exec_ctx, selected, closure);
} else {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"));
}
}
@@ -485,7 +484,8 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
p->num_subchannels = subchannel_idx;
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
- grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
+ grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p,
+ grpc_schedule_on_exec_ctx);
gpr_mu_init(&p->mu);
return &p->base;
}
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 896d13df3e..cb679489c3 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -321,8 +321,8 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE("Channel Shutdown"), NULL);
+ grpc_closure_sched(exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE("Channel Shutdown"));
gpr_free(pp);
}
grpc_connectivity_state_set(
@@ -348,9 +348,9 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if (pp->target == target) {
*target = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1));
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -376,9 +376,9 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
*pp->target = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1));
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -581,7 +581,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
(void *)selected->subchannel, (void *)selected);
}
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
update_lb_connectivity_status(exec_ctx, sd, error);
@@ -634,7 +634,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
}
@@ -684,8 +684,8 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked");
} else {
gpr_mu_unlock(&p->mu);
- grpc_exec_ctx_sched(exec_ctx, closure,
- GRPC_ERROR_CREATE("Round Robin not connected"), NULL);
+ grpc_closure_sched(exec_ctx, closure,
+ GRPC_ERROR_CREATE("Round Robin not connected"));
}
}
@@ -749,7 +749,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
}
++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
- rr_connectivity_changed, sd);
+ rr_connectivity_changed, sd, grpc_schedule_on_exec_ctx);
}
}
if (subchannel_idx == 0) {
diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c
index aeb537eabd..8af6191c3b 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/load_reporting/load_reporting_filter.c
@@ -105,7 +105,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
memset(calld, 0, sizeof(call_data));
calld->id = (intptr_t)args->call_stack;
- grpc_closure_init(&calld->on_initial_md_ready, on_initial_md_ready, elem);
+ grpc_closure_init(&calld->on_initial_md_ready, on_initial_md_ready, elem,
+ grpc_schedule_on_exec_ctx);
/* TODO(dgq): do something with the data
channel_data *chand = elem->channel_data;
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index 37a2d8dc30..124b16bbc3 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -112,8 +112,8 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
}
if (r->next_completion != NULL) {
*r->target_result = NULL;
- grpc_exec_ctx_sched(exec_ctx, r->next_completion,
- GRPC_ERROR_CREATE("Resolver Shutdown"), NULL);
+ grpc_closure_sched(exec_ctx, r->next_completion,
+ GRPC_ERROR_CREATE("Resolver Shutdown"));
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
@@ -219,9 +219,10 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(!r->resolving);
r->resolving = true;
r->addresses = NULL;
- grpc_resolve_address(exec_ctx, r->name_to_resolve, r->default_port,
- r->interested_parties,
- grpc_closure_create(dns_on_resolved, r), &r->addresses);
+ grpc_resolve_address(
+ exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties,
+ grpc_closure_create(dns_on_resolved, r, grpc_schedule_on_exec_ctx),
+ &r->addresses);
}
static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
@@ -231,7 +232,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
*r->target_result = r->resolved_result == NULL
? NULL
: grpc_channel_args_copy(r->resolved_result);
- grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index 5fd1607cee..a1365f6465 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -90,7 +90,7 @@ static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_result = NULL;
- grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
@@ -124,7 +124,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
grpc_arg arg = grpc_lb_addresses_create_channel_arg(r->addresses);
*r->target_result =
grpc_channel_args_copy_and_add(r->channel_args, &arg, 1);
- grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
r->next_completion = NULL;
}
}
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c
index d8afcfb626..bf4d797938 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.c
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c
@@ -142,7 +142,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
}
grpc_closure *notify = c->notify;
c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
+ grpc_closure_sched(exec_ctx, notify, error);
grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
c->handshake_mgr = NULL;
gpr_mu_unlock(&c->mu);
@@ -181,7 +181,7 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
memset(c->result, 0, sizeof(*c->result));
grpc_closure *notify = c->notify;
c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
+ grpc_closure_sched(exec_ctx, notify, error);
gpr_mu_unlock(&c->mu);
chttp2_connector_unref(exec_ctx, arg);
} else {
@@ -204,7 +204,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
memset(c->result, 0, sizeof(*c->result));
grpc_closure *notify = c->notify;
c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
+ grpc_closure_sched(exec_ctx, notify, error);
if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint);
gpr_mu_unlock(&c->mu);
chttp2_connector_unref(exec_ctx, arg);
@@ -212,7 +212,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
GPR_ASSERT(c->endpoint != NULL);
if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) {
grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent,
- c);
+ c, grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&c->initial_string_buffer);
grpc_slice_buffer_add(&c->initial_string_buffer,
c->args.initial_connect_string);
@@ -238,7 +238,7 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx,
c->result = result;
GPR_ASSERT(c->endpoint == NULL);
chttp2_connector_ref(con); // Ref taken for callback.
- grpc_closure_init(&c->connected, connected, c);
+ grpc_closure_init(&c->connected, connected, c, grpc_schedule_on_exec_ctx);
GPR_ASSERT(!c->connecting);
c->connecting = true;
grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint,
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c
index d573bbe73e..86f5a198c2 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.c
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.c
@@ -282,7 +282,8 @@ grpc_error *grpc_chttp2_server_add_port(
state = gpr_malloc(sizeof(*state));
memset(state, 0, sizeof(*state));
grpc_closure_init(&state->tcp_server_shutdown_complete,
- tcp_server_shutdown_complete, state);
+ tcp_server_shutdown_complete, state,
+ grpc_schedule_on_exec_ctx);
err = grpc_tcp_server_create(exec_ctx, &state->tcp_server_shutdown_complete,
args, &tcp_server);
if (err != GRPC_ERROR_NONE) {
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 6563b42a60..0f762dbceb 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -74,20 +74,14 @@ static const grpc_transport_vtable vtable;
static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
static void write_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
-static void write_action_end(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
-static void read_action_begin(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
static void read_action_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
grpc_error *error);
-static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs,
- grpc_error *error);
/** Set a transport level setting, and push it to our peer */
static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_setting_id id, uint32_t value);
@@ -113,12 +107,8 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream,
grpc_error *error_ignored);
-static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
-static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
@@ -167,8 +157,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
and maybe they hold resources that need to be freed */
while (t->pings.next != &t->pings) {
grpc_chttp2_outstanding_ping *ping = t->pings.next;
- grpc_exec_ctx_sched(exec_ctx, ping->on_recv,
- GRPC_ERROR_CREATE("Transport closed"), NULL);
+ grpc_closure_sched(exec_ctx, ping->on_recv,
+ GRPC_ERROR_CREATE("Transport closed"));
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
gpr_free(ping);
@@ -247,18 +237,15 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_slice_buffer_init(&t->outbuf);
grpc_chttp2_hpack_compressor_init(&t->hpack_compressor);
- grpc_closure_init(&t->write_action_begin_locked, write_action_begin_locked,
- t);
- grpc_closure_init(&t->write_action, write_action, t);
- grpc_closure_init(&t->write_action_end, write_action_end, t);
- grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t);
- grpc_closure_init(&t->read_action_begin, read_action_begin, t);
- grpc_closure_init(&t->read_action_locked, read_action_locked, t);
- grpc_closure_init(&t->benign_reclaimer, benign_reclaimer, t);
- grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer, t);
- grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t);
+ grpc_closure_init(&t->write_action, write_action, t,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&t->read_action_locked, read_action_locked, t,
+ grpc_combiner_scheduler(t->combiner, false));
+ grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t,
+ grpc_combiner_scheduler(t->combiner, false));
grpc_closure_init(&t->destructive_reclaimer_locked,
- destructive_reclaimer_locked, t);
+ destructive_reclaimer_locked, t,
+ grpc_combiner_scheduler(t->combiner, false));
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser);
@@ -396,9 +383,10 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_combiner_execute(exec_ctx, t->combiner,
- grpc_closure_create(destroy_transport_locked, t),
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(exec_ctx, grpc_closure_create(
+ destroy_transport_locked, t,
+ grpc_combiner_scheduler(t->combiner, false)),
+ GRPC_ERROR_NONE);
}
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
@@ -472,8 +460,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_data_parser_init(&s->data_parser);
grpc_slice_buffer_init(&s->flow_controlled_buffer);
s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
- grpc_closure_init(&s->complete_fetch, complete_fetch, s);
- grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s);
+ grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s,
+ grpc_schedule_on_exec_ctx);
GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
@@ -550,9 +538,10 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
s->destroy_stream_arg = and_free_memory;
- grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s);
- grpc_combiner_execute(exec_ctx, t->combiner, &s->destroy_stream,
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(
+ exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s,
+ grpc_combiner_scheduler(t->combiner, false)),
+ GRPC_ERROR_NONE);
GPR_TIMER_END("destroy_stream", 0);
}
@@ -603,7 +592,7 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
write_state_name(st), reason));
t->write_state = st;
if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &t->run_after_write, NULL);
+ grpc_closure_list_sched(exec_ctx, &t->run_after_write);
if (t->close_transport_on_writes_finished != NULL) {
grpc_error *err = t->close_transport_on_writes_finished;
t->close_transport_on_writes_finished = NULL;
@@ -621,9 +610,12 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
case GRPC_CHTTP2_WRITE_STATE_IDLE:
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason);
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
- grpc_combiner_execute_finally(exec_ctx, t->combiner,
- &t->write_action_begin_locked,
- GRPC_ERROR_NONE, covered_by_poller);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_init(
+ &t->write_action_begin_locked, write_action_begin_locked, t,
+ grpc_combiner_finally_scheduler(t->combiner, covered_by_poller)),
+ GRPC_ERROR_NONE);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING:
set_write_state(
@@ -665,7 +657,7 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
if (!t->closed && grpc_chttp2_begin_write(exec_ctx, t)) {
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
"begin writing");
- grpc_exec_ctx_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE);
} else {
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE,
"begin writing nothing");
@@ -677,19 +669,13 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) {
grpc_chttp2_transport *t = gt;
GPR_TIMER_BEGIN("write_action", 0);
- grpc_endpoint_write(exec_ctx, t->ep, &t->outbuf, &t->write_action_end);
+ grpc_endpoint_write(
+ exec_ctx, t->ep, &t->outbuf,
+ grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t,
+ grpc_combiner_scheduler(t->combiner, false)));
GPR_TIMER_END("write_action", 0);
}
-static void write_action_end(grpc_exec_ctx *exec_ctx, void *gt,
- grpc_error *error) {
- grpc_chttp2_transport *t = gt;
- GPR_TIMER_BEGIN("write_action_end", 0);
- grpc_combiner_execute(exec_ctx, t->combiner, &t->write_action_end_locked,
- GRPC_ERROR_REF(error), false);
- GPR_TIMER_END("write_action_end", 0);
-}
-
static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
GPR_TIMER_BEGIN("terminate_writing_with_lock", 0);
@@ -719,18 +705,24 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
"continue writing [!covered]");
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
- grpc_combiner_execute_finally(exec_ctx, t->combiner,
- &t->write_action_begin_locked,
- GRPC_ERROR_NONE, false);
+ grpc_closure_run(
+ exec_ctx,
+ grpc_closure_init(
+ &t->write_action_begin_locked, write_action_begin_locked, t,
+ grpc_combiner_finally_scheduler(t->combiner, false)),
+ GRPC_ERROR_NONE);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
GPR_TIMER_MARK("state=writing_stale_with_poller", 0);
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
"continue writing [covered]");
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
- grpc_combiner_execute_finally(exec_ctx, t->combiner,
- &t->write_action_begin_locked,
- GRPC_ERROR_NONE, true);
+ grpc_closure_run(
+ exec_ctx,
+ grpc_closure_init(&t->write_action_begin_locked,
+ write_action_begin_locked, t,
+ grpc_combiner_finally_scheduler(t->combiner, true)),
+ GRPC_ERROR_NONE);
break;
}
@@ -968,15 +960,6 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
}
}
-static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs,
- grpc_error *error) {
- grpc_chttp2_stream *s = gs;
- grpc_chttp2_transport *t = s->t;
- grpc_combiner_execute(exec_ctx, t->combiner, &s->complete_fetch_locked,
- GRPC_ERROR_REF(error),
- s->complete_fetch_covered_by_poller);
-}
-
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id,
@@ -1015,7 +998,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_closure *on_complete = op->on_complete;
if (on_complete == NULL) {
- on_complete = grpc_closure_create(do_nothing, NULL);
+ on_complete =
+ grpc_closure_create(do_nothing, NULL, grpc_schedule_on_exec_ctx);
}
/* use final_data as a barrier until enqueue time; the inital counter is
@@ -1218,13 +1202,15 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
gpr_free(str);
}
- grpc_closure_init(&op->transport_private.closure, perform_stream_op_locked,
- op);
op->transport_private.args[0] = gt;
op->transport_private.args[1] = gs;
GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
- grpc_combiner_execute(exec_ctx, t->combiner, &op->transport_private.closure,
- GRPC_ERROR_NONE, op->covered_by_poller);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_init(
+ &op->transport_private.closure, perform_stream_op_locked, op,
+ grpc_combiner_scheduler(t->combiner, op->covered_by_poller)),
+ GRPC_ERROR_NONE);
GPR_TIMER_END("perform_stream_op", 0);
}
@@ -1253,7 +1239,7 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_outstanding_ping *ping;
for (ping = t->pings.next; ping != &t->pings; ping = ping->next) {
if (0 == memcmp(opaque_8bytes, ping->id, 8)) {
- grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
gpr_free(ping);
@@ -1327,11 +1313,12 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
char *msg = grpc_transport_op_string(op);
gpr_free(msg);
op->transport_private.args[0] = gt;
- grpc_closure_init(&op->transport_private.closure, perform_transport_op_locked,
- op);
GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
- grpc_combiner_execute(exec_ctx, t->combiner, &op->transport_private.closure,
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(
+ exec_ctx, grpc_closure_init(&op->transport_private.closure,
+ perform_transport_op_locked, op,
+ grpc_combiner_scheduler(t->combiner, false)),
+ GRPC_ERROR_NONE);
}
/*******************************************************************************
@@ -1806,19 +1793,6 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
* INPUT PROCESSING - PARSING
*/
-static void read_action_begin(grpc_exec_ctx *exec_ctx, void *tp,
- grpc_error *error) {
- /* Control flow:
- reading_action_locked ->
- (parse_unlocked -> post_parse_locked)? ->
- post_reading_action_locked */
- GPR_TIMER_BEGIN("reading_action", 0);
- grpc_chttp2_transport *t = tp;
- grpc_combiner_execute(exec_ctx, t->combiner, &t->read_action_locked,
- GRPC_ERROR_REF(error), false);
- GPR_TIMER_END("reading_action", 0);
-}
-
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_http_parser parser;
@@ -1918,7 +1892,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &t->read_buffer);
if (keep_reading) {
- grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->read_action_begin);
+ grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer,
+ &t->read_action_locked);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action");
@@ -2055,10 +2030,12 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
bs->next_action.slice = slice;
bs->next_action.max_size_hint = max_size_hint;
bs->next_action.on_complete = on_complete;
- grpc_closure_init(&bs->next_action.closure, incoming_byte_stream_next_locked,
- bs);
- grpc_combiner_execute(exec_ctx, bs->transport->combiner,
- &bs->next_action.closure, GRPC_ERROR_NONE, false);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_init(
+ &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
+ grpc_combiner_scheduler(bs->transport->combiner, false)),
+ GRPC_ERROR_NONE);
GPR_TIMER_END("incoming_byte_stream_next", 0);
return 0;
}
@@ -2080,10 +2057,12 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN("incoming_byte_stream_destroy", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
- grpc_closure_init(&bs->destroy_action, incoming_byte_stream_destroy_locked,
- bs);
- grpc_combiner_execute(exec_ctx, bs->transport->combiner, &bs->destroy_action,
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_init(
+ &bs->destroy_action, incoming_byte_stream_destroy_locked, bs,
+ grpc_combiner_scheduler(bs->transport->combiner, false)),
+ GRPC_ERROR_NONE);
GPR_TIMER_END("incoming_byte_stream_destroy", 0);
}
@@ -2091,7 +2070,7 @@ static void incoming_byte_stream_publish_error(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error) {
GPR_ASSERT(error != GRPC_ERROR_NONE);
- grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL);
+ grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error));
bs->on_next = NULL;
GRPC_ERROR_UNREF(bs->error);
bs->error = error;
@@ -2108,7 +2087,7 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice);
if (bs->on_next != NULL) {
*bs->next = slice;
- grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE);
bs->on_next = NULL;
} else {
grpc_slice_buffer_add(&bs->slices, slice);
@@ -2176,7 +2155,7 @@ static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
grpc_resource_user_post_reclaimer(exec_ctx,
grpc_endpoint_get_resource_user(t->ep),
- false, &t->benign_reclaimer);
+ false, &t->benign_reclaimer_locked);
}
}
@@ -2187,24 +2166,10 @@ static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
grpc_resource_user_post_reclaimer(exec_ctx,
grpc_endpoint_get_resource_user(t->ep),
- true, &t->destructive_reclaimer);
+ true, &t->destructive_reclaimer_locked);
}
}
-static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_chttp2_transport *t = arg;
- grpc_combiner_execute(exec_ctx, t->combiner, &t->benign_reclaimer_locked,
- GRPC_ERROR_REF(error), false);
-}
-
-static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_chttp2_transport *t = arg;
- grpc_combiner_execute(exec_ctx, t->combiner, &t->destructive_reclaimer_locked,
- GRPC_ERROR_REF(error), false);
-}
-
static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
@@ -2385,5 +2350,5 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
gpr_free(read_buffer);
}
- read_action_begin(exec_ctx, t, GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, &t->read_action_locked, GRPC_ERROR_NONE);
}
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c
index 5dabe272d3..2a14167f67 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c
@@ -1692,10 +1692,11 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx,
however -- it might be that we receive a RST_STREAM following this
and can avoid the extra write */
GRPC_CHTTP2_STREAM_REF(s, "final_rst");
- grpc_combiner_execute_finally(
- exec_ctx, t->combiner,
- grpc_closure_create(force_client_rst_stream, s), GRPC_ERROR_NONE,
- false);
+ grpc_closure_sched(
+ exec_ctx, grpc_closure_create(force_client_rst_stream, s,
+ grpc_combiner_finally_scheduler(
+ t->combiner, false)),
+ GRPC_ERROR_NONE);
}
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false,
GRPC_ERROR_NONE);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index b727965d43..a52acbacdb 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -212,10 +212,8 @@ struct grpc_chttp2_transport {
grpc_closure write_action_begin_locked;
grpc_closure write_action;
- grpc_closure write_action_end;
grpc_closure write_action_end_locked;
- grpc_closure read_action_begin;
grpc_closure read_action_locked;
/** incoming read bytes */
@@ -336,10 +334,8 @@ struct grpc_chttp2_transport {
/** have we scheduled a destructive cleanup? */
bool destructive_reclaimer_registered;
/** benign cleanup closure */
- grpc_closure benign_reclaimer;
grpc_closure benign_reclaimer_locked;
/** destructive cleanup closure */
- grpc_closure destructive_reclaimer;
grpc_closure destructive_reclaimer_locked;
};
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index c45c653247..e37c541808 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -868,18 +868,18 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
OP_RECV_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
- grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
- GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
+ GRPC_ERROR_CANCELLED);
} else if (stream_state->state_callback_received[OP_FAILED]) {
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, stream_op->recv_initial_metadata_ready,
- make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL);
+ make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
} else {
grpc_chttp2_incoming_metadata_buffer_publish(
exec_ctx, &oas->s->state.rs.initial_metadata,
stream_op->recv_initial_metadata);
- grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
+ GRPC_ERROR_NONE);
}
stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK;
@@ -930,22 +930,22 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
- grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
- GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_CANCELLED);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->state_callback_received[OP_FAILED]) {
CRONET_LOG(GPR_DEBUG, "Stream failed.");
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, stream_op->recv_message_ready,
- make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL);
+ make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->rs.read_stream_closed == true) {
/* No more data will be received */
CRONET_LOG(GPR_DEBUG, "read stream closed");
- grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
@@ -978,8 +978,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
&stream_state->rs.read_slice_buffer, 0);
*((grpc_byte_buffer **)stream_op->recv_message) =
(grpc_byte_buffer *)&stream_state->rs.sbs;
- grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
@@ -1013,8 +1013,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
&stream_state->rs.read_slice_buffer, 0);
*((grpc_byte_buffer **)stream_op->recv_message) =
(grpc_byte_buffer *)&stream_state->rs.sbs;
- grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
/* Do an extra read to trigger on_succeeded() callback in case connection
@@ -1075,18 +1075,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
OP_ON_COMPLETE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
- grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete,
- GRPC_ERROR_REF(stream_state->cancel_error), NULL);
+ grpc_closure_sched(exec_ctx, stream_op->on_complete,
+ GRPC_ERROR_REF(stream_state->cancel_error));
} else if (stream_state->state_callback_received[OP_FAILED]) {
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, stream_op->on_complete,
- make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL);
+ make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
} else {
/* All actions in this stream_op are complete. Call the on_complete
* callback
*/
- grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE,
- NULL);
+ grpc_closure_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE);
}
oas->state.state_op_done[OP_ON_COMPLETE] = true;
oas->done = true;
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index b62aebeb23..fe2e50e71a 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -297,7 +297,8 @@ void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
memset(op, 0, sizeof(*op));
op->cancel_error = GRPC_ERROR_CANCELLED;
- op->on_complete = grpc_closure_create(destroy_op, op);
+ op->on_complete =
+ grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx);
elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}
@@ -307,7 +308,8 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
grpc_slice *optional_message) {
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
memset(op, 0, sizeof(*op));
- op->on_complete = grpc_closure_create(destroy_op, op);
+ op->on_complete =
+ grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx);
grpc_transport_stream_op_add_cancellation_with_message(exec_ctx, op, status,
optional_message);
elem->filter->start_transport_stream_op(exec_ctx, elem, op);
@@ -319,7 +321,8 @@ void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx,
grpc_slice *optional_message) {
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
memset(op, 0, sizeof(*op));
- op->on_complete = grpc_closure_create(destroy_op, op);
+ op->on_complete =
+ grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx);
grpc_transport_stream_op_add_close(exec_ctx, op, status, optional_message);
elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 9cb8d8e49c..62075ed613 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -279,8 +279,10 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* initialize members */
grpc_slice_buffer_init(&calld->slices);
calld->has_compression_algorithm = 0;
- grpc_closure_init(&calld->got_slice, got_slice, elem);
- grpc_closure_init(&calld->send_done, send_done, elem);
+ grpc_closure_init(&calld->got_slice, got_slice, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->send_done, send_done, elem,
+ grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c
index b7579b0b14..8dd6d099e1 100644
--- a/src/core/lib/channel/deadline_filter.c
+++ b/src/core/lib/channel/deadline_filter.c
@@ -124,7 +124,8 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
grpc_transport_stream_op* op) {
deadline_state->next_on_complete = op->on_complete;
- grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state);
+ grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state,
+ grpc_schedule_on_exec_ctx);
op->on_complete = &deadline_state->on_complete;
}
@@ -173,8 +174,9 @@ void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state));
state->elem = elem;
state->deadline = deadline;
- grpc_closure_init(&state->closure, start_timer_after_init, state);
- grpc_exec_ctx_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_init(&state->closure, start_timer_after_init, state,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE);
}
}
@@ -291,7 +293,8 @@ static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
calld->next_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
calld->recv_initial_metadata = op->recv_initial_metadata;
grpc_closure_init(&calld->recv_initial_metadata_ready,
- recv_initial_metadata_ready, elem);
+ recv_initial_metadata_ready, elem,
+ grpc_schedule_on_exec_ctx);
op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready;
}
// Make sure we know when the call is complete, so that we can cancel
diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c
index 23edc826ca..ff827527b3 100644
--- a/src/core/lib/channel/handshaker.c
+++ b/src/core/lib/channel/handshaker.c
@@ -165,7 +165,7 @@ static bool call_next_handshaker_locked(grpc_exec_ctx* exec_ctx,
// Cancel deadline timer, since we're invoking the on_handshake_done
// callback now.
grpc_timer_cancel(exec_ctx, &mgr->deadline_timer);
- grpc_exec_ctx_sched(exec_ctx, &mgr->on_handshake_done, error, NULL);
+ grpc_closure_sched(exec_ctx, &mgr->on_handshake_done, error);
mgr->shutdown = true;
} else {
grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->index],
@@ -218,8 +218,10 @@ void grpc_handshake_manager_do_handshake(
grpc_slice_buffer_init(mgr->args.read_buffer);
// Initialize state needed for calling handshakers.
mgr->acceptor = acceptor;
- grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, mgr);
- grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, &mgr->args);
+ grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, mgr,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, &mgr->args,
+ grpc_schedule_on_exec_ctx);
// Start deadline timer, which owns a ref.
gpr_ref(&mgr->refs);
grpc_timer_init(exec_ctx, &mgr->deadline_timer,
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index 00459ad7d7..c2ee0a0047 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -393,12 +393,17 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
calld->send_message_blocked = false;
grpc_slice_buffer_init(&calld->slices);
grpc_closure_init(&calld->hc_on_recv_initial_metadata,
- hc_on_recv_initial_metadata, elem);
+ hc_on_recv_initial_metadata, elem,
+ grpc_schedule_on_exec_ctx);
grpc_closure_init(&calld->hc_on_recv_trailing_metadata,
- hc_on_recv_trailing_metadata, elem);
- grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem);
- grpc_closure_init(&calld->got_slice, got_slice, elem);
- grpc_closure_init(&calld->send_done, send_done, elem);
+ hc_on_recv_trailing_metadata, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->got_slice, got_slice, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->send_done, send_done, elem,
+ grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index dbd901f4fa..776eff456d 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -344,9 +344,12 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data;
/* initialize members */
memset(calld, 0, sizeof(*calld));
- grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
- grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem);
- grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem);
+ grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem,
+ grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&calld->read_slice_buffer);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index 87cc7c8a18..f8cdf62540 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -128,7 +128,7 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
gpr_free(message_string);
}
// Invoke the next callback.
- grpc_exec_ctx_sched(exec_ctx, calld->next_recv_message_ready, error, NULL);
+ grpc_closure_sched(exec_ctx, calld->next_recv_message_ready, error);
}
// Start transport stream op.
@@ -164,7 +164,8 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
channel_data* chand = elem->channel_data;
call_data* calld = elem->call_data;
calld->next_recv_message_ready = NULL;
- grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem);
+ grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem,
+ grpc_schedule_on_exec_ctx);
// Get max sizes from channel data, then merge in per-method config values.
// Note: Per-method config is only available on the client, so we
// apply the max request size to the send limit and the max response
diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c
index 625b83697c..fb2108987b 100644
--- a/src/core/lib/http/httpcli.c
+++ b/src/core/lib/http/httpcli.c
@@ -104,7 +104,7 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req,
grpc_error *error) {
grpc_polling_entity_del_from_pollset_set(exec_ctx, req->pollent,
req->context->pollset_set);
- grpc_exec_ctx_sched(exec_ctx, req->on_done, error, NULL);
+ grpc_closure_sched(exec_ctx, req->on_done, error);
grpc_http_parser_destroy(&req->parser);
if (req->addresses != NULL) {
grpc_resolved_addresses_destroy(req->addresses);
@@ -225,7 +225,8 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req,
return;
}
addr = &req->addresses->addrs[req->next_address++];
- grpc_closure_init(&req->connected, on_connected, req);
+ grpc_closure_init(&req->connected, on_connected, req,
+ grpc_schedule_on_exec_ctx);
grpc_arg arg;
arg.key = GRPC_ARG_RESOURCE_QUOTA;
arg.type = GRPC_ARG_POINTER;
@@ -267,8 +268,9 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx,
req->pollent = pollent;
req->overall_error = GRPC_ERROR_NONE;
req->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
- grpc_closure_init(&req->on_read, on_read, req);
- grpc_closure_init(&req->done_write, done_write, req);
+ grpc_closure_init(&req->on_read, on_read, req, grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&req->done_write, done_write, req,
+ grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&req->incoming);
grpc_slice_buffer_init(&req->outgoing);
grpc_iomgr_register_object(&req->iomgr_obj, name);
@@ -278,9 +280,11 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(pollent);
grpc_polling_entity_add_to_pollset_set(exec_ctx, req->pollent,
req->context->pollset_set);
- grpc_resolve_address(exec_ctx, request->host, req->handshaker->default_port,
- req->context->pollset_set,
- grpc_closure_create(on_resolved, req), &req->addresses);
+ grpc_resolve_address(
+ exec_ctx, request->host, req->handshaker->default_port,
+ req->context->pollset_set,
+ grpc_closure_create(on_resolved, req, grpc_schedule_on_exec_ctx),
+ &req->addresses);
}
void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c
index 7dd50b7dab..440817c5a6 100644
--- a/src/core/lib/http/httpcli_security_connector.c
+++ b/src/core/lib/http/httpcli_security_connector.c
@@ -98,7 +98,7 @@ static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx,
error = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
}
- grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL);
+ grpc_closure_sched(exec_ctx, on_peer_checked, error);
tsi_peer_destruct(&peer);
}
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c
index c6ddc76732..da0ec878a3 100644
--- a/src/core/lib/iomgr/closure.c
+++ b/src/core/lib/iomgr/closure.c
@@ -37,10 +37,13 @@
#include "src/core/lib/profiling/timers.h"
-void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
- void *cb_arg) {
+grpc_closure *grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg,
+ grpc_closure_scheduler *scheduler) {
closure->cb = cb;
closure->cb_arg = cb_arg;
+ closure->scheduler = scheduler;
+ return closure;
}
void grpc_closure_list_init(grpc_closure_list *closure_list) {
@@ -105,11 +108,12 @@ static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg,
cb(exec_ctx, cb_arg, error);
}
-grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) {
+grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg,
+ grpc_closure_scheduler *scheduler) {
wrapped_closure *wc = gpr_malloc(sizeof(*wc));
wc->cb = cb;
wc->cb_arg = cb_arg;
- grpc_closure_init(&wc->wrapper, closure_wrapper, wc);
+ grpc_closure_init(&wc->wrapper, closure_wrapper, wc, scheduler);
return &wc->wrapper;
}
@@ -117,8 +121,30 @@ void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *c,
grpc_error *error) {
GPR_TIMER_BEGIN("grpc_closure_run", 0);
if (c != NULL) {
- c->cb(exec_ctx, c->cb_arg, error);
+ c->scheduler->vtable->run(exec_ctx, c, error);
+ } else {
+ GRPC_ERROR_UNREF(error);
}
- GRPC_ERROR_UNREF(error);
GPR_TIMER_END("grpc_closure_run", 0);
}
+
+void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *c,
+ grpc_error *error) {
+ GPR_TIMER_BEGIN("grpc_closure_sched", 0);
+ if (c != NULL) {
+ c->scheduler->vtable->sched(exec_ctx, c, error);
+ } else {
+ GRPC_ERROR_UNREF(error);
+ }
+ GPR_TIMER_END("grpc_closure_sched", 0);
+}
+
+void grpc_closure_list_sched(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) {
+ grpc_closure *c = list->head;
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error);
+ c = next;
+ }
+ list->head = list->tail = NULL;
+}
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index ec0114a2a4..ee386fbc76 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -57,6 +57,22 @@ typedef struct grpc_closure_list {
typedef void (*grpc_iomgr_cb_func)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
+typedef struct grpc_closure_scheduler grpc_closure_scheduler;
+
+typedef struct grpc_closure_scheduler_vtable {
+ /* NOTE: for all these functions, closure->scheduler == the scheduler that was
+ used to find this vtable */
+ void (*run)(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error);
+ void (*sched)(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error);
+} grpc_closure_scheduler_vtable;
+
+/** Abstract type that can schedule closures for execution */
+struct grpc_closure_scheduler {
+ const grpc_closure_scheduler_vtable *vtable;
+};
+
/** A closure over a grpc_iomgr_cb_func. */
struct grpc_closure {
/** Once queued, next indicates the next queued closure; before then, scratch
@@ -73,6 +89,10 @@ struct grpc_closure {
/** Arguments to be passed to "cb". */
void *cb_arg;
+ /** Scheduler to schedule against: NULL to schedule against current execution
+ context */
+ grpc_closure_scheduler *scheduler;
+
/** Once queued, the result of the closure. Before then: scratch space */
union {
grpc_error *error;
@@ -80,12 +100,14 @@ struct grpc_closure {
} error_data;
};
-/** Initializes \a closure with \a cb and \a cb_arg. */
-void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
- void *cb_arg);
+/** Initializes \a closure with \a cb and \a cb_arg. Returns \a closure. */
+grpc_closure *grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg,
+ grpc_closure_scheduler *scheduler);
/* Create a heap allocated closure: try to avoid except for very rare events */
-grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg);
+grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg,
+ grpc_closure_scheduler *scheduler);
#define GRPC_CLOSURE_LIST_INIT \
{ NULL, NULL }
@@ -113,4 +135,13 @@ bool grpc_closure_list_empty(grpc_closure_list list);
void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error);
+/** Schedule a closure to be run. Does not need to be run from a safe point. */
+void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error);
+
+/** Schedule all closures in a list to be run. Does not need to be run from a
+ * safe point. */
+void grpc_closure_list_sched(grpc_exec_ctx *exec_ctx,
+ grpc_closure_list *closure_list);
+
#endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index cfc67020ae..c26a73b2b7 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -56,6 +56,10 @@ int grpc_combiner_trace = 0;
struct grpc_combiner {
grpc_combiner *next_combiner_on_this_exec_ctx;
grpc_workqueue *optional_workqueue;
+ grpc_closure_scheduler uncovered_scheduler;
+ grpc_closure_scheduler covered_scheduler;
+ grpc_closure_scheduler uncovered_finally_scheduler;
+ grpc_closure_scheduler covered_finally_scheduler;
gpr_mpscq queue;
// state is:
// lower bit - zero if orphaned (STATE_UNORPHANED)
@@ -70,6 +74,26 @@ struct grpc_combiner {
grpc_closure offload;
};
+static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure, grpc_error *error);
+static void combiner_exec_covered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure, grpc_error *error);
+static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure,
+ grpc_error *error);
+static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure,
+ grpc_error *error);
+
+static const grpc_closure_scheduler_vtable scheduler_uncovered = {
+ combiner_exec_uncovered, combiner_exec_uncovered};
+static const grpc_closure_scheduler_vtable scheduler_covered = {
+ combiner_exec_covered, combiner_exec_covered};
+static const grpc_closure_scheduler_vtable finally_scheduler_uncovered = {
+ combiner_finally_exec_uncovered, combiner_finally_exec_uncovered};
+static const grpc_closure_scheduler_vtable finally_scheduler_covered = {
+ combiner_finally_exec_covered, combiner_finally_exec_covered};
+
static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
typedef struct {
@@ -102,11 +126,16 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
lock->time_to_execute_final_list = false;
lock->optional_workqueue = optional_workqueue;
lock->final_list_covered_by_poller = false;
+ lock->uncovered_scheduler.vtable = &scheduler_uncovered;
+ lock->covered_scheduler.vtable = &scheduler_covered;
+ lock->uncovered_finally_scheduler.vtable = &finally_scheduler_uncovered;
+ lock->covered_finally_scheduler.vtable = &finally_scheduler_covered;
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
gpr_atm_no_barrier_store(&lock->elements_covered_by_poller, 0);
gpr_mpscq_init(&lock->queue);
grpc_closure_list_init(&lock->final_list);
- grpc_closure_init(&lock->offload, offload, lock);
+ grpc_closure_init(&lock->offload, offload, lock,
+ grpc_workqueue_scheduler(lock->optional_workqueue));
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock));
return lock;
}
@@ -148,9 +177,9 @@ static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx,
}
}
-void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *cl, grpc_error *error,
- bool covered_by_poller) {
+static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
+ grpc_closure *cl, grpc_error *error,
+ bool covered_by_poller) {
GPR_TIMER_BEGIN("combiner.execute", 0);
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
GRPC_COMBINER_TRACE(gpr_log(
@@ -171,6 +200,24 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
GPR_TIMER_END("combiner.execute", 0);
}
+#define COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler_name) \
+ ((grpc_combiner *)(((char *)((closure)->scheduler)) - \
+ offsetof(grpc_combiner, scheduler_name)))
+
+static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
+ grpc_error *error) {
+ combiner_exec(exec_ctx,
+ COMBINER_FROM_CLOSURE_SCHEDULER(cl, uncovered_scheduler), cl,
+ error, false);
+}
+
+static void combiner_exec_covered(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
+ grpc_error *error) {
+ combiner_exec(exec_ctx,
+ COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_scheduler), cl,
+ error, true);
+}
+
static void move_next(grpc_exec_ctx *exec_ctx) {
exec_ctx->active_combiner =
exec_ctx->active_combiner->next_combiner_on_this_exec_ctx;
@@ -188,8 +235,7 @@ static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
move_next(exec_ctx);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload --> %p", lock,
lock->optional_workqueue));
- grpc_workqueue_enqueue(exec_ctx, lock->optional_workqueue, &lock->offload,
- GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, &lock->offload, GRPC_ERROR_NONE);
}
bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
@@ -312,23 +358,22 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
}
static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
- grpc_error *error) {
- grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
- GRPC_ERROR_REF(error), false);
-}
+ grpc_error *error);
-void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *closure, grpc_error *error,
- bool covered_by_poller) {
+static void combiner_execute_finally(grpc_exec_ctx *exec_ctx,
+ grpc_combiner *lock, grpc_closure *closure,
+ grpc_error *error,
+ bool covered_by_poller) {
GRPC_COMBINER_TRACE(gpr_log(
GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p; cov=%d", lock,
closure, exec_ctx->active_combiner, covered_by_poller));
GPR_TIMER_BEGIN("combiner.execute_finally", 0);
if (exec_ctx->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0);
- grpc_combiner_execute(exec_ctx, lock,
- grpc_closure_create(enqueue_finally, closure), error,
- false);
+ grpc_closure_sched(
+ exec_ctx, grpc_closure_create(enqueue_finally, closure,
+ grpc_combiner_scheduler(lock, false)),
+ error);
GPR_TIMER_END("combiner.execute_finally", 0);
return;
}
@@ -342,3 +387,36 @@ void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure_list_append(&lock->final_list, closure, error);
GPR_TIMER_END("combiner.execute_finally", 0);
}
+
+static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
+ grpc_error *error) {
+ combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
+ GRPC_ERROR_REF(error), false);
+}
+
+static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *cl,
+ grpc_error *error) {
+ combiner_execute_finally(exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(
+ cl, uncovered_finally_scheduler),
+ cl, error, false);
+}
+
+static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *cl, grpc_error *error) {
+ combiner_execute_finally(
+ exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_finally_scheduler),
+ cl, error, true);
+}
+
+grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *combiner,
+ bool covered_by_poller) {
+ return covered_by_poller ? &combiner->covered_scheduler
+ : &combiner->uncovered_scheduler;
+}
+
+grpc_closure_scheduler *grpc_combiner_finally_scheduler(
+ grpc_combiner *combiner, bool covered_by_poller) {
+ return covered_by_poller ? &combiner->covered_finally_scheduler
+ : &combiner->uncovered_finally_scheduler;
+}
diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h
index d04eeed83a..81dff85d40 100644
--- a/src/core/lib/iomgr/combiner.h
+++ b/src/core/lib/iomgr/combiner.h
@@ -50,14 +50,12 @@
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue);
// Destroy the lock
void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
-// Execute \a action within the lock.
-void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *closure, grpc_error *error,
- bool covered_by_poller);
-// Execute \a action within the lock just prior to unlocking.
-void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *closure, grpc_error *error,
- bool covered_by_poller);
+// Fetch a scheduler to schedule closures against
+grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock,
+ bool covered_by_poller);
+// Scheduler to execute \a action within the lock just prior to unlocking.
+grpc_closure_scheduler *grpc_combiner_finally_scheduler(grpc_combiner *lock,
+ bool covered_by_poller);
bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx);
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 59b1fe896f..d6664aead2 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -201,6 +201,8 @@ static void fd_global_shutdown(void);
/* This is also used as grpc_workqueue (by directly casing it) */
typedef struct polling_island {
+ grpc_closure_scheduler workqueue_scheduler;
+
gpr_mu mu;
/* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
the refcount.
@@ -304,6 +306,8 @@ static __thread polling_island *g_current_thread_polling_island;
/* Forward declaration */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
+static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error);
#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
@@ -316,6 +320,9 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
+static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
+ workqueue_enqueue, workqueue_enqueue};
+
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
@@ -528,6 +535,7 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
*error = GRPC_ERROR_NONE;
pi = gpr_malloc(sizeof(*pi));
+ pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
gpr_mu_init(&pi->mu);
pi->fd_cnt = 0;
pi->fd_capacity = 0;
@@ -799,10 +807,10 @@ static polling_island *polling_island_merge(polling_island *p,
return q;
}
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue, grpc_closure *closure,
+static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error) {
GPR_TIMER_BEGIN("workqueue.enqueue", 0);
+ grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
/* take a ref to the workqueue: otherwise it can happen that whatever events
* this kicks off ends up destroying the workqueue before this function
* completes */
@@ -819,6 +827,12 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END("workqueue.enqueue", 0);
}
+static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
+ polling_island *pi = (polling_island *)workqueue;
+ return workqueue == NULL ? grpc_schedule_on_exec_ctx
+ : &pi->workqueue_scheduler;
+}
+
static grpc_error *polling_island_global_init() {
grpc_error *error = GRPC_ERROR_NONE;
@@ -1029,8 +1043,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
fd->po.pi = NULL;
}
- grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error),
- NULL);
+ grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
gpr_mu_unlock(&fd->po.mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
@@ -1056,16 +1069,14 @@ static grpc_error *fd_shutdown_error(bool shutdown) {
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st, grpc_closure *closure) {
if (fd->shutdown) {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"));
} else if (*st == CLOSURE_NOT_READY) {
/* not ready ==> switch to a waiting state by setting the closure */
*st = closure;
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
- grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown));
} else {
/* upcallptr was set to a different closure. This is an error! */
gpr_log(GPR_ERROR,
@@ -1087,7 +1098,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
- grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
+ grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown));
*st = CLOSURE_NOT_READY;
return 1;
}
@@ -1358,7 +1369,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
/* Release the ref and set pollset->po.pi to NULL */
pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
- grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
/* pollset->po.mu lock must be held by the caller before calling this */
@@ -1409,7 +1420,9 @@ static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
workqueue_maybe_wakeup(pi);
}
grpc_closure *c = (grpc_closure *)n;
- grpc_closure_run(exec_ctx, c, c->error_data.error);
+ grpc_error *error = c->error_data.error;
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
return true;
} else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
/* n == NULL might mean there's work but it's not available to be popped
@@ -1958,7 +1971,7 @@ static const grpc_event_engine_vtable vtable = {
.workqueue_ref = workqueue_ref,
.workqueue_unref = workqueue_unref,
- .workqueue_enqueue = workqueue_enqueue,
+ .workqueue_scheduler = workqueue_scheduler,
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 21b28e5554..5bc5621443 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -397,7 +397,7 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
if (!fd->released) {
close(fd->fd);
}
- grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE);
}
static int fd_wrapped_fd(grpc_fd *fd) {
@@ -457,16 +457,14 @@ static grpc_error *fd_shutdown_error(bool shutdown) {
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st, grpc_closure *closure) {
if (fd->shutdown) {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"));
} else if (*st == CLOSURE_NOT_READY) {
/* not ready ==> switch to a waiting state by setting the closure */
*st = closure;
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
- grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown));
maybe_wake_one_watcher_locked(fd);
} else {
/* upcallptr was set to a different closure. This is an error! */
@@ -489,7 +487,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
- grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
+ grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown));
*st = CLOSURE_NOT_READY;
return 1;
}
@@ -852,7 +850,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
GRPC_FD_UNREF(pollset->fds[i], "multipoller");
}
pollset->fd_count = 0;
- grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
static void work_combine_error(grpc_error **composite, grpc_error *error) {
@@ -901,7 +899,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (!pollset_has_workers(pollset) &&
!grpc_closure_list_empty(pollset->idle_jobs)) {
GPR_TIMER_MARK("pollset_work.idle_jobs", 0);
- grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
goto done;
}
/* If we're shutting down then we don't execute any extended work */
@@ -1081,7 +1079,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
* TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
gpr_mu_lock(&pollset->mu);
} else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
gpr_mu_unlock(&pollset->mu);
grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu);
@@ -1100,7 +1098,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->shutdown_done = closure;
pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset_has_workers(pollset)) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
}
if (!pollset->called_shutdown && !pollset_has_workers(pollset)) {
pollset->called_shutdown = 1;
@@ -1288,10 +1286,8 @@ static void workqueue_unref(grpc_exec_ctx *exec_ctx,
grpc_workqueue *workqueue) {}
#endif
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue, grpc_closure *closure,
- grpc_error *error) {
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
+ return grpc_schedule_on_exec_ctx;
}
/*******************************************************************************
@@ -1534,7 +1530,7 @@ static const grpc_event_engine_vtable vtable = {
.workqueue_ref = workqueue_ref,
.workqueue_unref = workqueue_unref,
- .workqueue_enqueue = workqueue_enqueue,
+ .workqueue_scheduler = workqueue_scheduler,
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index e148416760..c106ba5400 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -277,9 +277,8 @@ void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
}
#endif
-void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- grpc_closure *closure, grpc_error *error) {
- g_event_engine->workqueue_enqueue(exec_ctx, workqueue, closure, error);
+grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) {
+ return g_event_engine->workqueue_scheduler(workqueue);
}
#endif // GRPC_POSIX_SOCKET
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index cb5832539d..1068a4bad5 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -106,8 +106,7 @@ typedef struct grpc_event_engine_vtable {
grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue);
void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
#endif
- void (*workqueue_enqueue)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- grpc_closure *closure, grpc_error *error);
+ grpc_closure_scheduler *(*workqueue_scheduler)(grpc_workqueue *workqueue);
} grpc_event_engine_vtable;
void grpc_event_engine_init(void);
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index 604713e578..6aa788f8e5 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -57,7 +57,6 @@ bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) {
return true;
}
-#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
bool did_something = 0;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0);
@@ -67,8 +66,10 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
while (c != NULL) {
grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
did_something = true;
- grpc_closure_run(exec_ctx, c, c->error_data.error);
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
c = next;
}
} else if (!grpc_combiner_continue_exec_ctx(exec_ctx)) {
@@ -76,30 +77,6 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
}
}
GPR_ASSERT(exec_ctx->active_combiner == NULL);
- if (exec_ctx->stealing_from_workqueue != NULL) {
- if (grpc_exec_ctx_ready_to_finish(exec_ctx)) {
- grpc_workqueue_enqueue(exec_ctx, exec_ctx->stealing_from_workqueue,
- exec_ctx->stolen_closure,
- exec_ctx->stolen_closure->error_data.error);
- GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue,
- "exec_ctx_sched");
- exec_ctx->stealing_from_workqueue = NULL;
- exec_ctx->stolen_closure = NULL;
- } else {
- grpc_closure *c = exec_ctx->stolen_closure;
- GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue,
- "exec_ctx_sched");
- exec_ctx->stealing_from_workqueue = NULL;
- exec_ctx->stolen_closure = NULL;
- grpc_error *error = c->error_data.error;
- GPR_TIMER_BEGIN("grpc_exec_ctx_flush.stolen_cb", 0);
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- GPR_TIMER_END("grpc_exec_ctx_flush.stolen_cb", 0);
- grpc_exec_ctx_flush(exec_ctx);
- did_something = true;
- }
- }
GPR_TIMER_END("grpc_exec_ctx_flush", 0);
return did_something;
}
@@ -109,104 +86,21 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
grpc_exec_ctx_flush(exec_ctx);
}
-void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error,
- grpc_workqueue *offload_target_or_null) {
- GPR_TIMER_BEGIN("grpc_exec_ctx_sched", 0);
- if (offload_target_or_null == NULL) {
- grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
- } else if (exec_ctx->stealing_from_workqueue == NULL) {
- exec_ctx->stealing_from_workqueue = offload_target_or_null;
- closure->error_data.error = error;
- exec_ctx->stolen_closure = closure;
- } else if (exec_ctx->stealing_from_workqueue != offload_target_or_null) {
- grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error);
- GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched");
- } else { /* stealing_from_workqueue == offload_target_or_null */
- grpc_workqueue_enqueue(exec_ctx, offload_target_or_null,
- exec_ctx->stolen_closure,
- exec_ctx->stolen_closure->error_data.error);
- closure->error_data.error = error;
- exec_ctx->stolen_closure = closure;
- GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched");
- }
- GPR_TIMER_END("grpc_exec_ctx_sched", 0);
+static void exec_ctx_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error) {
+ closure->cb(exec_ctx, closure->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
}
-void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
- grpc_closure_list *list,
- grpc_workqueue *offload_target_or_null) {
- grpc_closure_list_move(list, &exec_ctx->closure_list);
+static void exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error) {
+ grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
}
void grpc_exec_ctx_global_init(void) {}
void grpc_exec_ctx_global_shutdown(void) {}
-#else
-static gpr_mu g_mu;
-static gpr_cv g_cv;
-static int g_threads = 0;
-
-static void run_closure(void *arg) {
- grpc_closure *closure = arg;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- closure->cb(&exec_ctx, closure->cb_arg, (closure->final_data & 1) != 0);
- grpc_exec_ctx_finish(&exec_ctx);
- gpr_mu_lock(&g_mu);
- if (--g_threads == 0) {
- gpr_cv_signal(&g_cv);
- }
- gpr_mu_unlock(&g_mu);
-}
-
-static void start_closure(grpc_closure *closure) {
- gpr_thd_id id;
- gpr_mu_lock(&g_mu);
- g_threads++;
- gpr_mu_unlock(&g_mu);
- gpr_thd_new(&id, run_closure, closure, NULL);
-}
-
-bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { return false; }
-
-void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {}
-void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- bool success,
- grpc_workqueue *offload_target_or_null) {
- GPR_ASSERT(offload_target_or_null == NULL);
- if (closure == NULL) return;
- closure->final_data = success;
- start_closure(closure);
-}
-
-void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
- grpc_closure_list *list,
- grpc_workqueue *offload_target_or_null) {
- GPR_ASSERT(offload_target_or_null == NULL);
- if (list == NULL) return;
- grpc_closure *p = list->head;
- while (p) {
- grpc_closure *start = p;
- p = grpc_closure_next(start);
- start_closure(start);
- }
- grpc_closure_list r = GRPC_CLOSURE_LIST_INIT;
- *list = r;
-}
-
-void grpc_exec_ctx_global_init(void) {
- gpr_mu_init(&g_mu);
- gpr_cv_init(&g_cv);
-}
-
-void grpc_exec_ctx_global_shutdown(void) {
- gpr_mu_lock(&g_mu);
- while (g_threads != 0) {
- gpr_cv_wait(&g_cv, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
- }
- gpr_mu_unlock(&g_mu);
-
- gpr_mu_destroy(&g_mu);
- gpr_cv_destroy(&g_cv);
-}
-#endif
+static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = {
+ exec_ctx_run, exec_ctx_sched};
+static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable};
+grpc_closure_scheduler *grpc_schedule_on_exec_ctx = &exec_ctx_scheduler;
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 7e50cb9825..e566f1b3e8 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -66,17 +66,6 @@ typedef struct grpc_combiner grpc_combiner;
#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
struct grpc_exec_ctx {
grpc_closure_list closure_list;
- /** The workqueue we're stealing work from.
- As items are queued to the execution context, we try to steal one
- workqueue item and execute it inline (assuming the exec_ctx is not
- finished) - doing so does not invalidate the workqueue's contract, and
- provides a small latency win in cases where we get a hit */
- grpc_workqueue *stealing_from_workqueue;
- /** The workqueue item that was stolen from the workqueue above. When new
- items are scheduled to be offloaded to that workqueue, we need to update
- this like a 1-deep fifo to maintain the invariant that workqueue items
- queued by one thread are started in order */
- grpc_closure *stolen_closure;
/** currently active combiner: updated only via combiner.c */
grpc_combiner *active_combiner;
/** last active combiner in the active combiner list */
@@ -89,10 +78,7 @@ struct grpc_exec_ctx {
/* initializer for grpc_exec_ctx:
prefer to use GRPC_EXEC_CTX_INIT whenever possible */
#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
- { \
- GRPC_CLOSURE_LIST_INIT, NULL, NULL, NULL, NULL, false, finish_check_arg, \
- finish_check \
- }
+ { GRPC_CLOSURE_LIST_INIT, NULL, NULL, false, finish_check_arg, finish_check }
#else
struct grpc_exec_ctx {
bool cached_ready_to_finish;
@@ -108,6 +94,8 @@ struct grpc_exec_ctx {
#define GRPC_EXEC_CTX_INIT \
GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_always_ready_to_finish, NULL)
+extern grpc_closure_scheduler *grpc_schedule_on_exec_ctx;
+
/** Flush any work that has been enqueued onto this grpc_exec_ctx.
* Caller must guarantee that no interfering locks are held.
* Returns true if work was performed, false otherwise. */
@@ -115,14 +103,6 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx);
/** Finish any pending work for a grpc_exec_ctx. Must be called before
* the instance is destroyed, or work may be lost. */
void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx);
-/** Add a closure to be executed in the future.
- If \a offload_target_or_null is NULL, the closure will be executed at the
- next exec_ctx.{finish,flush} point.
- If \a offload_target_or_null is non-NULL, the closure will be scheduled
- against the workqueue, and a reference to the workqueue will be consumed. */
-void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error,
- grpc_workqueue *offload_target_or_null);
/** Returns true if we'd like to leave this execution context as soon as
possible: useful for deciding whether to do something more or not depending
on outside context */
@@ -131,11 +111,6 @@ bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx);
bool grpc_never_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored);
/** A finish check that is always ready to finish */
bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored);
-/** Add a list of closures to be executed at the next flush/finish point.
- * Leaves \a list empty. */
-void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
- grpc_closure_list *list,
- grpc_workqueue *offload_target_or_null);
void grpc_exec_ctx_global_init(void);
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index 4e78b619fb..852775564f 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -77,10 +77,18 @@ static void closure_exec_thread_func(void *ignored) {
gpr_mu_unlock(&g_executor.mu);
break;
} else {
- grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures, NULL);
+ grpc_closure *c = g_executor.closures.head;
+ grpc_closure_list_init(&g_executor.closures);
+ gpr_mu_unlock(&g_executor.mu);
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
+ c->cb(&exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
+ }
+ grpc_exec_ctx_flush(&exec_ctx);
}
- gpr_mu_unlock(&g_executor.mu);
- grpc_exec_ctx_flush(&exec_ctx);
}
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -112,7 +120,8 @@ static void maybe_spawn_locked() {
g_executor.pending_join = 1;
}
-void grpc_executor_push(grpc_closure *closure, grpc_error *error) {
+static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error) {
gpr_mu_lock(&g_executor.mu);
if (g_executor.shutting_down == 0) {
grpc_closure_list_append(&g_executor.closures, closure, error);
@@ -132,10 +141,24 @@ void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) {
* list below because we aren't accepting new work */
/* Execute pending callbacks, some may be performing cleanups */
- grpc_exec_ctx_enqueue_list(exec_ctx, &g_executor.closures, NULL);
+ grpc_closure *c = g_executor.closures.head;
+ grpc_closure_list_init(&g_executor.closures);
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
+ }
+ grpc_exec_ctx_flush(exec_ctx);
GPR_ASSERT(grpc_closure_list_empty(g_executor.closures));
if (pending_join) {
gpr_thd_join(g_executor.tid);
}
gpr_mu_destroy(&g_executor.mu);
}
+
+static const grpc_closure_scheduler_vtable executor_vtable = {executor_push,
+ executor_push};
+static grpc_closure_scheduler executor_scheduler = {&executor_vtable};
+grpc_closure_scheduler *grpc_executor_scheduler = &executor_scheduler;
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index 7bf8f21940..1213016383 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -43,9 +43,7 @@
* non-blocking solution available. */
void grpc_executor_init();
-/** Enqueue \a closure for its eventual execution of \a f(arg) on a separate
- * thread */
-void grpc_executor_push(grpc_closure *closure, grpc_error *error);
+extern grpc_closure_scheduler *grpc_executor_scheduler;
/** Shutdown the executor, running all pending work as part of the call */
void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx);
diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c
index 3a74b842b6..ed3edeee94 100644
--- a/src/core/lib/iomgr/pollset_uv.c
+++ b/src/core/lib/iomgr/pollset_uv.c
@@ -83,7 +83,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
// Drain any pending UV callbacks without blocking
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
}
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
}
void grpc_pollset_destroy(grpc_pollset *pollset) {
diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c
index 5540303e49..2a45e708df 100644
--- a/src/core/lib/iomgr/pollset_windows.c
+++ b/src/core/lib/iomgr/pollset_windows.c
@@ -109,7 +109,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->shutting_down = 1;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset->is_iocp_worker) {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
} else {
pollset->on_shutdown = closure;
}
@@ -167,8 +167,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
if (pollset->shutting_down && pollset->on_shutdown != NULL) {
- grpc_exec_ctx_sched(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE,
- NULL);
+ grpc_closure_sched(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE);
pollset->on_shutdown = NULL;
}
goto done;
diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c
index 821932e562..50e470d149 100644
--- a/src/core/lib/iomgr/resolve_address_posix.c
+++ b/src/core/lib/iomgr/resolve_address_posix.c
@@ -163,10 +163,9 @@ typedef struct {
static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp,
grpc_error *error) {
request *r = rp;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, r->on_done,
- grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out),
- NULL);
+ grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out));
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r);
@@ -185,12 +184,13 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
grpc_closure *on_done,
grpc_resolved_addresses **addrs) {
request *r = gpr_malloc(sizeof(request));
- grpc_closure_init(&r->request_closure, do_request_thread, r);
+ grpc_closure_init(&r->request_closure, do_request_thread, r,
+ grpc_executor_scheduler);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
r->addrs_out = addrs;
- grpc_executor_push(&r->request_closure, GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, &r->request_closure, GRPC_ERROR_NONE);
}
void (*grpc_resolve_address)(
diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c
index 3269c4f09f..9b5f3209f0 100644
--- a/src/core/lib/iomgr/resolve_address_uv.c
+++ b/src/core/lib/iomgr/resolve_address_uv.c
@@ -98,7 +98,7 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status,
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_error *error;
error = handle_addrinfo_result(status, res, r->addresses);
- grpc_exec_ctx_sched(&exec_ctx, r->on_done, error, NULL);
+ grpc_closure_sched(&exec_ctx, r->on_done, error);
grpc_exec_ctx_finish(&exec_ctx);
gpr_free(r->hints);
@@ -193,7 +193,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
int s;
err = try_split_host_port(name, default_port, &host, &port);
if (err != GRPC_ERROR_NONE) {
- grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL);
+ grpc_closure_sched(exec_ctx, on_done, err);
return;
}
r = gpr_malloc(sizeof(request));
@@ -217,7 +217,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
*addrs = NULL;
err = GRPC_ERROR_CREATE("getaddrinfo failed");
err = grpc_error_set_str(err, GRPC_ERROR_STR_OS_ERROR, uv_strerror(s));
- grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL);
+ grpc_closure_sched(exec_ctx, on_done, err);
gpr_free(r);
gpr_free(req);
gpr_free(hints);
diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c
index fada5ecbe8..2439ce3cb7 100644
--- a/src/core/lib/iomgr/resolve_address_windows.c
+++ b/src/core/lib/iomgr/resolve_address_windows.c
@@ -154,7 +154,7 @@ static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp,
} else {
GRPC_ERROR_REF(error);
}
- grpc_exec_ctx_sched(exec_ctx, r->on_done, error, NULL);
+ grpc_closure_sched(exec_ctx, r->on_done, error);
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r);
@@ -173,12 +173,13 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
grpc_closure *on_done,
grpc_resolved_addresses **addresses) {
request *r = gpr_malloc(sizeof(request));
- grpc_closure_init(&r->request_closure, do_request_thread, r);
+ grpc_closure_init(&r->request_closure, do_request_thread, r,
+ grpc_executor_scheduler);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
r->addresses = addresses;
- grpc_executor_push(&r->request_closure, GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, &r->request_closure, GRPC_ERROR_NONE);
}
void (*grpc_resolve_address)(
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index 3dbc810102..21aa05e57a 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -265,9 +265,8 @@ static void rq_step_sched(grpc_exec_ctx *exec_ctx,
if (resource_quota->step_scheduled) return;
resource_quota->step_scheduled = true;
grpc_resource_quota_ref_internal(resource_quota);
- grpc_combiner_execute_finally(exec_ctx, resource_quota->combiner,
- &resource_quota->rq_step_closure,
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(exec_ctx, &resource_quota->rq_step_closure,
+ GRPC_ERROR_NONE);
}
/* returns true if all allocations are completed */
@@ -294,7 +293,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
}
if (resource_user->free_pool >= 0) {
resource_user->allocating = false;
- grpc_exec_ctx_enqueue_list(exec_ctx, &resource_user->on_allocated, NULL);
+ grpc_closure_list_sched(exec_ctx, &resource_user->on_allocated);
gpr_mu_unlock(&resource_user->mu);
} else {
rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
@@ -432,7 +431,7 @@ static bool ru_post_reclaimer(grpc_exec_ctx *exec_ctx,
resource_user->new_reclaimers[destructive] = NULL;
GPR_ASSERT(resource_user->reclaimers[destructive] == NULL);
if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED);
return false;
}
resource_user->reclaimers[destructive] = closure;
@@ -473,10 +472,10 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
grpc_resource_user *resource_user = ru;
- grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[0],
- GRPC_ERROR_CANCELLED, NULL);
- grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1],
- GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, resource_user->reclaimers[0],
+ GRPC_ERROR_CANCELLED);
+ grpc_closure_sched(exec_ctx, resource_user->reclaimers[1],
+ GRPC_ERROR_CANCELLED);
resource_user->reclaimers[0] = NULL;
resource_user->reclaimers[1] = NULL;
rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
@@ -489,10 +488,10 @@ static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
rulist_remove(resource_user, (grpc_rulist)i);
}
- grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[0],
- GRPC_ERROR_CANCELLED, NULL);
- grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1],
- GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, resource_user->reclaimers[0],
+ GRPC_ERROR_CANCELLED);
+ grpc_closure_sched(exec_ctx, resource_user->reclaimers[1],
+ GRPC_ERROR_CANCELLED);
if (resource_user->free_pool != 0) {
resource_user->resource_quota->free_pool += resource_user->free_pool;
rq_step_sched(exec_ctx, resource_user->resource_quota);
@@ -564,9 +563,12 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) {
gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR,
(intptr_t)resource_quota);
}
- grpc_closure_init(&resource_quota->rq_step_closure, rq_step, resource_quota);
+ grpc_closure_init(
+ &resource_quota->rq_step_closure, rq_step, resource_quota,
+ grpc_combiner_finally_scheduler(resource_quota->combiner, true));
grpc_closure_init(&resource_quota->rq_reclamation_done_closure,
- rq_reclamation_done, resource_quota);
+ rq_reclamation_done, resource_quota,
+ grpc_combiner_scheduler(resource_quota->combiner, false));
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
resource_quota->roots[i] = NULL;
}
@@ -607,9 +609,8 @@ void grpc_resource_quota_resize(grpc_resource_quota *resource_quota,
rq_resize_args *a = gpr_malloc(sizeof(*a));
a->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
a->size = (int64_t)size;
- grpc_closure_init(&a->closure, rq_resize, a);
- grpc_combiner_execute(&exec_ctx, resource_quota->combiner, &a->closure,
- GRPC_ERROR_NONE, false);
+ grpc_closure_init(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
+ grpc_closure_sched(&exec_ctx, &a->closure, GRPC_ERROR_NONE);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -658,15 +659,19 @@ grpc_resource_user *grpc_resource_user_create(
resource_user->resource_quota =
grpc_resource_quota_ref_internal(resource_quota);
grpc_closure_init(&resource_user->allocate_closure, &ru_allocate,
- resource_user);
+ resource_user,
+ grpc_combiner_scheduler(resource_quota->combiner, false));
grpc_closure_init(&resource_user->add_to_free_pool_closure,
- &ru_add_to_free_pool, resource_user);
+ &ru_add_to_free_pool, resource_user,
+ grpc_combiner_scheduler(resource_quota->combiner, false));
grpc_closure_init(&resource_user->post_reclaimer_closure[0],
- &ru_post_benign_reclaimer, resource_user);
+ &ru_post_benign_reclaimer, resource_user,
+ grpc_combiner_scheduler(resource_quota->combiner, false));
grpc_closure_init(&resource_user->post_reclaimer_closure[1],
- &ru_post_destructive_reclaimer, resource_user);
- grpc_closure_init(&resource_user->destroy_closure, &ru_destroy,
- resource_user);
+ &ru_post_destructive_reclaimer, resource_user,
+ grpc_combiner_scheduler(resource_quota->combiner, false));
+ grpc_closure_init(&resource_user->destroy_closure, &ru_destroy, resource_user,
+ grpc_combiner_scheduler(resource_quota->combiner, false));
gpr_mu_init(&resource_user->mu);
gpr_atm_rel_store(&resource_user->refs, 1);
gpr_atm_rel_store(&resource_user->shutdown, 0);
@@ -701,9 +706,8 @@ static void ru_unref_by(grpc_exec_ctx *exec_ctx,
gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
GPR_ASSERT(old >= amount);
if (old == amount) {
- grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
- &resource_user->destroy_closure, GRPC_ERROR_NONE,
- false);
+ grpc_closure_sched(exec_ctx, &resource_user->destroy_closure,
+ GRPC_ERROR_NONE);
}
}
@@ -719,9 +723,12 @@ void grpc_resource_user_unref(grpc_exec_ctx *exec_ctx,
void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user) {
if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
- grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
- grpc_closure_create(ru_shutdown, resource_user),
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(exec_ctx,
+ grpc_closure_create(
+ ru_shutdown, resource_user,
+ grpc_combiner_scheduler(
+ resource_user->resource_quota->combiner, false)),
+ GRPC_ERROR_NONE);
}
}
@@ -741,12 +748,11 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE);
if (!resource_user->allocating) {
resource_user->allocating = true;
- grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
- &resource_user->allocate_closure, GRPC_ERROR_NONE,
- false);
+ grpc_closure_sched(exec_ctx, &resource_user->allocate_closure,
+ GRPC_ERROR_NONE);
}
} else {
- grpc_exec_ctx_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE);
}
gpr_mu_unlock(&resource_user->mu);
}
@@ -765,9 +771,8 @@ void grpc_resource_user_free(grpc_exec_ctx *exec_ctx,
if (is_bigger_than_zero && was_zero_or_negative &&
!resource_user->added_to_free_pool) {
resource_user->added_to_free_pool = true;
- grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
- &resource_user->add_to_free_pool_closure,
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(exec_ctx, &resource_user->add_to_free_pool_closure,
+ GRPC_ERROR_NONE);
}
gpr_mu_unlock(&resource_user->mu);
ru_unref_by(exec_ctx, resource_user, (gpr_atm)size);
@@ -779,9 +784,9 @@ void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
grpc_closure *closure) {
GPR_ASSERT(resource_user->new_reclaimers[destructive] == NULL);
resource_user->new_reclaimers[destructive] = closure;
- grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
- &resource_user->post_reclaimer_closure[destructive],
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(exec_ctx,
+ &resource_user->post_reclaimer_closure[destructive],
+ GRPC_ERROR_NONE);
}
void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx,
@@ -790,18 +795,20 @@ void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_DEBUG, "RQ %s %s: reclamation complete",
resource_user->resource_quota->name, resource_user->name);
}
- grpc_combiner_execute(
- exec_ctx, resource_user->resource_quota->combiner,
- &resource_user->resource_quota->rq_reclamation_done_closure,
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(
+ exec_ctx, &resource_user->resource_quota->rq_reclamation_done_closure,
+ GRPC_ERROR_NONE);
}
void grpc_resource_user_slice_allocator_init(
grpc_resource_user_slice_allocator *slice_allocator,
grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p) {
- grpc_closure_init(&slice_allocator->on_allocated, ru_allocated_slices,
- slice_allocator);
- grpc_closure_init(&slice_allocator->on_done, cb, p);
+ grpc_closure_init(
+ &slice_allocator->on_allocated, ru_allocated_slices, slice_allocator,
+ grpc_combiner_scheduler(resource_user->resource_quota->combiner, false));
+ grpc_closure_init(
+ &slice_allocator->on_done, cb, p,
+ grpc_combiner_scheduler(resource_user->resource_quota->combiner, false));
slice_allocator->resource_user = resource_user;
}
diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c
index 54911e0e31..2f2e02f715 100644
--- a/src/core/lib/iomgr/socket_windows.c
+++ b/src/core/lib/iomgr/socket_windows.c
@@ -131,7 +131,7 @@ static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&socket->state_mu);
if (info->has_pending_iocp) {
info->has_pending_iocp = 0;
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
} else {
info->closure = closure;
}
@@ -154,7 +154,7 @@ void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
GPR_ASSERT(!info->has_pending_iocp);
gpr_mu_lock(&socket->state_mu);
if (info->closure) {
- grpc_exec_ctx_sched(exec_ctx, info->closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, info->closure, GRPC_ERROR_NONE);
info->closure = NULL;
} else {
info->has_pending_iocp = 1;
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index 2db56b47ee..c8237dc38f 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -265,7 +265,7 @@ finish:
grpc_channel_args_destroy(exec_ctx, ac->channel_args);
gpr_free(ac);
}
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+ grpc_closure_sched(exec_ctx, closure, error);
}
static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
@@ -294,7 +294,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd);
if (error != GRPC_ERROR_NONE) {
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+ grpc_closure_sched(exec_ctx, closure, error);
return;
}
if (dsmode == GRPC_DSMODE_IPV4) {
@@ -303,7 +303,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
addr = &addr4_copy;
}
if ((error = prepare_socket(addr, fd, channel_args)) != GRPC_ERROR_NONE) {
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+ grpc_closure_sched(exec_ctx, closure, error);
return;
}
@@ -321,14 +321,13 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
if (err >= 0) {
*ep =
grpc_tcp_client_create_from_fd(exec_ctx, fdobj, channel_args, addr_str);
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
goto done;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error");
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"));
goto done;
}
@@ -343,8 +342,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
addr_str = NULL;
gpr_mu_init(&ac->mu);
ac->refs = 2;
- ac->write_closure.cb = on_writable;
- ac->write_closure.cb_arg = ac;
+ grpc_closure_init(&ac->write_closure, on_writable, ac,
+ grpc_schedule_on_exec_ctx);
ac->channel_args = grpc_channel_args_copy(channel_args);
if (grpc_tcp_trace) {
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
index 5f2ccfee76..ed0de50fc1 100644
--- a/src/core/lib/iomgr/tcp_client_uv.c
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -110,7 +110,7 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) {
if (done) {
uv_tcp_connect_cleanup(&exec_ctx, connect);
}
- grpc_exec_ctx_sched(&exec_ctx, closure, error, NULL);
+ grpc_closure_sched(&exec_ctx, closure, error);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c
index cc10060b9b..275258ebb5 100644
--- a/src/core/lib/iomgr/tcp_client_windows.c
+++ b/src/core/lib/iomgr/tcp_client_windows.c
@@ -129,7 +129,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
async_connect_unlock_and_cleanup(exec_ctx, ac, socket);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
- grpc_exec_ctx_sched(exec_ctx, on_done, error, NULL);
+ grpc_closure_sched(exec_ctx, on_done, error);
}
/* Tries to issue one async connection, then schedules both an IOCP
@@ -227,7 +227,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
ac->addr_name = grpc_sockaddr_to_uri(addr);
ac->endpoint = endpoint;
ac->resource_quota = resource_quota;
- grpc_closure_init(&ac->on_connect, on_connect, ac);
+ grpc_closure_init(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &ac->alarm, deadline, on_alarm, ac,
gpr_now(GPR_CLOCK_MONOTONIC));
@@ -247,7 +247,7 @@ failure:
closesocket(sock);
}
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
- grpc_exec_ctx_sched(exec_ctx, on_done, final_error, NULL);
+ grpc_closure_sched(exec_ctx, on_done, final_error);
}
#endif /* GRPC_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 2b46544f82..ece44978b0 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -320,7 +320,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
tcp->finished_edge = false;
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
- grpc_exec_ctx_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE);
}
}
@@ -464,11 +464,10 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (buf->length == 0) {
GPR_TIMER_END("tcp_write", 0);
- grpc_exec_ctx_sched(exec_ctx, cb,
- grpc_fd_is_shutdown(tcp->em_fd)
- ? tcp_annotate_error(GRPC_ERROR_CREATE("EOF"), tcp)
- : GRPC_ERROR_NONE,
- NULL);
+ grpc_closure_sched(exec_ctx, cb,
+ grpc_fd_is_shutdown(tcp->em_fd)
+ ? tcp_annotate_error(GRPC_ERROR_CREATE("EOF"), tcp)
+ : GRPC_ERROR_NONE);
return;
}
tcp->outgoing_buffer = buf;
@@ -488,7 +487,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
gpr_log(GPR_DEBUG, "write: %s", str);
grpc_error_free_string(str);
}
- grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ grpc_closure_sched(exec_ctx, cb, error);
}
GPR_TIMER_END("tcp_write", 0);
@@ -556,10 +555,10 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
gpr_ref_init(&tcp->refcount, 1);
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
tcp->em_fd = em_fd;
- tcp->read_closure.cb = tcp_handle_read;
- tcp->read_closure.cb_arg = tcp;
- tcp->write_closure.cb = tcp_handle_write;
- tcp->write_closure.cb_arg = tcp;
+ grpc_closure_init(&tcp->read_closure, tcp_handle_read, tcp,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&tcp->write_closure, tcp_handle_write, tcp,
+ grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&tcp->last_read_buffer);
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
grpc_resource_user_slice_allocator_init(
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 1e63142e0e..20efb678b2 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -208,7 +208,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
GPR_ASSERT(s->shutdown);
gpr_mu_unlock(&s->mu);
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
gpr_mu_destroy(&s->mu);
@@ -254,8 +254,8 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
grpc_tcp_listener *sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_unlink_if_unix_domain_socket(&sp->addr);
- sp->destroyed_closure.cb = destroyed_port;
- sp->destroyed_closure.cb_arg = s;
+ grpc_closure_init(&sp->destroyed_closure, destroyed_port, s,
+ grpc_schedule_on_exec_ctx);
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"tcp_listener_shutdown");
}
@@ -723,8 +723,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
"clone_port", clone_port(sp, (unsigned)(pollset_count - 1))));
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
- sp->read_closure.cb = on_read;
- sp->read_closure.cb_arg = sp;
+ grpc_closure_init(&sp->read_closure, on_read, sp,
+ grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
s->active_ports++;
sp = sp->next;
@@ -733,8 +733,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
}
- sp->read_closure.cb = on_read;
- sp->read_closure.cb_arg = sp;
+ grpc_closure_init(&sp->read_closure, on_read, sp,
+ grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
s->active_ports++;
sp = sp->next;
@@ -760,7 +760,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
grpc_tcp_server_shutdown_listeners(exec_ctx, s);
gpr_mu_lock(&s->mu);
- grpc_exec_ctx_enqueue_list(exec_ctx, &s->shutdown_starting, NULL);
+ grpc_closure_list_sched(exec_ctx, &s->shutdown_starting);
gpr_mu_unlock(&s->mu);
tcp_server_destroy(exec_ctx, s);
}
diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c
index 4ccc80c9f0..eed2773f8a 100644
--- a/src/core/lib/iomgr/tcp_server_uv.c
+++ b/src/core/lib/iomgr/tcp_server_uv.c
@@ -126,7 +126,7 @@ void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
while (s->head) {
@@ -170,7 +170,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
/* Complete shutdown_starting work before destroying. */
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting, NULL);
+ grpc_closure_list_sched(&local_exec_ctx, &s->shutdown_starting);
if (exec_ctx == NULL) {
grpc_exec_ctx_flush(&local_exec_ctx);
tcp_server_destroy(&local_exec_ctx, s);
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index 053bb35f6f..97d7827461 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -162,11 +162,12 @@ static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg,
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
- grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(destroy_server, s),
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, grpc_closure_create(destroy_server, s,
+ grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_NONE);
}
grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
@@ -204,7 +205,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
grpc_tcp_server_shutdown_listeners(exec_ctx, s);
gpr_mu_lock(&s->mu);
- grpc_exec_ctx_enqueue_list(exec_ctx, &s->shutdown_starting, NULL);
+ grpc_closure_list_sched(exec_ctx, &s->shutdown_starting);
gpr_mu_unlock(&s->mu);
tcp_server_destroy(exec_ctx, s);
}
@@ -465,7 +466,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
sp->new_socket = INVALID_SOCKET;
sp->port = port;
sp->port_index = port_index;
- grpc_closure_init(&sp->on_accept, on_accept, sp);
+ grpc_closure_init(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
GPR_ASSERT(sp->socket);
gpr_mu_unlock(&s->mu);
*listener = sp;
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index 15c8b3cbee..a1ee597d09 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -170,7 +170,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
// nread < 0: Error
error = GRPC_ERROR_CREATE("TCP Read failed");
}
- grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
+ grpc_closure_sched(&exec_ctx, cb, error);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -191,7 +191,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
error = GRPC_ERROR_CREATE("TCP Read failed at start");
error =
grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status));
- grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ grpc_closure_sched(exec_ctx, cb, error);
}
if (grpc_tcp_trace) {
const char *str = grpc_error_string(error);
@@ -218,7 +218,7 @@ static void write_callback(uv_write_t *req, int status) {
gpr_free(tcp->write_buffers);
grpc_resource_user_free(&exec_ctx, tcp->resource_user,
sizeof(uv_buf_t) * tcp->write_slices->count);
- grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
+ grpc_closure_sched(&exec_ctx, cb, error);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -244,8 +244,8 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
}
if (tcp->shutting_down) {
- grpc_exec_ctx_sched(exec_ctx, cb,
- GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL);
+ grpc_closure_sched(exec_ctx, cb,
+ GRPC_ERROR_CREATE("TCP socket is shutting down"));
return;
}
@@ -255,7 +255,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (tcp->write_slices->count == 0) {
// No slices means we don't have to do anything,
// and libuv doesn't like empty writes
- grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_NONE);
return;
}
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 8b3b44aaaa..84f791ba07 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -189,7 +189,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
tcp->read_cb = NULL;
TCP_UNREF(exec_ctx, tcp, "read");
- grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ grpc_closure_sched(exec_ctx, cb, error);
}
static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
@@ -203,8 +203,8 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
WSABUF buffer;
if (tcp->shutting_down) {
- grpc_exec_ctx_sched(exec_ctx, cb,
- GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL);
+ grpc_closure_sched(exec_ctx, cb,
+ GRPC_ERROR_CREATE("TCP socket is shutting down"));
return;
}
@@ -228,7 +228,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) {
info->bytes_transfered = bytes_read;
- grpc_exec_ctx_sched(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE);
return;
}
@@ -241,8 +241,8 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
info->wsa_error = wsa_error;
- grpc_exec_ctx_sched(exec_ctx, &tcp->on_read,
- GRPC_WSA_ERROR(info->wsa_error, "WSARecv"), NULL);
+ grpc_closure_sched(exec_ctx, &tcp->on_read,
+ GRPC_WSA_ERROR(info->wsa_error, "WSARecv"));
return;
}
}
@@ -273,7 +273,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
}
TCP_UNREF(exec_ctx, tcp, "write");
- grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ grpc_closure_sched(exec_ctx, cb, error);
}
/* Initiates a write. */
@@ -291,8 +291,8 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
size_t len;
if (tcp->shutting_down) {
- grpc_exec_ctx_sched(exec_ctx, cb,
- GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL);
+ grpc_closure_sched(exec_ctx, cb,
+ GRPC_ERROR_CREATE("TCP socket is shutting down"));
return;
}
@@ -323,7 +323,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *error = status == 0
? GRPC_ERROR_NONE
: GRPC_WSA_ERROR(info->wsa_error, "WSASend");
- grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ grpc_closure_sched(exec_ctx, cb, error);
if (allocated) gpr_free(allocated);
return;
}
@@ -341,8 +341,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
TCP_UNREF(exec_ctx, tcp, "write");
- grpc_exec_ctx_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"),
- NULL);
+ grpc_closure_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"));
return;
}
}
@@ -425,8 +424,8 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
tcp->socket = socket;
gpr_mu_init(&tcp->mu);
gpr_ref_init(&tcp->refcount, 1);
- grpc_closure_init(&tcp->on_read, on_read, tcp);
- grpc_closure_init(&tcp->on_write, on_write, tcp);
+ grpc_closure_init(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx);
tcp->peer_string = gpr_strdup(peer_string);
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
/* Tell network status tracking code about the new endpoint */
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index 00058f9d86..ecd3b284dc 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -184,22 +184,22 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
shard_type *shard = &g_shards[shard_idx(timer)];
GPR_ASSERT(deadline.clock_type == g_clock_type);
GPR_ASSERT(now.clock_type == g_clock_type);
- grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg);
+ grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg,
+ grpc_schedule_on_exec_ctx);
timer->deadline = deadline;
timer->triggered = 0;
if (!g_initialized) {
timer->triggered = 1;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, &timer->closure,
- GRPC_ERROR_CREATE("Attempt to create timer before initialization"),
- NULL);
+ GRPC_ERROR_CREATE("Attempt to create timer before initialization"));
return;
}
if (gpr_time_cmp(deadline, now) <= 0) {
timer->triggered = 1;
- grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE);
return;
}
@@ -251,7 +251,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
shard_type *shard = &g_shards[shard_idx(timer)];
gpr_mu_lock(&shard->mu);
if (!timer->triggered) {
- grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED);
timer->triggered = 1;
if (timer->heap_index == INVALID_HEAP_INDEX) {
list_remove(timer);
@@ -317,7 +317,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
grpc_timer *timer;
gpr_mu_lock(&shard->mu);
while ((timer = pop_one(shard, now))) {
- grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_REF(error), NULL);
+ grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_REF(error));
n++;
}
*new_min_deadline = compute_min_deadline(shard);
diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c
index cfcb89268b..00b835ffb8 100644
--- a/src/core/lib/iomgr/timer_uv.c
+++ b/src/core/lib/iomgr/timer_uv.c
@@ -55,7 +55,7 @@ void run_expired_timer(uv_timer_t *handle) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_ASSERT(!timer->triggered);
timer->triggered = 1;
- grpc_exec_ctx_sched(&exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(&exec_ctx, &timer->closure, GRPC_ERROR_NONE);
stop_uv_timer(handle);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -65,10 +65,11 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
void *timer_cb_arg, gpr_timespec now) {
uint64_t timeout;
uv_timer_t *uv_timer;
- grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg);
+ grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg,
+ grpc_schedule_on_exec_ctx);
if (gpr_time_cmp(deadline, now) <= 0) {
timer->triggered = 1;
- grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE);
return;
}
timer->triggered = 0;
@@ -83,7 +84,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
if (!timer->triggered) {
timer->triggered = 1;
- grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED);
stop_uv_timer((uv_timer_t *)timer->uv_timer);
}
}
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 3c24ea9afa..dfbd295c91 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -126,7 +126,7 @@ grpc_udp_server *grpc_udp_server_create(void) {
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
gpr_mu_destroy(&s->mu);
@@ -170,8 +170,8 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
for (sp = s->head; sp; sp = sp->next) {
grpc_unlink_if_unix_domain_socket(&sp->addr);
- sp->destroyed_closure.cb = destroyed_port;
- sp->destroyed_closure.cb_arg = s;
+ grpc_closure_init(&sp->destroyed_closure, destroyed_port, s,
+ grpc_schedule_on_exec_ctx);
/* Call the orphan_cb to signal that the FD is about to be closed and
* should no longer be used. */
@@ -446,8 +446,8 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
}
- sp->read_closure.cb = on_read;
- sp->read_closure.cb_arg = sp;
+ grpc_closure_init(&sp->read_closure, on_read, sp,
+ grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
s->active_ports++;
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
index 73d9849843..371b0f55dc 100644
--- a/src/core/lib/iomgr/workqueue.h
+++ b/src/core/lib/iomgr/workqueue.h
@@ -72,17 +72,16 @@ grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue);
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
#endif
-/** Add a work item to a workqueue. Items added to a work queue will be started
- in approximately the order they were enqueued, on some thread that may or
- may not be the current thread. Successive closures enqueued onto a workqueue
- MAY be executed concurrently.
+/** Fetch the workqueue closure scheduler. Items added to a work queue will be
+ started in approximately the order they were enqueued, on some thread that
+ may or may not be the current thread. Successive closures enqueued onto a
+ workqueue MAY be executed concurrently.
It is generally more expensive to add a closure to a workqueue than to the
execution context, both in terms of CPU work and in execution latency.
Use work queues when it's important that other threads be given a chance to
tackle some workload. */
-void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- grpc_closure *closure, grpc_error *error);
+grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue);
#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */
diff --git a/src/core/lib/iomgr/workqueue_uv.c b/src/core/lib/iomgr/workqueue_uv.c
index e58ca476cc..4d61b40912 100644
--- a/src/core/lib/iomgr/workqueue_uv.c
+++ b/src/core/lib/iomgr/workqueue_uv.c
@@ -58,9 +58,8 @@ grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
#endif
-void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- grpc_closure *closure, grpc_error *error) {
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) {
+ return grpc_schedule_on_exec_ctx;
}
#endif /* GPR_UV */
diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c
index 5c93d3c59e..234b47cdf5 100644
--- a/src/core/lib/iomgr/workqueue_windows.c
+++ b/src/core/lib/iomgr/workqueue_windows.c
@@ -56,9 +56,8 @@ grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
#endif
-void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- grpc_closure *closure, grpc_error *error) {
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) {
+ return grpc_schedule_on_exec_ctx;
}
#endif /* GPR_WINDOWS */
diff --git a/src/core/lib/security/credentials/fake/fake_credentials.c b/src/core/lib/security/credentials/fake/fake_credentials.c
index f41184f9f9..a8679d097d 100644
--- a/src/core/lib/security/credentials/fake/fake_credentials.c
+++ b/src/core/lib/security/credentials/fake/fake_credentials.c
@@ -116,9 +116,10 @@ static void md_only_test_get_request_metadata(
if (c->is_async) {
grpc_credentials_metadata_request *cb_arg =
grpc_credentials_metadata_request_create(creds, cb, user_data);
- grpc_executor_push(
- grpc_closure_create(on_simulated_token_fetch_done, cb_arg),
- GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx,
+ grpc_closure_create(on_simulated_token_fetch_done,
+ cb_arg, grpc_executor_scheduler),
+ GRPC_ERROR_NONE);
} else {
cb(exec_ctx, user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK, NULL);
}
diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c
index 190950619e..a098741b70 100644
--- a/src/core/lib/security/credentials/google_default/google_default_credentials.c
+++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c
@@ -130,7 +130,8 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) {
grpc_httpcli_get(
exec_ctx, &context, &detector.pollent, resource_quota, &request,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay),
- grpc_closure_create(on_compute_engine_detection_http_response, &detector),
+ grpc_closure_create(on_compute_engine_detection_http_response, &detector,
+ grpc_schedule_on_exec_ctx),
&detector.response);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
@@ -155,7 +156,8 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) {
grpc_httpcli_context_destroy(&context);
grpc_closure_init(&destroy_closure, destroy_pollset,
- grpc_polling_entity_pollset(&detector.pollent));
+ grpc_polling_entity_pollset(&detector.pollent),
+ grpc_schedule_on_exec_ctx);
grpc_pollset_shutdown(exec_ctx,
grpc_polling_entity_pollset(&detector.pollent),
&destroy_closure);
diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c
index 71febc248a..2270be8f44 100644
--- a/src/core/lib/security/credentials/jwt/jwt_verifier.c
+++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c
@@ -47,6 +47,7 @@
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/security/util/b64.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/support/string.h"
#include "src/core/lib/tsi/ssl_types.h"
/* --- Utils. --- */
@@ -309,6 +310,17 @@ grpc_jwt_verifier_status grpc_jwt_claims_check(const grpc_jwt_claims *claims,
return GRPC_JWT_VERIFIER_TIME_CONSTRAINT_FAILURE;
}
+ /* This should be probably up to the upper layer to decide but let's harcode
+ the 99% use case here for email issuers, where the JWT must be self
+ issued. */
+ if (grpc_jwt_issuer_email_domain(claims->iss) != NULL &&
+ claims->sub != NULL && strcmp(claims->iss, claims->sub) != 0) {
+ gpr_log(GPR_ERROR,
+ "Email issuer (%s) cannot assert another subject (%s) than itself.",
+ claims->iss, claims->sub);
+ return GRPC_JWT_VERIFIER_BAD_SUBJECT;
+ }
+
if (audience == NULL) {
audience_ok = claims->aud == NULL;
} else {
@@ -673,7 +685,7 @@ static void on_openid_config_retrieved(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_httpcli_get(
exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),
- grpc_closure_create(on_keys_retrieved, ctx),
+ grpc_closure_create(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx),
&ctx->responses[HTTP_RESPONSE_KEYS]);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
grpc_json_destroy(json);
@@ -714,10 +726,26 @@ static void verifier_put_mapping(grpc_jwt_verifier *v, const char *email_domain,
GPR_ASSERT(v->num_mappings <= v->allocated_mappings);
}
+/* Very non-sophisticated way to detect an email address. Should be good
+ enough for now... */
+const char *grpc_jwt_issuer_email_domain(const char *issuer) {
+ const char *at_sign = strchr(issuer, '@');
+ if (at_sign == NULL) return NULL;
+ const char *email_domain = at_sign + 1;
+ if (*email_domain == '\0') return NULL;
+ const char *dot = strrchr(email_domain, '.');
+ if (dot == NULL || dot == email_domain) return email_domain;
+ GPR_ASSERT(dot > email_domain);
+ /* There may be a subdomain, we just want the domain. */
+ dot = gpr_memrchr(email_domain, '.', (size_t)(dot - email_domain));
+ if (dot == NULL) return email_domain;
+ return dot + 1;
+}
+
/* Takes ownership of ctx. */
static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx,
verifier_cb_ctx *ctx) {
- const char *at_sign;
+ const char *email_domain;
grpc_closure *http_cb;
char *path_prefix = NULL;
const char *iss;
@@ -742,13 +770,9 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx,
Nobody seems to implement the account/email/webfinger part 2. of the spec
so we will rely instead on email/url mappings if we detect such an issuer.
Part 4, on the other hand is implemented by both google and salesforce. */
-
- /* Very non-sophisticated way to detect an email address. Should be good
- enough for now... */
- at_sign = strchr(iss, '@');
- if (at_sign != NULL) {
+ email_domain = grpc_jwt_issuer_email_domain(iss);
+ if (email_domain != NULL) {
email_key_mapping *mapping;
- const char *email_domain = at_sign + 1;
GPR_ASSERT(ctx->verifier != NULL);
mapping = verifier_get_mapping(ctx->verifier, email_domain);
if (mapping == NULL) {
@@ -763,7 +787,8 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx,
*(path_prefix++) = '\0';
gpr_asprintf(&req.http.path, "/%s/%s", path_prefix, iss);
}
- http_cb = grpc_closure_create(on_keys_retrieved, ctx);
+ http_cb =
+ grpc_closure_create(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx);
rsp_idx = HTTP_RESPONSE_KEYS;
} else {
req.host = gpr_strdup(strstr(iss, "https://") == iss ? iss + 8 : iss);
@@ -775,7 +800,8 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx,
gpr_asprintf(&req.http.path, "/%s%s", path_prefix,
GRPC_OPENID_CONFIG_URL_SUFFIX);
}
- http_cb = grpc_closure_create(on_openid_config_retrieved, ctx);
+ http_cb = grpc_closure_create(on_openid_config_retrieved, ctx,
+ grpc_schedule_on_exec_ctx);
rsp_idx = HTTP_RESPONSE_OPENID;
}
diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.h b/src/core/lib/security/credentials/jwt/jwt_verifier.h
index b79f411903..4fa320a415 100644
--- a/src/core/lib/security/credentials/jwt/jwt_verifier.h
+++ b/src/core/lib/security/credentials/jwt/jwt_verifier.h
@@ -43,8 +43,7 @@
/* --- Constants. --- */
#define GRPC_OPENID_CONFIG_URL_SUFFIX "/.well-known/openid-configuration"
-#define GRPC_GOOGLE_SERVICE_ACCOUNTS_EMAIL_DOMAIN \
- "developer.gserviceaccount.com"
+#define GRPC_GOOGLE_SERVICE_ACCOUNTS_EMAIL_DOMAIN "gserviceaccount.com"
#define GRPC_GOOGLE_SERVICE_ACCOUNTS_KEY_URL_PREFIX \
"www.googleapis.com/robot/v1/metadata/x509"
@@ -57,6 +56,7 @@ typedef enum {
GRPC_JWT_VERIFIER_BAD_AUDIENCE,
GRPC_JWT_VERIFIER_KEY_RETRIEVAL_ERROR,
GRPC_JWT_VERIFIER_TIME_CONSTRAINT_FAILURE,
+ GRPC_JWT_VERIFIER_BAD_SUBJECT,
GRPC_JWT_VERIFIER_GENERIC_ERROR
} grpc_jwt_verifier_status;
@@ -134,5 +134,6 @@ grpc_jwt_claims *grpc_jwt_claims_from_json(grpc_exec_ctx *exec_ctx,
grpc_json *json, grpc_slice buffer);
grpc_jwt_verifier_status grpc_jwt_claims_check(const grpc_jwt_claims *claims,
const char *audience);
+const char *grpc_jwt_issuer_email_domain(const char *issuer);
#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_JWT_JWT_VERIFIER_H */
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
index cbcd74958c..1b0e43a1e4 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
@@ -313,9 +313,11 @@ static void compute_engine_fetch_oauth2(
extreme memory pressure. */
grpc_resource_quota *resource_quota =
grpc_resource_quota_create("oauth2_credentials");
- grpc_httpcli_get(exec_ctx, httpcli_context, pollent, resource_quota, &request,
- deadline, grpc_closure_create(response_cb, metadata_req),
- &metadata_req->response);
+ grpc_httpcli_get(
+ exec_ctx, httpcli_context, pollent, resource_quota, &request, deadline,
+ grpc_closure_create(response_cb, metadata_req, grpc_schedule_on_exec_ctx),
+
+ &metadata_req->response);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
}
@@ -370,10 +372,11 @@ static void refresh_token_fetch_oauth2(
extreme memory pressure. */
grpc_resource_quota *resource_quota =
grpc_resource_quota_create("oauth2_credentials_refresh");
- grpc_httpcli_post(exec_ctx, httpcli_context, pollent, resource_quota,
- &request, body, strlen(body), deadline,
- grpc_closure_create(response_cb, metadata_req),
- &metadata_req->response);
+ grpc_httpcli_post(
+ exec_ctx, httpcli_context, pollent, resource_quota, &request, body,
+ strlen(body), deadline,
+ grpc_closure_create(response_cb, metadata_req, grpc_schedule_on_exec_ctx),
+ &metadata_req->response);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
gpr_free(body);
}
diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c
index 2906621a19..18a7a6f7e7 100644
--- a/src/core/lib/security/transport/secure_endpoint.c
+++ b/src/core/lib/security/transport/secure_endpoint.c
@@ -147,7 +147,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep,
}
}
ep->read_buffer = NULL;
- grpc_exec_ctx_sched(exec_ctx, ep->read_cb, error, NULL);
+ grpc_closure_sched(exec_ctx, ep->read_cb, error);
SECURE_ENDPOINT_UNREF(exec_ctx, ep, "read");
}
@@ -330,10 +330,9 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
if (result != TSI_OK) {
/* TODO(yangg) do different things according to the error type? */
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &ep->output_buffer);
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, cb,
- grpc_set_tsi_error_result(GRPC_ERROR_CREATE("Wrap failed"), result),
- NULL);
+ grpc_set_tsi_error_result(GRPC_ERROR_CREATE("Wrap failed"), result));
GPR_TIMER_END("secure_endpoint.endpoint_write", 0);
return;
}
@@ -418,7 +417,7 @@ grpc_endpoint *grpc_secure_endpoint_create(
grpc_slice_buffer_init(&ep->output_buffer);
grpc_slice_buffer_init(&ep->source_buffer);
ep->read_buffer = NULL;
- grpc_closure_init(&ep->on_read, on_read, ep);
+ grpc_closure_init(&ep->on_read, on_read, ep, grpc_schedule_on_exec_ctx);
gpr_mu_init(&ep->protector_mu);
gpr_ref_init(&ep->ref, 1);
return &ep->base;
diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c
index f0e835929a..b09127811b 100644
--- a/src/core/lib/security/transport/security_connector.c
+++ b/src/core/lib/security/transport/security_connector.c
@@ -134,9 +134,9 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx,
grpc_auth_context **auth_context,
grpc_closure *on_peer_checked) {
if (sc == NULL) {
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, on_peer_checked,
- GRPC_ERROR_CREATE("cannot check peer -- no security connector"), NULL);
+ GRPC_ERROR_CREATE("cannot check peer -- no security connector"));
tsi_peer_destruct(&peer);
} else {
sc->vtable->check_peer(exec_ctx, sc, peer, auth_context, on_peer_checked);
@@ -279,7 +279,7 @@ static void fake_check_peer(grpc_exec_ctx *exec_ctx,
GRPC_FAKE_TRANSPORT_SECURITY_TYPE);
end:
- grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL);
+ grpc_closure_sched(exec_ctx, on_peer_checked, error);
tsi_peer_destruct(&peer);
}
@@ -516,7 +516,7 @@ static void ssl_channel_check_peer(grpc_exec_ctx *exec_ctx,
? c->overridden_target_name
: c->target_name,
&peer, auth_context);
- grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL);
+ grpc_closure_sched(exec_ctx, on_peer_checked, error);
tsi_peer_destruct(&peer);
}
@@ -526,7 +526,7 @@ static void ssl_server_check_peer(grpc_exec_ctx *exec_ctx,
grpc_closure *on_peer_checked) {
grpc_error *error = ssl_check_peer(sc, NULL, &peer, auth_context);
tsi_peer_destruct(&peer);
- grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL);
+ grpc_closure_sched(exec_ctx, on_peer_checked, error);
}
static void add_shallow_auth_property_to_peer(tsi_peer *peer,
diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c
index 7c2f63d1cc..e886cc59a0 100644
--- a/src/core/lib/security/transport/security_handshaker.c
+++ b/src/core/lib/security/transport/security_handshaker.c
@@ -138,7 +138,7 @@ static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx,
h->shutdown = true;
}
// Invoke callback.
- grpc_exec_ctx_sched(exec_ctx, h->on_handshake_done, error, NULL);
+ grpc_closure_sched(exec_ctx, h->on_handshake_done, error);
}
static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -175,7 +175,7 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_channel_args_copy_and_add(tmp_args, &auth_context_arg, 1);
grpc_channel_args_destroy(exec_ctx, tmp_args);
// Invoke callback.
- grpc_exec_ctx_sched(exec_ctx, h->on_handshake_done, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, h->on_handshake_done, GRPC_ERROR_NONE);
// Set shutdown to true so that subsequent calls to
// security_handshaker_shutdown() do nothing.
h->shutdown = true;
@@ -394,10 +394,13 @@ static grpc_handshaker *security_handshaker_create(
h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE;
h->handshake_buffer = gpr_malloc(h->handshake_buffer_size);
grpc_closure_init(&h->on_handshake_data_sent_to_peer,
- on_handshake_data_sent_to_peer, h);
+ on_handshake_data_sent_to_peer, h,
+ grpc_schedule_on_exec_ctx);
grpc_closure_init(&h->on_handshake_data_received_from_peer,
- on_handshake_data_received_from_peer, h);
- grpc_closure_init(&h->on_peer_checked, on_peer_checked, h);
+ on_handshake_data_received_from_peer, h,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&h->on_peer_checked, on_peer_checked, h,
+ grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&h->left_overs);
grpc_slice_buffer_init(&h->outgoing);
return &h->base;
@@ -420,9 +423,8 @@ static void fail_handshaker_do_handshake(grpc_exec_ctx *exec_ctx,
grpc_tcp_server_acceptor *acceptor,
grpc_closure *on_handshake_done,
grpc_handshaker_args *args) {
- grpc_exec_ctx_sched(exec_ctx, on_handshake_done,
- GRPC_ERROR_CREATE("Failed to create security handshaker"),
- NULL);
+ grpc_closure_sched(exec_ctx, on_handshake_done,
+ GRPC_ERROR_CREATE("Failed to create security handshaker"));
}
static const grpc_handshaker_vtable fail_handshaker_vtable = {
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index b68650f2fc..8eb84ed3e4 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -132,7 +132,7 @@ static void on_md_processing_done(
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value);
}
grpc_metadata_array_destroy(&calld->md);
- grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE);
} else {
grpc_slice message;
grpc_transport_stream_op *close_op = gpr_malloc(sizeof(*close_op));
@@ -152,13 +152,13 @@ static void on_md_processing_done(
calld->transport_op->send_message = NULL;
}
calld->transport_op->send_trailing_metadata = NULL;
- close_op->on_complete = grpc_closure_create(destroy_op, close_op);
+ close_op->on_complete =
+ grpc_closure_create(destroy_op, close_op, grpc_schedule_on_exec_ctx);
grpc_transport_stream_op_add_close(&exec_ctx, close_op, status, &message);
grpc_call_next_op(&exec_ctx, elem, close_op);
- grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv,
- grpc_error_set_int(GRPC_ERROR_CREATE(error_details),
- GRPC_ERROR_INT_GRPC_STATUS, status),
- NULL);
+ grpc_closure_sched(&exec_ctx, calld->on_done_recv,
+ grpc_error_set_int(GRPC_ERROR_CREATE(error_details),
+ GRPC_ERROR_INT_GRPC_STATUS, status));
}
grpc_exec_ctx_finish(&exec_ctx);
@@ -178,8 +178,7 @@ static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
return;
}
}
- grpc_exec_ctx_sched(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error),
- NULL);
+ grpc_closure_sched(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error));
}
static void set_recv_ops_md_callbacks(grpc_call_element *elem,
@@ -218,7 +217,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* initialize members */
memset(calld, 0, sizeof(*calld));
- grpc_closure_init(&calld->auth_on_recv, auth_on_recv, elem);
+ grpc_closure_init(&calld->auth_on_recv, auth_on_recv, elem,
+ grpc_schedule_on_exec_ctx);
if (args->context[GRPC_CONTEXT_SECURITY].value != NULL) {
args->context[GRPC_CONTEXT_SECURITY].destroy(
diff --git a/src/core/lib/support/string.c b/src/core/lib/support/string.c
index 0dc55c2ca3..d20b86f7cf 100644
--- a/src/core/lib/support/string.c
+++ b/src/core/lib/support/string.c
@@ -301,3 +301,15 @@ void gpr_string_split(const char *input, const char *sep, char ***strs,
}
add_string_to_split(input, input + strlen(input), strs, nstrs, &capstrs);
}
+
+void *gpr_memrchr(const void *s, int c, size_t n) {
+ if (s == NULL) return NULL;
+ char *b = (char *)s;
+ size_t i;
+ for (i = 0; i < n; i++) {
+ if (b[n - i - 1] == c) {
+ return &b[n - i - 1];
+ }
+ }
+ return NULL;
+}
diff --git a/src/core/lib/support/string.h b/src/core/lib/support/string.h
index 43ab4dc1be..c7fb0dbd14 100644
--- a/src/core/lib/support/string.h
+++ b/src/core/lib/support/string.h
@@ -121,6 +121,8 @@ char *gpr_strvec_flatten(gpr_strvec *strs, size_t *total_length);
lower(a)==lower(b), >0 if lower(a)>lower(b) */
int gpr_stricmp(const char *a, const char *b);
+void *gpr_memrchr(const void *s, int c, size_t n);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 366dd98279..ed186271e7 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -730,7 +730,8 @@ static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
memset(&tc->op, 0, sizeof(tc->op));
tc->op.cancel_error = tc->error;
/* reuse closure to catch completion */
- grpc_closure_init(&tc->closure, done_termination, tc);
+ grpc_closure_init(&tc->closure, done_termination, tc,
+ grpc_schedule_on_exec_ctx);
tc->op.on_complete = &tc->closure;
execute_op(exec_ctx, tc->call, &tc->op);
}
@@ -740,7 +741,8 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
memset(&tc->op, 0, sizeof(tc->op));
tc->op.close_error = tc->error;
/* reuse closure to catch completion */
- grpc_closure_init(&tc->closure, done_termination, tc);
+ grpc_closure_init(&tc->closure, done_termination, tc,
+ grpc_schedule_on_exec_ctx);
tc->op.on_complete = &tc->closure;
execute_op(exec_ctx, tc->call, &tc->op);
}
@@ -751,13 +753,13 @@ static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx,
tc->error);
if (tc->type == TC_CANCEL) {
- grpc_closure_init(&tc->closure, send_cancel, tc);
+ grpc_closure_init(&tc->closure, send_cancel, tc, grpc_schedule_on_exec_ctx);
GRPC_CALL_INTERNAL_REF(tc->call, "cancel");
} else if (tc->type == TC_CLOSE) {
- grpc_closure_init(&tc->closure, send_close, tc);
+ grpc_closure_init(&tc->closure, send_close, tc, grpc_schedule_on_exec_ctx);
GRPC_CALL_INTERNAL_REF(tc->call, "close");
}
- grpc_exec_ctx_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE);
return GRPC_CALL_OK;
}
@@ -1089,8 +1091,8 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx,
} else {
*call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
}
- grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
- bctl);
+ grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, bctl,
+ grpc_schedule_on_exec_ctx);
continue_receiving_slices(exec_ctx, bctl);
}
}
@@ -1205,9 +1207,10 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
call->has_initial_md_been_received = true;
if (call->saved_receiving_stream_ready_bctlp != NULL) {
grpc_closure *saved_rsr_closure = grpc_closure_create(
- receiving_stream_ready, call->saved_receiving_stream_ready_bctlp);
+ receiving_stream_ready, call->saved_receiving_stream_ready_bctlp,
+ grpc_schedule_on_exec_ctx);
call->saved_receiving_stream_ready_bctlp = NULL;
- grpc_exec_ctx_sched(exec_ctx, saved_rsr_closure, error, NULL);
+ grpc_closure_sched(exec_ctx, saved_rsr_closure, error);
}
gpr_mu_unlock(&call->mu);
@@ -1510,7 +1513,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->received_initial_metadata = 1;
call->buffered_metadata[0] = op->data.recv_initial_metadata;
grpc_closure_init(&call->receiving_initial_metadata_ready,
- receiving_initial_metadata_ready, bctl);
+ receiving_initial_metadata_ready, bctl,
+ grpc_schedule_on_exec_ctx);
bctl->recv_initial_metadata = 1;
stream_op->recv_initial_metadata =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
@@ -1533,7 +1537,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->receiving_buffer = op->data.recv_message;
stream_op->recv_message = &call->receiving_stream;
grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready,
- bctl);
+ bctl, grpc_schedule_on_exec_ctx);
stream_op->recv_message_ready = &call->receiving_stream_ready;
num_completion_callbacks_needed++;
break;
@@ -1596,7 +1600,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
stream_op->context = call->context;
- grpc_closure_init(&bctl->finish_batch, finish_batch, bctl);
+ grpc_closure_init(&bctl->finish_batch, finish_batch, bctl,
+ grpc_schedule_on_exec_ctx);
stream_op->on_complete = &bctl->finish_batch;
gpr_mu_unlock(&call->mu);
diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c
index 0d2f01a649..e68febdddf 100644
--- a/src/core/lib/surface/channel_ping.c
+++ b/src/core/lib/surface/channel_ping.c
@@ -71,7 +71,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
GPR_ASSERT(reserved == NULL);
pr->tag = tag;
pr->cq = cq;
- grpc_closure_init(&pr->closure, ping_done, pr);
+ grpc_closure_init(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx);
op->send_ping = &pr->closure;
op->bind_pollset = grpc_cq_pollset(cq);
grpc_cq_begin_op(cq, tag);
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 184c1a1a16..4613c9021e 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -168,7 +168,8 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
#ifndef NDEBUG
cc->outstanding_tag_count = 0;
#endif
- grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc);
+ grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc,
+ grpc_schedule_on_exec_ctx);
GPR_TIMER_END("grpc_completion_queue_create", 0);
diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c
index 0055808ccf..48de0e1d5b 100644
--- a/src/core/lib/surface/lame_client.c
+++ b/src/core/lib/surface/lame_client.c
@@ -107,16 +107,16 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
if (op->on_connectivity_state_change) {
GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_SHUTDOWN);
*op->connectivity_state = GRPC_CHANNEL_SHUTDOWN;
- grpc_exec_ctx_sched(exec_ctx, op->on_connectivity_state_change,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, op->on_connectivity_state_change,
+ GRPC_ERROR_NONE);
}
if (op->send_ping != NULL) {
- grpc_exec_ctx_sched(exec_ctx, op->send_ping,
- GRPC_ERROR_CREATE("lame client channel"), NULL);
+ grpc_closure_sched(exec_ctx, op->send_ping,
+ GRPC_ERROR_CREATE("lame client channel"));
}
GRPC_ERROR_UNREF(op->disconnect_with_error);
if (op->on_consumed != NULL) {
- grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
}
}
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 5c83269aad..7af951f2a0 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -282,7 +282,8 @@ static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
int send_goaway, grpc_error *send_disconnect) {
struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc));
- grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
+ grpc_closure_init(&sc->closure, shutdown_cleanup, sc,
+ grpc_schedule_on_exec_ctx);
grpc_transport_op *op = grpc_make_transport_op(&sc->closure);
grpc_channel_element *elem;
@@ -350,9 +351,9 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
- grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE,
- NULL);
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE);
}
}
@@ -444,8 +445,8 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,
orphan_channel(chand);
server_ref(chand->server);
maybe_finish_shutdown(exec_ctx, chand->server);
- chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
- chand->finish_destroy_channel_closure.cb_arg = chand;
+ grpc_closure_init(&chand->finish_destroy_channel_closure,
+ finish_destroy_channel, chand, grpc_schedule_on_exec_ctx);
if (grpc_server_channel_trace && error != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(error);
@@ -536,8 +537,9 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
- grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, error, NULL);
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, error);
return;
}
@@ -581,9 +583,9 @@ static void finish_start_new_rpc(
gpr_mu_lock(&calld->mu_state);
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
- grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE,
- NULL);
+ grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE);
return;
}
@@ -598,7 +600,8 @@ static void finish_start_new_rpc(
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_MESSAGE;
op.data.recv_message = &calld->payload;
- grpc_closure_init(&calld->publish, publish_new_rpc, elem);
+ grpc_closure_init(&calld->publish, publish_new_rpc, elem,
+ grpc_schedule_on_exec_ctx);
grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1,
&calld->publish);
break;
@@ -807,9 +810,10 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
- grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure,
+ GRPC_ERROR_NONE);
} else if (calld->state == PENDING) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
@@ -845,7 +849,8 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_INITIAL_METADATA;
op.data.recv_initial_metadata = &calld->initial_metadata;
- grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem);
+ grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem,
+ grpc_schedule_on_exec_ctx);
grpc_call_start_batch_and_execute(exec_ctx, call, &op, 1,
&calld->got_initial_metadata);
}
@@ -881,7 +886,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
gpr_mu_init(&calld->mu_state);
grpc_closure_init(&calld->server_on_recv_initial_metadata,
- server_on_recv_initial_metadata, elem);
+ server_on_recv_initial_metadata, elem,
+ grpc_schedule_on_exec_ctx);
server_ref(chand->server);
return GRPC_ERROR_NONE;
@@ -920,7 +926,8 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
chand->registered_methods = NULL;
chand->connectivity_state = GRPC_CHANNEL_IDLE;
grpc_closure_init(&chand->channel_connectivity_changed,
- channel_connectivity_changed, chand);
+ channel_connectivity_changed, chand,
+ grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
@@ -1278,7 +1285,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
- grpc_closure_init(&l->destroy_done, listener_destroy_done, server);
+ grpc_closure_init(&l->destroy_done, listener_destroy_done, server,
+ grpc_schedule_on_exec_ctx);
l->destroy(&exec_ctx, server, l->arg, &l->destroy_done);
}
@@ -1384,9 +1392,10 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
- grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure,
- GRPC_ERROR_NONE, NULL);
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure,
+ GRPC_ERROR_NONE);
} else {
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
diff --git a/src/core/lib/surface/validate_metadata.c b/src/core/lib/surface/validate_metadata.c
index c9ff30bf8a..4201192e7a 100644
--- a/src/core/lib/surface/validate_metadata.c
+++ b/src/core/lib/surface/validate_metadata.c
@@ -54,7 +54,10 @@ int grpc_header_key_is_legal(grpc_slice slice) {
0x00, 0x00, 0x00, 0x00, 0x00, 0x60, 0xff, 0x03, 0x00, 0x00, 0x00,
0x80, 0xfe, 0xff, 0xff, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
- return GRPC_SLICE_LENGTH(slice) != 0 && conforms_to(slice, legal_header_bits);
+ if (GRPC_SLICE_LENGTH(slice) == 0 || GRPC_SLICE_START_PTR(slice)[0] == ':') {
+ return 0;
+ }
+ return conforms_to(slice, legal_header_bits);
}
int grpc_header_nonbin_value_is_legal(grpc_slice slice) {
diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c
index 4f49d7cf7d..c656d93740 100644
--- a/src/core/lib/transport/connectivity_state.c
+++ b/src/core/lib/transport/connectivity_state.c
@@ -81,7 +81,7 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx,
} else {
error = GRPC_ERROR_CREATE("Shutdown connectivity owner");
}
- grpc_exec_ctx_sched(exec_ctx, w->notify, error, NULL);
+ grpc_closure_sched(exec_ctx, w->notify, error);
gpr_free(w);
}
GRPC_ERROR_UNREF(tracker->current_error);
@@ -121,7 +121,7 @@ bool grpc_connectivity_state_notify_on_state_change(
if (current == NULL) {
grpc_connectivity_state_watcher *w = tracker->watchers;
if (w != NULL && w->notify == notify) {
- grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED);
tracker->watchers = w->next;
gpr_free(w);
return false;
@@ -129,7 +129,7 @@ bool grpc_connectivity_state_notify_on_state_change(
while (w != NULL) {
grpc_connectivity_state_watcher *rm_candidate = w->next;
if (rm_candidate != NULL && rm_candidate->notify == notify) {
- grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED);
w->next = w->next->next;
gpr_free(rm_candidate);
return false;
@@ -140,8 +140,8 @@ bool grpc_connectivity_state_notify_on_state_change(
} else {
if (tracker->current_state != *current) {
*current = tracker->current_state;
- grpc_exec_ctx_sched(exec_ctx, notify,
- GRPC_ERROR_REF(tracker->current_error), NULL);
+ grpc_closure_sched(exec_ctx, notify,
+ GRPC_ERROR_REF(tracker->current_error));
} else {
grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
w->current = current;
@@ -191,8 +191,8 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_DEBUG, "NOTIFY: %p %s: %p", tracker, tracker->name,
w->notify);
}
- grpc_exec_ctx_sched(exec_ctx, w->notify,
- GRPC_ERROR_REF(tracker->current_error), NULL);
+ grpc_closure_sched(exec_ctx, w->notify,
+ GRPC_ERROR_REF(tracker->current_error));
gpr_free(w);
}
}
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 9ccd7daf45..8f9e6ca4ed 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -69,7 +69,7 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_stream_refcount *refcount) {
#endif
if (gpr_unref(&refcount->refs)) {
- grpc_exec_ctx_sched(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE);
}
}
@@ -83,7 +83,7 @@ void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
grpc_iomgr_cb_func cb, void *cb_arg) {
#endif
gpr_ref_init(&refcount->refs, initial_refs);
- grpc_closure_init(&refcount->destroy, cb, cb_arg);
+ grpc_closure_init(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);
}
static void move64(uint64_t *from, uint64_t *to) {
@@ -169,11 +169,10 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op,
grpc_error *error) {
- grpc_exec_ctx_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error),
- NULL);
- grpc_exec_ctx_sched(exec_ctx, op->recv_initial_metadata_ready,
- GRPC_ERROR_REF(error), NULL);
- grpc_exec_ctx_sched(exec_ctx, op->on_complete, error, NULL);
+ grpc_closure_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error));
+ grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready,
+ GRPC_ERROR_REF(error));
+ grpc_closure_sched(exec_ctx, op->on_complete, error);
}
typedef struct {
@@ -197,7 +196,8 @@ static void add_error(grpc_transport_stream_op *op, grpc_error **which,
cmd = gpr_malloc(sizeof(*cmd));
cmd->error = error;
cmd->then_call = op->on_complete;
- grpc_closure_init(&cmd->closure, free_message, cmd);
+ grpc_closure_init(&cmd->closure, free_message, cmd,
+ grpc_schedule_on_exec_ctx);
op->on_complete = &cmd->closure;
*which = error;
}
@@ -271,14 +271,14 @@ typedef struct {
static void destroy_made_transport_op(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
made_transport_op *op = arg;
- grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error),
- NULL);
+ grpc_closure_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error));
gpr_free(op);
}
grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) {
made_transport_op *op = gpr_malloc(sizeof(*op));
- grpc_closure_init(&op->outer_on_complete, destroy_made_transport_op, op);
+ grpc_closure_init(&op->outer_on_complete, destroy_made_transport_op, op,
+ grpc_schedule_on_exec_ctx);
op->inner_on_complete = on_complete;
memset(&op->op, 0, sizeof(op->op));
op->op.on_consumed = &op->outer_on_complete;
@@ -294,8 +294,7 @@ typedef struct {
static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
made_transport_stream_op *op = arg;
- grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error),
- NULL);
+ grpc_closure_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error));
gpr_free(op);
}
@@ -303,7 +302,7 @@ grpc_transport_stream_op *grpc_make_transport_stream_op(
grpc_closure *on_complete) {
made_transport_stream_op *op = gpr_malloc(sizeof(*op));
grpc_closure_init(&op->outer_on_complete, destroy_made_transport_stream_op,
- op);
+ op, grpc_schedule_on_exec_ctx);
op->inner_on_complete = on_complete;
memset(&op->op, 0, sizeof(op->op));
op->op.on_complete = &op->outer_on_complete;
diff --git a/src/php/lib/Grpc/AbstractCall.php b/src/php/lib/Grpc/AbstractCall.php
index c4d56790f7..9f0b02b8bb 100644
--- a/src/php/lib/Grpc/AbstractCall.php
+++ b/src/php/lib/Grpc/AbstractCall.php
@@ -62,7 +62,7 @@ abstract class AbstractCall
Channel $channel,
$method,
$deserialize,
- $options = []
+ array $options = []
) {
if (array_key_exists('timeout', $options) &&
is_numeric($timeout = $options['timeout'])
@@ -89,7 +89,7 @@ abstract class AbstractCall
}
/**
- * @return mixed The metadata sent by the server.
+ * @return mixed The metadata sent by the server
*/
public function getMetadata()
{
@@ -97,7 +97,7 @@ abstract class AbstractCall
}
/**
- * @return mixed The trailing metadata sent by the server.
+ * @return mixed The trailing metadata sent by the server
*/
public function getTrailingMetadata()
{
@@ -105,7 +105,7 @@ abstract class AbstractCall
}
/**
- * @return string The URI of the endpoint.
+ * @return string The URI of the endpoint
*/
public function getPeer()
{
@@ -167,8 +167,7 @@ abstract class AbstractCall
/**
* Set the CallCredentials for the underlying Call.
*
- * @param CallCredentials $call_credentials The CallCredentials
- * object
+ * @param CallCredentials $call_credentials The CallCredentials object
*/
public function setCallCredentials($call_credentials)
{
diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php
index d0baeae955..aec60af094 100644
--- a/src/php/lib/Grpc/BaseStub.php
+++ b/src/php/lib/Grpc/BaseStub.php
@@ -48,14 +48,14 @@ class BaseStub
private $update_metadata;
/**
- * @param $hostname string
- * @param $opts array
+ * @param string $hostname
+ * @param array $opts
* - 'update_metadata': (optional) a callback function which takes in a
* metadata array, and returns an updated metadata array
* - 'grpc.primary_user_agent': (optional) a user-agent string
- * @param $channel Channel An already created Channel object
+ * @param Channel $channel An already created Channel object (optional)
*/
- public function __construct($hostname, $opts, $channel = null)
+ public function __construct($hostname, $opts, Channel $channel = null)
{
$ssl_roots = file_get_contents(
dirname(__FILE__).'/../../../../etc/roots.pem');
@@ -98,7 +98,7 @@ class BaseStub
}
/**
- * @return string The URI of the endpoint.
+ * @return string The URI of the endpoint
*/
public function getTarget()
{
@@ -106,7 +106,7 @@ class BaseStub
}
/**
- * @param $try_to_connect bool
+ * @param bool $try_to_connect (optional)
*
* @return int The grpc connectivity state
*/
@@ -145,6 +145,12 @@ class BaseStub
return $this->_checkConnectivityState($new_state);
}
+ /**
+ * @param $new_state Connect state
+ *
+ * @return bool true if state is CHANNEL_READY
+ * @throw Exception if state is CHANNEL_FATAL_FAILURE
+ */
private function _checkConnectivityState($new_state)
{
if ($new_state == \Grpc\CHANNEL_READY) {
@@ -167,6 +173,10 @@ class BaseStub
/**
* constructs the auth uri for the jwt.
+ *
+ * @param string $method The method string
+ *
+ * @return string The URL string
*/
private function _get_jwt_aud_uri($method)
{
@@ -191,7 +201,7 @@ class BaseStub
*
* @param array $metadata The metadata map
*
- * @return $metadata Validated and key-normalized metadata map
+ * @return array $metadata Validated and key-normalized metadata map
* @throw InvalidArgumentException if key contains invalid characters
*/
private function _validate_and_normalize_metadata($metadata)
@@ -220,14 +230,16 @@ class BaseStub
* @param mixed $argument The argument to the method
* @param callable $deserialize A function that deserializes the response
* @param array $metadata A metadata map to send to the server
+ * (optional)
+ * @param array $options An array of options (optional)
*
* @return SimpleSurfaceActiveCall The active call object
*/
public function _simpleRequest($method,
$argument,
$deserialize,
- $metadata = [],
- $options = [])
+ array $metadata = [],
+ array $options = [])
{
$call = new UnaryCall($this->channel,
$method,
@@ -251,17 +263,17 @@ class BaseStub
* output.
*
* @param string $method The name of the method to call
- * @param array $arguments An array or Traversable of arguments to stream to the
- * server
* @param callable $deserialize A function that deserializes the response
* @param array $metadata A metadata map to send to the server
+ * (optional)
+ * @param array $options An array of options (optional)
*
* @return ClientStreamingSurfaceActiveCall The active call object
*/
public function _clientStreamRequest($method,
callable $deserialize,
- $metadata = [],
- $options = [])
+ array $metadata = [],
+ array $options = [])
{
$call = new ClientStreamingCall($this->channel,
$method,
@@ -281,21 +293,23 @@ class BaseStub
}
/**
- * Call a remote method that takes a single argument and returns a stream of
- * responses.
+ * Call a remote method that takes a single argument and returns a stream
+ * of responses.
*
* @param string $method The name of the method to call
* @param mixed $argument The argument to the method
* @param callable $deserialize A function that deserializes the responses
* @param array $metadata A metadata map to send to the server
+ * (optional)
+ * @param array $options An array of options (optional)
*
* @return ServerStreamingSurfaceActiveCall The active call object
*/
public function _serverStreamRequest($method,
$argument,
callable $deserialize,
- $metadata = [],
- $options = [])
+ array $metadata = [],
+ array $options = [])
{
$call = new ServerStreamingCall($this->channel,
$method,
@@ -320,13 +334,15 @@ class BaseStub
* @param string $method The name of the method to call
* @param callable $deserialize A function that deserializes the responses
* @param array $metadata A metadata map to send to the server
+ * (optional)
+ * @param array $options An array of options (optional)
*
* @return BidiStreamingSurfaceActiveCall The active call object
*/
public function _bidiRequest($method,
callable $deserialize,
- $metadata = [],
- $options = [])
+ array $metadata = [],
+ array $options = [])
{
$call = new BidiStreamingCall($this->channel,
$method,
diff --git a/src/php/lib/Grpc/BidiStreamingCall.php b/src/php/lib/Grpc/BidiStreamingCall.php
index f0e1e811de..b03bbd204f 100644
--- a/src/php/lib/Grpc/BidiStreamingCall.php
+++ b/src/php/lib/Grpc/BidiStreamingCall.php
@@ -35,8 +35,8 @@
namespace Grpc;
/**
- * Represents an active call that allows for sending and recieving messages in
- * streams in any order.
+ * Represents an active call that allows for sending and recieving messages
+ * in streams in any order.
*/
class BidiStreamingCall extends AbstractCall
{
@@ -44,6 +44,7 @@ class BidiStreamingCall extends AbstractCall
* Start the call.
*
* @param array $metadata Metadata to send with the call, if applicable
+ * (optional)
*/
public function start(array $metadata = [])
{
@@ -76,10 +77,10 @@ class BidiStreamingCall extends AbstractCall
* writesDone is called.
*
* @param ByteBuffer $data The data to write
- * @param array $options an array of options, possible keys:
- * 'flags' => a number
+ * @param array $options An array of options, possible keys:
+ * 'flags' => a number (optional)
*/
- public function write($data, $options = [])
+ public function write($data, array $options = [])
{
$message_array = ['message' => $this->serializeMessage($data)];
if (array_key_exists('flags', $options)) {
@@ -103,8 +104,8 @@ class BidiStreamingCall extends AbstractCall
/**
* Wait for the server to send the status, and return it.
*
- * @return \stdClass The status object, with integer $code, string $details,
- * and array $metadata members
+ * @return \stdClass The status object, with integer $code, string
+ * $details, and array $metadata members
*/
public function getStatus()
{
diff --git a/src/php/lib/Grpc/ClientStreamingCall.php b/src/php/lib/Grpc/ClientStreamingCall.php
index 20db809ea3..c542f08872 100644
--- a/src/php/lib/Grpc/ClientStreamingCall.php
+++ b/src/php/lib/Grpc/ClientStreamingCall.php
@@ -35,8 +35,8 @@
namespace Grpc;
/**
- * Represents an active call that sends a stream of messages and then gets a
- * single response.
+ * Represents an active call that sends a stream of messages and then gets
+ * a single response.
*/
class ClientStreamingCall extends AbstractCall
{
@@ -44,8 +44,9 @@ class ClientStreamingCall extends AbstractCall
* Start the call.
*
* @param array $metadata Metadata to send with the call, if applicable
+ * (optional)
*/
- public function start($metadata = [])
+ public function start(array $metadata = [])
{
$this->call->startBatch([
OP_SEND_INITIAL_METADATA => $metadata,
@@ -57,8 +58,8 @@ class ClientStreamingCall extends AbstractCall
* wait is called.
*
* @param ByteBuffer $data The data to write
- * @param array $options an array of options, possible keys:
- * 'flags' => a number
+ * @param array $options An array of options, possible keys:
+ * 'flags' => a number (optional)
*/
public function write($data, array $options = [])
{
diff --git a/src/php/lib/Grpc/ServerStreamingCall.php b/src/php/lib/Grpc/ServerStreamingCall.php
index 5aeeafa94a..406512bf57 100644
--- a/src/php/lib/Grpc/ServerStreamingCall.php
+++ b/src/php/lib/Grpc/ServerStreamingCall.php
@@ -35,8 +35,8 @@
namespace Grpc;
/**
- * Represents an active call that sends a single message and then gets a stream
- * of responses.
+ * Represents an active call that sends a single message and then gets a
+ * stream of responses.
*/
class ServerStreamingCall extends AbstractCall
{
@@ -45,10 +45,11 @@ class ServerStreamingCall extends AbstractCall
*
* @param mixed $data The data to send
* @param array $metadata Metadata to send with the call, if applicable
- * @param array $options an array of options, possible keys:
- * 'flags' => a number
+ * (optional)
+ * @param array $options An array of options, possible keys:
+ * 'flags' => a number (optional)
*/
- public function start($data, $metadata = [], $options = [])
+ public function start($data, array $metadata = [], array $options = [])
{
$message_array = ['message' => $this->serializeMessage($data)];
if (array_key_exists('flags', $options)) {
@@ -82,8 +83,8 @@ class ServerStreamingCall extends AbstractCall
/**
* Wait for the server to send the status, and return it.
*
- * @return \stdClass The status object, with integer $code, string $details,
- * and array $metadata members
+ * @return \stdClass The status object, with integer $code, string
+ * $details, and array $metadata members
*/
public function getStatus()
{
diff --git a/src/php/lib/Grpc/UnaryCall.php b/src/php/lib/Grpc/UnaryCall.php
index e8eb6487a8..3c1cb158ea 100644
--- a/src/php/lib/Grpc/UnaryCall.php
+++ b/src/php/lib/Grpc/UnaryCall.php
@@ -35,8 +35,8 @@
namespace Grpc;
/**
- * Represents an active call that sends a single message and then gets a single
- * response.
+ * Represents an active call that sends a single message and then gets a
+ * single response.
*/
class UnaryCall extends AbstractCall
{
@@ -45,10 +45,11 @@ class UnaryCall extends AbstractCall
*
* @param mixed $data The data to send
* @param array $metadata Metadata to send with the call, if applicable
- * @param array $options an array of options, possible keys:
- * 'flags' => a number
+ * (optional)
+ * @param array $options An array of options, possible keys:
+ * 'flags' => a number (optional)
*/
- public function start($data, $metadata = [], $options = [])
+ public function start($data, array $metadata = [], array $options = [])
{
$message_array = ['message' => $this->serializeMessage($data)];
if (isset($options['flags'])) {
diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py
index ea3b6f3391..701c6af017 100644
--- a/src/python/grpcio/commands.py
+++ b/src/python/grpcio/commands.py
@@ -62,6 +62,7 @@ napoleon_numpy_docstring = True
napoleon_include_special_with_doc = True
html_theme = 'sphinx_rtd_theme'
+copyright = "2016, The gRPC Authors"
"""
API_GLOSSARY = """
diff --git a/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py b/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py
index 936c895bd2..4fb22b4d9d 100644
--- a/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py
+++ b/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py
@@ -35,13 +35,13 @@ import unittest
import grpc
from src.proto.grpc.testing import test_pb2
-from tests.interop import _interop_test_case
+from tests.interop import _intraop_test_case
from tests.interop import methods
from tests.interop import server
-class InsecureInteropTest(
- _interop_test_case.InteropTestCase,
+class InsecureIntraopTest(
+ _intraop_test_case.IntraopTestCase,
unittest.TestCase):
def setUp(self):
diff --git a/src/python/grpcio_tests/tests/interop/_interop_test_case.py b/src/python/grpcio_tests/tests/interop/_intraop_test_case.py
index ccea17a66d..fe1c173992 100644
--- a/src/python/grpcio_tests/tests/interop/_interop_test_case.py
+++ b/src/python/grpcio_tests/tests/interop/_intraop_test_case.py
@@ -32,7 +32,7 @@
from tests.interop import methods
-class InteropTestCase(object):
+class IntraopTestCase(object):
"""Unit test methods.
This class must be mixed in with unittest.TestCase and a class that defines
diff --git a/src/python/grpcio_tests/tests/interop/_secure_interop_test.py b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py
index eaca553e1b..3665c69726 100644
--- a/src/python/grpcio_tests/tests/interop/_secure_interop_test.py
+++ b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py
@@ -35,15 +35,15 @@ import unittest
import grpc
from src.proto.grpc.testing import test_pb2
-from tests.interop import _interop_test_case
+from tests.interop import _intraop_test_case
from tests.interop import methods
from tests.interop import resources
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
-class SecureInteropTest(
- _interop_test_case.InteropTestCase,
+class SecureIntraopTest(
+ _intraop_test_case.IntraopTestCase,
unittest.TestCase):
def setUp(self):
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index c31a5f9d33..0109ee2173 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -1,7 +1,7 @@
[
"health_check._health_servicer_test.HealthServicerTest",
- "interop._insecure_interop_test.InsecureInteropTest",
- "interop._secure_interop_test.SecureInteropTest",
+ "interop._insecure_intraop_test.InsecureIntraopTest",
+ "interop._secure_intraop_test.SecureIntraopTest",
"protoc_plugin._python_plugin_test.PythonPluginTest",
"protoc_plugin._split_definitions_test.SameCommonTest",
"protoc_plugin._split_definitions_test.SameSeparateTest",