diff options
Diffstat (limited to 'src/core/ext')
17 files changed, 210 insertions, 236 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c index 75b0dc1ec3..65cfe1fa90 100644 --- a/src/core/ext/census/grpc_filter.c +++ b/src/core/ext/census/grpc_filter.c @@ -152,7 +152,8 @@ static grpc_error *server_init_call_elem(grpc_exec_ctx *exec_ctx, memset(d, 0, sizeof(*d)); d->start_ts = args->start_time; /* TODO(hongyu): call census_tracing_start_op here. */ - grpc_closure_init(&d->finish_recv, server_on_done_recv, elem); + grpc_closure_init(&d->finish_recv, server_on_done_recv, elem, + grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } diff --git a/src/core/ext/client_channel/channel_connectivity.c b/src/core/ext/client_channel/channel_connectivity.c index 9797e66564..b10f444b63 100644 --- a/src/core/ext/client_channel/channel_connectivity.c +++ b/src/core/ext/client_channel/channel_connectivity.c @@ -198,7 +198,8 @@ void grpc_channel_watch_connectivity_state( grpc_cq_begin_op(cq, tag); gpr_mu_init(&w->mu); - grpc_closure_init(&w->on_complete, watch_complete, w); + grpc_closure_init(&w->on_complete, watch_complete, w, + grpc_schedule_on_exec_ctx); w->phase = WAITING; w->state = last_observed_state; w->cq = cq; diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 16de03d003..74350d9fee 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -254,7 +254,8 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); w->chand = chand; - grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w); + grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w, + grpc_schedule_on_exec_ctx); w->state = current_state; w->lb_policy = lb_policy; grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state, @@ -366,14 +367,12 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, } chand->method_params_table = method_params_table; if (lb_policy != NULL) { - grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, - NULL); + grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); } else if (chand->resolver == NULL /* disconnected */) { grpc_closure_list_fail_all( &chand->waiting_for_config_closures, GRPC_ERROR_CREATE_REFERENCING("Channel disconnected", &error, 1)); - grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, - NULL); + grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); } if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); @@ -430,7 +429,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport_op *op) { channel_data *chand = elem->channel_data; - grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); GPR_ASSERT(op->set_accept_stream == false); if (op->bind_pollset != NULL) { @@ -449,9 +448,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->send_ping != NULL) { if (chand->lb_policy == NULL) { - grpc_exec_ctx_sched(exec_ctx, op->send_ping, - GRPC_ERROR_CREATE("Ping with no load balancing"), - NULL); + grpc_closure_sched(exec_ctx, op->send_ping, + GRPC_ERROR_CREATE("Ping with no load balancing")); } else { grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping); op->bind_pollset = NULL; @@ -470,8 +468,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, if (!chand->started_resolving) { grpc_closure_list_fail_all(&chand->waiting_for_config_closures, GRPC_ERROR_REF(op->disconnect_with_error)); - grpc_exec_ctx_enqueue_list(exec_ctx, - &chand->waiting_for_config_closures, NULL); + grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, @@ -516,7 +513,8 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, gpr_mu_init(&chand->mu); chand->owning_stack = args->channel_stack; grpc_closure_init(&chand->on_resolver_result_changed, - on_resolver_result_changed, chand); + on_resolver_result_changed, chand, + grpc_schedule_on_exec_ctx); chand->interested_parties = grpc_pollset_set_create(); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); @@ -683,8 +681,9 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { calld->waiting_ops_count = 0; calld->waiting_ops_capacity = 0; GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); - grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(retry_ops, a), - GRPC_ERROR_NONE, NULL); + grpc_closure_sched( + exec_ctx, grpc_closure_create(retry_ops, a, grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); } static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, @@ -766,14 +765,14 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (error != GRPC_ERROR_NONE) { - grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL); + grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error)); } else { call_data *calld = cpa->elem->call_data; gpr_mu_lock(&calld->mu); if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->initial_metadata_flags, cpa->connected_subchannel, cpa->on_ready, GRPC_ERROR_NONE)) { - grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE); } gpr_mu_unlock(&calld->mu); } @@ -805,9 +804,9 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, cpa = closure->cb_arg; if (cpa->connected_subchannel == connected_subchannel) { cpa->connected_subchannel = NULL; - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, cpa->on_ready, - GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL); + GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1)); } } gpr_mu_unlock(&chand->mu); @@ -858,12 +857,12 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, cpa->connected_subchannel = connected_subchannel; cpa->on_ready = on_ready; cpa->elem = elem; - grpc_closure_init(&cpa->closure, continue_picking, cpa); + grpc_closure_init(&cpa->closure, continue_picking, cpa, + grpc_schedule_on_exec_ctx); grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, GRPC_ERROR_NONE); } else { - grpc_exec_ctx_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"), - NULL); + grpc_closure_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected")); } gpr_mu_unlock(&chand->mu); @@ -948,7 +947,8 @@ retry: calld->connected_subchannel == NULL && op->send_initial_metadata != NULL) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; - grpc_closure_init(&calld->next_step, subchannel_ready, elem); + grpc_closure_init(&calld->next_step, subchannel_ready, elem, + grpc_schedule_on_exec_ctx); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); /* If a subchannel is not available immediately, the polling entity from call_data should be provided to channel_data's interested_parties, so @@ -1094,7 +1094,8 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, // get the service config data once the resolver returns. // Take a reference to the call stack to be owned by the callback. GRPC_CALL_STACK_REF(calld->owning_call, "read_service_config"); - grpc_closure_init(&calld->read_service_config, read_service_config, elem); + grpc_closure_init(&calld->read_service_config, read_service_config, elem, + grpc_schedule_on_exec_ctx); grpc_closure_list_append(&chand->waiting_for_config_closures, &calld->read_service_config, GRPC_ERROR_NONE); gpr_mu_unlock(&chand->mu); @@ -1207,7 +1208,8 @@ void grpc_client_channel_watch_connectivity_state( w->pollset = pollset; w->on_complete = on_complete; grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); - grpc_closure_init(&w->my_closure, on_external_watch_complete, w); + grpc_closure_init(&w->my_closure, on_external_watch_complete, w, + grpc_schedule_on_exec_ctx); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); gpr_mu_lock(&chand->mu); diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c index 5ead2ff73f..27b117af84 100644 --- a/src/core/ext/client_channel/http_connect_handshaker.c +++ b/src/core/ext/client_channel/http_connect_handshaker.c @@ -133,7 +133,7 @@ static void handshake_failed_locked(grpc_exec_ctx* exec_ctx, handshaker->shutdown = true; } // Invoke callback. - grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL); + grpc_closure_sched(exec_ctx, handshaker->on_handshake_done, error); } // Callback invoked when finished writing HTTP CONNECT request. @@ -232,7 +232,7 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, goto done; } // Success. Invoke handshake-done callback. - grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL); + grpc_closure_sched(exec_ctx, handshaker->on_handshake_done, error); done: // Set shutdown to true so that subsequent calls to // http_connect_handshaker_shutdown() do nothing. @@ -316,9 +316,9 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server) { handshaker->proxy_server = gpr_strdup(proxy_server); grpc_slice_buffer_init(&handshaker->write_buffer); grpc_closure_init(&handshaker->request_done_closure, on_write_done, - handshaker); + handshaker, grpc_schedule_on_exec_ctx); grpc_closure_init(&handshaker->response_read_closure, on_read_done, - handshaker); + handshaker, grpc_schedule_on_exec_ctx); grpc_http_parser_init(&handshaker->http_parser, GRPC_HTTP_RESPONSE, &handshaker->http_response); return &handshaker->base; diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index 4ac896923a..f8f48f2894 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -294,8 +294,9 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, gpr_atm old_refs; old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); if (old_refs == 1) { - grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c), - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); } } @@ -331,7 +332,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, c->args = grpc_channel_args_copy(args->args); c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; - grpc_closure_init(&c->connected, subchannel_connected, c); + grpc_closure_init(&c->connected, subchannel_connected, c, + grpc_schedule_on_exec_ctx); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); int initial_backoff_ms = @@ -506,7 +508,8 @@ void grpc_subchannel_notify_on_state_change( w->subchannel = c; w->pollset_set = interested_parties; w->notify = notify; - grpc_closure_init(&w->closure, on_external_state_watcher_done, w); + grpc_closure_init(&w->closure, on_external_state_watcher_done, w, + grpc_schedule_on_exec_ctx); if (interested_parties != NULL) { grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set, interested_parties); @@ -627,7 +630,7 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, sw_subchannel->subchannel = c; sw_subchannel->connectivity_state = GRPC_CHANNEL_READY; grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, - sw_subchannel); + sw_subchannel, grpc_schedule_on_exec_ctx); if (c->disconnected) { gpr_free(sw_subchannel); diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index ea6c62480f..390c6ff817 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -181,8 +181,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, wrapped_rr_closure_arg *wc_arg = arg; GPR_ASSERT(wc_arg->wrapped_closure != NULL); - grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error), - NULL); + grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); if (wc_arg->rr_policy != NULL) { /* if *target is NULL, no pick has been made by the RR policy (eg, all @@ -249,7 +248,8 @@ static void add_pending_pick(pending_pick **root, pick_args->lb_token_mdelem_storage; pp->wrapped_on_complete_arg.free_when_done = pp; grpc_closure_init(&pp->wrapped_on_complete_arg.wrapper_closure, - wrapped_rr_closure, &pp->wrapped_on_complete_arg); + wrapped_rr_closure, &pp->wrapped_on_complete_arg, + grpc_schedule_on_exec_ctx); *root = pp; } @@ -269,7 +269,8 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) { pping->wrapped_notify_arg.free_when_done = pping; pping->next = *root; grpc_closure_init(&pping->wrapped_notify_arg.wrapper_closure, - wrapped_rr_closure, &pping->wrapped_notify_arg); + wrapped_rr_closure, &pping->wrapped_notify_arg, + grpc_schedule_on_exec_ctx); *root = pping; } @@ -673,7 +674,7 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, gpr_malloc(sizeof(rr_connectivity_data)); memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed, - rr_connectivity); + rr_connectivity, grpc_schedule_on_exec_ctx); rr_connectivity->glb_policy = glb_policy; rr_connectivity->state = new_rr_state; @@ -914,15 +915,15 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_ERROR_NONE); pp = next; } while (pping != NULL) { pending_ping *next = pping->next; - grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, + GRPC_ERROR_NONE); pping = next; } } @@ -938,9 +939,9 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if (pp->target == target) { *target = NULL; - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, - GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); + GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1)); } else { pp->next = glb_policy->pending_picks; glb_policy->pending_picks = pp; @@ -963,9 +964,9 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, - GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); + GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1)); } else { pp->next = glb_policy->pending_picks; glb_policy->pending_picks = pp; @@ -1000,11 +1001,10 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *on_complete) { if (pick_args->lb_token_mdelem_storage == NULL) { *target = NULL; - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, on_complete, GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting " - "won't work without it. Failing"), - NULL); + "won't work without it. Failing")); return 0; } @@ -1023,7 +1023,8 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, wrapped_rr_closure_arg *wc_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg)); memset(wc_arg, 0, sizeof(wrapped_rr_closure_arg)); - grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg); + grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg, + grpc_schedule_on_exec_ctx); wc_arg->rr_policy = glb_policy->rr_policy; wc_arg->target = target; wc_arg->wrapped_closure = on_complete; @@ -1122,9 +1123,11 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, grpc_grpclb_request_destroy(request); grpc_closure_init(&glb_policy->lb_on_server_status_received, - lb_on_server_status_received, glb_policy); + lb_on_server_status_received, glb_policy, + grpc_schedule_on_exec_ctx); grpc_closure_init(&glb_policy->lb_on_response_received, - lb_on_response_received, glb_policy); + lb_on_response_received, glb_policy, + grpc_schedule_on_exec_ctx); gpr_backoff_init(&glb_policy->lb_call_backoff_state, GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS, diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index b9cfe6b5c0..821becff69 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -120,7 +120,7 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); pp = next; } @@ -138,9 +138,9 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if (pp->target == target) { *target = NULL; - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); + GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1)); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -165,9 +165,9 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); + GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1)); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -306,14 +306,15 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, /* drop the pick list: we are connected now */ GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); gpr_atm_rel_store(&p->selected, (gpr_atm)selected); - grpc_exec_ctx_sched(exec_ctx, - grpc_closure_create(destroy_subchannels, p), - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, + grpc_closure_create(destroy_subchannels, p, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); /* update any calls that were waiting for a pick */ while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked"); - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } grpc_connected_subchannel_notify_on_state_change( @@ -366,8 +367,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, - NULL); + grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, @@ -419,8 +419,7 @@ static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (selected) { grpc_connected_subchannel_ping(exec_ctx, selected, closure); } else { - grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"), - NULL); + grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected")); } } @@ -485,7 +484,8 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, p->num_subchannels = subchannel_idx; grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); - grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); + grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p, + grpc_schedule_on_exec_ctx); gpr_mu_init(&p->mu); return &p->base; } diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 896d13df3e..cb679489c3 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -321,8 +321,8 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE("Channel Shutdown"), NULL); + grpc_closure_sched(exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE("Channel Shutdown")); gpr_free(pp); } grpc_connectivity_state_set( @@ -348,9 +348,9 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if (pp->target == target) { *target = NULL; - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL); + GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1)); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -376,9 +376,9 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { *pp->target = NULL; - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL); + GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1)); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -581,7 +581,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", (void *)selected->subchannel, (void *)selected); } - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } update_lb_connectivity_status(exec_ctx, sd, error); @@ -634,7 +634,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } } @@ -684,8 +684,8 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked"); } else { gpr_mu_unlock(&p->mu); - grpc_exec_ctx_sched(exec_ctx, closure, - GRPC_ERROR_CREATE("Round Robin not connected"), NULL); + grpc_closure_sched(exec_ctx, closure, + GRPC_ERROR_CREATE("Round Robin not connected")); } } @@ -749,7 +749,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, } ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, - rr_connectivity_changed, sd); + rr_connectivity_changed, sd, grpc_schedule_on_exec_ctx); } } if (subchannel_idx == 0) { diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c index aeb537eabd..8af6191c3b 100644 --- a/src/core/ext/load_reporting/load_reporting_filter.c +++ b/src/core/ext/load_reporting/load_reporting_filter.c @@ -105,7 +105,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, memset(calld, 0, sizeof(call_data)); calld->id = (intptr_t)args->call_stack; - grpc_closure_init(&calld->on_initial_md_ready, on_initial_md_ready, elem); + grpc_closure_init(&calld->on_initial_md_ready, on_initial_md_ready, elem, + grpc_schedule_on_exec_ctx); /* TODO(dgq): do something with the data channel_data *chand = elem->channel_data; diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 37a2d8dc30..124b16bbc3 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -112,8 +112,8 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { } if (r->next_completion != NULL) { *r->target_result = NULL; - grpc_exec_ctx_sched(exec_ctx, r->next_completion, - GRPC_ERROR_CREATE("Resolver Shutdown"), NULL); + grpc_closure_sched(exec_ctx, r->next_completion, + GRPC_ERROR_CREATE("Resolver Shutdown")); r->next_completion = NULL; } gpr_mu_unlock(&r->mu); @@ -219,9 +219,10 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(!r->resolving); r->resolving = true; r->addresses = NULL; - grpc_resolve_address(exec_ctx, r->name_to_resolve, r->default_port, - r->interested_parties, - grpc_closure_create(dns_on_resolved, r), &r->addresses); + grpc_resolve_address( + exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties, + grpc_closure_create(dns_on_resolved, r, grpc_schedule_on_exec_ctx), + &r->addresses); } static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, @@ -231,7 +232,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, *r->target_result = r->resolved_result == NULL ? NULL : grpc_channel_args_copy(r->resolved_result); - grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = NULL; r->published_version = r->resolved_version; } diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index 5fd1607cee..a1365f6465 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -90,7 +90,7 @@ static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_result = NULL; - grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = NULL; } gpr_mu_unlock(&r->mu); @@ -124,7 +124,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, grpc_arg arg = grpc_lb_addresses_create_channel_arg(r->addresses); *r->target_result = grpc_channel_args_copy_and_add(r->channel_args, &arg, 1); - grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = NULL; } } diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index d8afcfb626..bf4d797938 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -142,7 +142,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, } grpc_closure *notify = c->notify; c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + grpc_closure_sched(exec_ctx, notify, error); grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); c->handshake_mgr = NULL; gpr_mu_unlock(&c->mu); @@ -181,7 +181,7 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, memset(c->result, 0, sizeof(*c->result)); grpc_closure *notify = c->notify; c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + grpc_closure_sched(exec_ctx, notify, error); gpr_mu_unlock(&c->mu); chttp2_connector_unref(exec_ctx, arg); } else { @@ -204,7 +204,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { memset(c->result, 0, sizeof(*c->result)); grpc_closure *notify = c->notify; c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + grpc_closure_sched(exec_ctx, notify, error); if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint); gpr_mu_unlock(&c->mu); chttp2_connector_unref(exec_ctx, arg); @@ -212,7 +212,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { GPR_ASSERT(c->endpoint != NULL); if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) { grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, - c); + c, grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&c->initial_string_buffer); grpc_slice_buffer_add(&c->initial_string_buffer, c->args.initial_connect_string); @@ -238,7 +238,7 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx, c->result = result; GPR_ASSERT(c->endpoint == NULL); chttp2_connector_ref(con); // Ref taken for callback. - grpc_closure_init(&c->connected, connected, c); + grpc_closure_init(&c->connected, connected, c, grpc_schedule_on_exec_ctx); GPR_ASSERT(!c->connecting); c->connecting = true; grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint, diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index d573bbe73e..86f5a198c2 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -282,7 +282,8 @@ grpc_error *grpc_chttp2_server_add_port( state = gpr_malloc(sizeof(*state)); memset(state, 0, sizeof(*state)); grpc_closure_init(&state->tcp_server_shutdown_complete, - tcp_server_shutdown_complete, state); + tcp_server_shutdown_complete, state, + grpc_schedule_on_exec_ctx); err = grpc_tcp_server_create(exec_ctx, &state->tcp_server_shutdown_complete, args, &tcp_server); if (err != GRPC_ERROR_NONE) { diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 6563b42a60..0f762dbceb 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -74,20 +74,14 @@ static const grpc_transport_vtable vtable; static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void write_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); -static void write_action_end(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); -static void read_action_begin(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); static void read_action_locked(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, grpc_error *error); -static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs, - grpc_error *error); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_setting_id id, uint32_t value); @@ -113,12 +107,8 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, void *byte_stream, grpc_error *error_ignored); -static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); -static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); @@ -167,8 +157,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, and maybe they hold resources that need to be freed */ while (t->pings.next != &t->pings) { grpc_chttp2_outstanding_ping *ping = t->pings.next; - grpc_exec_ctx_sched(exec_ctx, ping->on_recv, - GRPC_ERROR_CREATE("Transport closed"), NULL); + grpc_closure_sched(exec_ctx, ping->on_recv, + GRPC_ERROR_CREATE("Transport closed")); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -247,18 +237,15 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_slice_buffer_init(&t->outbuf); grpc_chttp2_hpack_compressor_init(&t->hpack_compressor); - grpc_closure_init(&t->write_action_begin_locked, write_action_begin_locked, - t); - grpc_closure_init(&t->write_action, write_action, t); - grpc_closure_init(&t->write_action_end, write_action_end, t); - grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t); - grpc_closure_init(&t->read_action_begin, read_action_begin, t); - grpc_closure_init(&t->read_action_locked, read_action_locked, t); - grpc_closure_init(&t->benign_reclaimer, benign_reclaimer, t); - grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer, t); - grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t); + grpc_closure_init(&t->write_action, write_action, t, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&t->read_action_locked, read_action_locked, t, + grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t, + grpc_combiner_scheduler(t->combiner, false)); grpc_closure_init(&t->destructive_reclaimer_locked, - destructive_reclaimer_locked, t); + destructive_reclaimer_locked, t, + grpc_combiner_scheduler(t->combiner, false)); grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser); @@ -396,9 +383,10 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp, static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_combiner_execute(exec_ctx, t->combiner, - grpc_closure_create(destroy_transport_locked, t), - GRPC_ERROR_NONE, false); + grpc_closure_sched(exec_ctx, grpc_closure_create( + destroy_transport_locked, t, + grpc_combiner_scheduler(t->combiner, false)), + GRPC_ERROR_NONE); } static void close_transport_locked(grpc_exec_ctx *exec_ctx, @@ -472,8 +460,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_data_parser_init(&s->data_parser); grpc_slice_buffer_init(&s->flow_controlled_buffer); s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); - grpc_closure_init(&s->complete_fetch, complete_fetch, s); - grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s); + grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s, + grpc_schedule_on_exec_ctx); GRPC_CHTTP2_REF_TRANSPORT(t, "stream"); @@ -550,9 +538,10 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; s->destroy_stream_arg = and_free_memory; - grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s); - grpc_combiner_execute(exec_ctx, t->combiner, &s->destroy_stream, - GRPC_ERROR_NONE, false); + grpc_closure_sched( + exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s, + grpc_combiner_scheduler(t->combiner, false)), + GRPC_ERROR_NONE); GPR_TIMER_END("destroy_stream", 0); } @@ -603,7 +592,7 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, write_state_name(st), reason)); t->write_state = st; if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) { - grpc_exec_ctx_enqueue_list(exec_ctx, &t->run_after_write, NULL); + grpc_closure_list_sched(exec_ctx, &t->run_after_write); if (t->close_transport_on_writes_finished != NULL) { grpc_error *err = t->close_transport_on_writes_finished; t->close_transport_on_writes_finished = NULL; @@ -621,9 +610,12 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, case GRPC_CHTTP2_WRITE_STATE_IDLE: set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason); GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); - grpc_combiner_execute_finally(exec_ctx, t->combiner, - &t->write_action_begin_locked, - GRPC_ERROR_NONE, covered_by_poller); + grpc_closure_sched( + exec_ctx, + grpc_closure_init( + &t->write_action_begin_locked, write_action_begin_locked, t, + grpc_combiner_finally_scheduler(t->combiner, covered_by_poller)), + GRPC_ERROR_NONE); break; case GRPC_CHTTP2_WRITE_STATE_WRITING: set_write_state( @@ -665,7 +657,7 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, if (!t->closed && grpc_chttp2_begin_write(exec_ctx, t)) { set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, "begin writing"); - grpc_exec_ctx_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE); } else { set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing"); @@ -677,19 +669,13 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) { grpc_chttp2_transport *t = gt; GPR_TIMER_BEGIN("write_action", 0); - grpc_endpoint_write(exec_ctx, t->ep, &t->outbuf, &t->write_action_end); + grpc_endpoint_write( + exec_ctx, t->ep, &t->outbuf, + grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t, + grpc_combiner_scheduler(t->combiner, false))); GPR_TIMER_END("write_action", 0); } -static void write_action_end(grpc_exec_ctx *exec_ctx, void *gt, - grpc_error *error) { - grpc_chttp2_transport *t = gt; - GPR_TIMER_BEGIN("write_action_end", 0); - grpc_combiner_execute(exec_ctx, t->combiner, &t->write_action_end_locked, - GRPC_ERROR_REF(error), false); - GPR_TIMER_END("write_action_end", 0); -} - static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { GPR_TIMER_BEGIN("terminate_writing_with_lock", 0); @@ -719,18 +705,24 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing [!covered]"); GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); - grpc_combiner_execute_finally(exec_ctx, t->combiner, - &t->write_action_begin_locked, - GRPC_ERROR_NONE, false); + grpc_closure_run( + exec_ctx, + grpc_closure_init( + &t->write_action_begin_locked, write_action_begin_locked, t, + grpc_combiner_finally_scheduler(t->combiner, false)), + GRPC_ERROR_NONE); break; case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER: GPR_TIMER_MARK("state=writing_stale_with_poller", 0); set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing [covered]"); GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); - grpc_combiner_execute_finally(exec_ctx, t->combiner, - &t->write_action_begin_locked, - GRPC_ERROR_NONE, true); + grpc_closure_run( + exec_ctx, + grpc_closure_init(&t->write_action_begin_locked, + write_action_begin_locked, t, + grpc_combiner_finally_scheduler(t->combiner, true)), + GRPC_ERROR_NONE); break; } @@ -968,15 +960,6 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, } } -static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs, - grpc_error *error) { - grpc_chttp2_stream *s = gs; - grpc_chttp2_transport *t = s->t; - grpc_combiner_execute(exec_ctx, t->combiner, &s->complete_fetch_locked, - GRPC_ERROR_REF(error), - s->complete_fetch_covered_by_poller); -} - static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id, @@ -1015,7 +998,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_closure *on_complete = op->on_complete; if (on_complete == NULL) { - on_complete = grpc_closure_create(do_nothing, NULL); + on_complete = + grpc_closure_create(do_nothing, NULL, grpc_schedule_on_exec_ctx); } /* use final_data as a barrier until enqueue time; the inital counter is @@ -1218,13 +1202,15 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_free(str); } - grpc_closure_init(&op->transport_private.closure, perform_stream_op_locked, - op); op->transport_private.args[0] = gt; op->transport_private.args[1] = gs; GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); - grpc_combiner_execute(exec_ctx, t->combiner, &op->transport_private.closure, - GRPC_ERROR_NONE, op->covered_by_poller); + grpc_closure_sched( + exec_ctx, + grpc_closure_init( + &op->transport_private.closure, perform_stream_op_locked, op, + grpc_combiner_scheduler(t->combiner, op->covered_by_poller)), + GRPC_ERROR_NONE); GPR_TIMER_END("perform_stream_op", 0); } @@ -1253,7 +1239,7 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_outstanding_ping *ping; for (ping = t->pings.next; ping != &t->pings; ping = ping->next) { if (0 == memcmp(opaque_8bytes, ping->id, 8)) { - grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -1327,11 +1313,12 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, char *msg = grpc_transport_op_string(op); gpr_free(msg); op->transport_private.args[0] = gt; - grpc_closure_init(&op->transport_private.closure, perform_transport_op_locked, - op); GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); - grpc_combiner_execute(exec_ctx, t->combiner, &op->transport_private.closure, - GRPC_ERROR_NONE, false); + grpc_closure_sched( + exec_ctx, grpc_closure_init(&op->transport_private.closure, + perform_transport_op_locked, op, + grpc_combiner_scheduler(t->combiner, false)), + GRPC_ERROR_NONE); } /******************************************************************************* @@ -1806,19 +1793,6 @@ static void update_global_window(void *args, uint32_t id, void *stream) { * INPUT PROCESSING - PARSING */ -static void read_action_begin(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { - /* Control flow: - reading_action_locked -> - (parse_unlocked -> post_parse_locked)? -> - post_reading_action_locked */ - GPR_TIMER_BEGIN("reading_action", 0); - grpc_chttp2_transport *t = tp; - grpc_combiner_execute(exec_ctx, t->combiner, &t->read_action_locked, - GRPC_ERROR_REF(error), false); - GPR_TIMER_END("reading_action", 0); -} - static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_http_parser parser; @@ -1918,7 +1892,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &t->read_buffer); if (keep_reading) { - grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->read_action_begin); + grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, + &t->read_action_locked); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action"); @@ -2055,10 +2030,12 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, bs->next_action.slice = slice; bs->next_action.max_size_hint = max_size_hint; bs->next_action.on_complete = on_complete; - grpc_closure_init(&bs->next_action.closure, incoming_byte_stream_next_locked, - bs); - grpc_combiner_execute(exec_ctx, bs->transport->combiner, - &bs->next_action.closure, GRPC_ERROR_NONE, false); + grpc_closure_sched( + exec_ctx, + grpc_closure_init( + &bs->next_action.closure, incoming_byte_stream_next_locked, bs, + grpc_combiner_scheduler(bs->transport->combiner, false)), + GRPC_ERROR_NONE); GPR_TIMER_END("incoming_byte_stream_next", 0); return 0; } @@ -2080,10 +2057,12 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, GPR_TIMER_BEGIN("incoming_byte_stream_destroy", 0); grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; - grpc_closure_init(&bs->destroy_action, incoming_byte_stream_destroy_locked, - bs); - grpc_combiner_execute(exec_ctx, bs->transport->combiner, &bs->destroy_action, - GRPC_ERROR_NONE, false); + grpc_closure_sched( + exec_ctx, + grpc_closure_init( + &bs->destroy_action, incoming_byte_stream_destroy_locked, bs, + grpc_combiner_scheduler(bs->transport->combiner, false)), + GRPC_ERROR_NONE); GPR_TIMER_END("incoming_byte_stream_destroy", 0); } @@ -2091,7 +2070,7 @@ static void incoming_byte_stream_publish_error( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_error *error) { GPR_ASSERT(error != GRPC_ERROR_NONE); - grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL); + grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error)); bs->on_next = NULL; GRPC_ERROR_UNREF(bs->error); bs->error = error; @@ -2108,7 +2087,7 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice); if (bs->on_next != NULL) { *bs->next = slice; - grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE); bs->on_next = NULL; } else { grpc_slice_buffer_add(&bs->slices, slice); @@ -2176,7 +2155,7 @@ static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer"); grpc_resource_user_post_reclaimer(exec_ctx, grpc_endpoint_get_resource_user(t->ep), - false, &t->benign_reclaimer); + false, &t->benign_reclaimer_locked); } } @@ -2187,24 +2166,10 @@ static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer"); grpc_resource_user_post_reclaimer(exec_ctx, grpc_endpoint_get_resource_user(t->ep), - true, &t->destructive_reclaimer); + true, &t->destructive_reclaimer_locked); } } -static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_chttp2_transport *t = arg; - grpc_combiner_execute(exec_ctx, t->combiner, &t->benign_reclaimer_locked, - GRPC_ERROR_REF(error), false); -} - -static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_chttp2_transport *t = arg; - grpc_combiner_execute(exec_ctx, t->combiner, &t->destructive_reclaimer_locked, - GRPC_ERROR_REF(error), false); -} - static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_chttp2_transport *t = arg; @@ -2385,5 +2350,5 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_move_into(read_buffer, &t->read_buffer); gpr_free(read_buffer); } - read_action_begin(exec_ctx, t, GRPC_ERROR_NONE); + grpc_closure_sched(exec_ctx, &t->read_action_locked, GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 5dabe272d3..2a14167f67 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1692,10 +1692,11 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx, however -- it might be that we receive a RST_STREAM following this and can avoid the extra write */ GRPC_CHTTP2_STREAM_REF(s, "final_rst"); - grpc_combiner_execute_finally( - exec_ctx, t->combiner, - grpc_closure_create(force_client_rst_stream, s), GRPC_ERROR_NONE, - false); + grpc_closure_sched( + exec_ctx, grpc_closure_create(force_client_rst_stream, s, + grpc_combiner_finally_scheduler( + t->combiner, false)), + GRPC_ERROR_NONE); } grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false, GRPC_ERROR_NONE); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index b727965d43..a52acbacdb 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -212,10 +212,8 @@ struct grpc_chttp2_transport { grpc_closure write_action_begin_locked; grpc_closure write_action; - grpc_closure write_action_end; grpc_closure write_action_end_locked; - grpc_closure read_action_begin; grpc_closure read_action_locked; /** incoming read bytes */ @@ -336,10 +334,8 @@ struct grpc_chttp2_transport { /** have we scheduled a destructive cleanup? */ bool destructive_reclaimer_registered; /** benign cleanup closure */ - grpc_closure benign_reclaimer; grpc_closure benign_reclaimer_locked; /** destructive cleanup closure */ - grpc_closure destructive_reclaimer; grpc_closure destructive_reclaimer_locked; }; diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index c45c653247..e37c541808 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -868,18 +868,18 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, OP_RECV_INITIAL_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_CANCELLED, NULL); + grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_CANCELLED); } else if (stream_state->state_callback_received[OP_FAILED]) { - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, stream_op->recv_initial_metadata_ready, - make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); + make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.")); } else { grpc_chttp2_incoming_metadata_buffer_publish( exec_ctx, &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata); - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_NONE); } stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; @@ -930,22 +930,22 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, - GRPC_ERROR_CANCELLED, NULL); + grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + GRPC_ERROR_CANCELLED); stream_state->state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->state_callback_received[OP_FAILED]) { CRONET_LOG(GPR_DEBUG, "Stream failed."); - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, stream_op->recv_message_ready, - make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); + make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.")); stream_state->state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->rs.read_stream_closed == true) { /* No more data will be received */ CRONET_LOG(GPR_DEBUG, "read stream closed"); - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; @@ -978,8 +978,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, &stream_state->rs.read_slice_buffer, 0); *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; @@ -1013,8 +1013,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, &stream_state->rs.read_slice_buffer, 0); *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; /* Do an extra read to trigger on_succeeded() callback in case connection @@ -1075,18 +1075,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, OP_ON_COMPLETE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, - GRPC_ERROR_REF(stream_state->cancel_error), NULL); + grpc_closure_sched(exec_ctx, stream_op->on_complete, + GRPC_ERROR_REF(stream_state->cancel_error)); } else if (stream_state->state_callback_received[OP_FAILED]) { - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, stream_op->on_complete, - make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); + make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.")); } else { /* All actions in this stream_op are complete. Call the on_complete * callback */ - grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE, - NULL); + grpc_closure_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE); } oas->state.state_op_done[OP_ON_COMPLETE] = true; oas->done = true; |