diff options
Diffstat (limited to 'src/core/ext/filters')
24 files changed, 151 insertions, 166 deletions
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c index 2e99257cd9..c3dca14305 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.c +++ b/src/core/ext/filters/client_channel/channel_connectivity.c @@ -211,9 +211,9 @@ void grpc_channel_watch_connectivity_state( grpc_cq_begin_op(cq, tag); gpr_mu_init(&w->mu); - grpc_closure_init(&w->on_complete, watch_complete, w, + GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w, grpc_schedule_on_exec_ctx); - grpc_closure_init(&w->on_timeout, timeout_complete, w, + GRPC_CLOSURE_INIT(&w->on_timeout, timeout_complete, w, grpc_schedule_on_exec_ctx); w->phase = WAITING; w->state = last_observed_state; @@ -225,7 +225,7 @@ void grpc_channel_watch_connectivity_state( watcher_timer_init_arg *wa = gpr_malloc(sizeof(watcher_timer_init_arg)); wa->w = w; wa->deadline = deadline; - grpc_closure_init(&w->watcher_timer_init, watcher_timer_init, wa, + GRPC_CLOSURE_INIT(&w->watcher_timer_init, watcher_timer_init, wa, grpc_schedule_on_exec_ctx); if (client_channel_elem->filter == &grpc_client_channel_filter) { diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index 3ec539884b..f29c5d55ed 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -275,7 +275,7 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); w->chand = chand; - grpc_closure_init(&w->on_changed, on_lb_policy_state_changed_locked, w, + GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w, grpc_combiner_scheduler(chand->combiner)); w->state = current_state; w->lb_policy = lb_policy; @@ -364,7 +364,7 @@ static void wrapped_on_pick_closure_cb(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(wc_arg != NULL); GPR_ASSERT(wc_arg->wrapped_closure != NULL); GPR_ASSERT(wc_arg->lb_policy != NULL); - grpc_closure_run(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->lb_policy, "pick_subchannel_wrapping"); gpr_free(wc_arg); } @@ -506,12 +506,12 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, } chand->method_params_table = method_params_table; if (lb_policy != NULL) { - grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures); } else if (chand->resolver == NULL /* disconnected */) { grpc_closure_list_fail_all(&chand->waiting_for_config_closures, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Channel disconnected", &error, 1)); - grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures); } if (!lb_policy_updated && lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { @@ -583,7 +583,7 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, if (op->send_ping != NULL) { if (chand->lb_policy == NULL) { - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, op->send_ping, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing")); } else { @@ -604,7 +604,7 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, if (!chand->started_resolving) { grpc_closure_list_fail_all(&chand->waiting_for_config_closures, GRPC_ERROR_REF(op->disconnect_with_error)); - grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures); } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, @@ -618,7 +618,7 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, } GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op"); - grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); } static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, @@ -634,9 +634,9 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, op->handler_private.extra_arg = elem; GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op"); - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, - grpc_closure_init(&op->handler_private.closure, start_transport_op_locked, + GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked, op, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } @@ -677,7 +677,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); chand->owning_stack = args->channel_stack; - grpc_closure_init(&chand->on_resolver_result_changed, + GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed, on_resolver_result_changed_locked, chand, grpc_combiner_scheduler(chand->combiner)); chand->interested_parties = grpc_pollset_set_create(); @@ -737,8 +737,8 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *chand = elem->channel_data; if (chand->resolver != NULL) { - grpc_closure_sched( - exec_ctx, grpc_closure_create(shutdown_resolver_locked, chand->resolver, + GRPC_CLOSURE_SCHED( + exec_ctx, GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } @@ -1038,13 +1038,13 @@ static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (error != GRPC_ERROR_NONE) { - grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error)); } else { if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->initial_metadata_flags, cpa->connected_subchannel, cpa->subchannel_call_context, cpa->on_ready)) { - grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE); } } gpr_free(cpa); @@ -1064,7 +1064,7 @@ static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, continue_picking_args *cpa = closure->cb_arg; if (cpa->connected_subchannel == &calld->connected_subchannel) { cpa->connected_subchannel = NULL; - grpc_closure_sched(exec_ctx, cpa->on_ready, + GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); } @@ -1113,7 +1113,7 @@ static bool pick_subchannel_locked( // the LB policy for the duration of the pick. wrapped_on_pick_closure_arg *w_on_pick_arg = gpr_zalloc(sizeof(*w_on_pick_arg)); - grpc_closure_init(&w_on_pick_arg->wrapper_closure, + GRPC_CLOSURE_INIT(&w_on_pick_arg->wrapper_closure, wrapped_on_pick_closure_cb, w_on_pick_arg, grpc_schedule_on_exec_ctx); w_on_pick_arg->wrapped_closure = on_ready; @@ -1147,12 +1147,12 @@ static bool pick_subchannel_locked( cpa->subchannel_call_context = subchannel_call_context; cpa->on_ready = on_ready; cpa->elem = elem; - grpc_closure_init(&cpa->closure, continue_picking_locked, cpa, + GRPC_CLOSURE_INIT(&cpa->closure, continue_picking_locked, cpa, grpc_combiner_scheduler(chand->combiner)); grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, GRPC_ERROR_NONE); } else { - grpc_closure_sched(exec_ctx, on_ready, + GRPC_CLOSURE_SCHED(exec_ctx, on_ready, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); } @@ -1202,7 +1202,7 @@ static void start_transport_stream_op_batch_locked_inner( if (!calld->pick_pending && calld->connected_subchannel == NULL && op->send_initial_metadata) { calld->pick_pending = true; - grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem, + GRPC_CLOSURE_INIT(&calld->next_step, subchannel_ready_locked, elem, grpc_combiner_scheduler(chand->combiner)); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); /* If a subchannel is not available immediately, the polling entity from @@ -1275,7 +1275,7 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { calld->retry_throttle_data); } } - grpc_closure_run(exec_ctx, calld->original_on_complete, + GRPC_CLOSURE_RUN(exec_ctx, calld->original_on_complete, GRPC_ERROR_REF(error)); } @@ -1291,7 +1291,7 @@ static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx, if (op->recv_trailing_metadata) { GPR_ASSERT(op->on_complete != NULL); calld->original_on_complete = op->on_complete; - grpc_closure_init(&calld->on_complete, on_complete, elem, + GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem, grpc_schedule_on_exec_ctx); op->on_complete = &calld->on_complete; } @@ -1340,8 +1340,8 @@ static void cc_start_transport_stream_op_batch( /* we failed; lock and figure out what to do */ GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch"); op->handler_private.extra_arg = elem; - grpc_closure_sched( - exec_ctx, grpc_closure_init(&op->handler_private.closure, + GRPC_CLOSURE_SCHED( + exec_ctx, GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_stream_op_batch_locked, op, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); @@ -1402,7 +1402,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, } } gpr_free(calld->waiting_ops); - grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, @@ -1456,8 +1456,8 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_connectivity_state_check(&chand->state_tracker); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); - grpc_closure_sched( - exec_ctx, grpc_closure_create(try_to_connect_locked, chand, + GRPC_CLOSURE_SCHED( + exec_ctx, GRPC_CLOSURE_CREATE(try_to_connect_locked, chand, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } @@ -1547,7 +1547,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, "external_connectivity_watcher"); external_connectivity_watcher_list_remove(w->chand, w); gpr_free(w); - grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(exec_ctx, follow_up, GRPC_ERROR_REF(error)); } static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, @@ -1556,8 +1556,8 @@ static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, external_connectivity_watcher *found = NULL; if (w->state != NULL) { external_connectivity_watcher_list_append(w->chand, w); - grpc_closure_run(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE); - grpc_closure_init(&w->my_closure, on_external_watch_complete, w, + GRPC_CLOSURE_RUN(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE); + GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete, w, grpc_schedule_on_exec_ctx); grpc_connectivity_state_notify_on_state_change( exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure); @@ -1592,9 +1592,9 @@ void grpc_client_channel_watch_connectivity_state( chand->interested_parties); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, - grpc_closure_init(&w->my_closure, watch_connectivity_state_locked, w, + GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.c b/src/core/ext/filters/client_channel/http_connect_handshaker.c index 5e4bfe74d8..0952dc6d4e 100644 --- a/src/core/ext/filters/client_channel/http_connect_handshaker.c +++ b/src/core/ext/filters/client_channel/http_connect_handshaker.c @@ -118,7 +118,7 @@ static void handshake_failed_locked(grpc_exec_ctx* exec_ctx, handshaker->shutdown = true; } // Invoke callback. - grpc_closure_sched(exec_ctx, handshaker->on_handshake_done, error); + GRPC_CLOSURE_SCHED(exec_ctx, handshaker->on_handshake_done, error); } // Callback invoked when finished writing HTTP CONNECT request. @@ -217,7 +217,7 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, goto done; } // Success. Invoke handshake-done callback. - grpc_closure_sched(exec_ctx, handshaker->on_handshake_done, error); + GRPC_CLOSURE_SCHED(exec_ctx, handshaker->on_handshake_done, error); done: // Set shutdown to true so that subsequent calls to // http_connect_handshaker_shutdown() do nothing. @@ -266,7 +266,7 @@ static void http_connect_handshaker_do_handshake( gpr_mu_lock(&handshaker->mu); handshaker->shutdown = true; gpr_mu_unlock(&handshaker->mu); - grpc_closure_sched(exec_ctx, on_handshake_done, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, on_handshake_done, GRPC_ERROR_NONE); return; } GPR_ASSERT(arg->type == GRPC_ARG_STRING); @@ -339,9 +339,9 @@ static grpc_handshaker* grpc_http_connect_handshaker_create() { gpr_mu_init(&handshaker->mu); gpr_ref_init(&handshaker->refcount, 1); grpc_slice_buffer_init(&handshaker->write_buffer); - grpc_closure_init(&handshaker->request_done_closure, on_write_done, + GRPC_CLOSURE_INIT(&handshaker->request_done_closure, on_write_done, handshaker, grpc_schedule_on_exec_ctx); - grpc_closure_init(&handshaker->response_read_closure, on_read_done, + GRPC_CLOSURE_INIT(&handshaker->response_read_closure, on_read_done, handshaker, grpc_schedule_on_exec_ctx); grpc_http_parser_init(&handshaker->http_parser, GRPC_HTTP_RESPONSE, &handshaker->http_response); diff --git a/src/core/ext/filters/client_channel/lb_policy.c b/src/core/ext/filters/client_channel/lb_policy.c index 0c8e179925..50f8faef8e 100644 --- a/src/core/ext/filters/client_channel/lb_policy.c +++ b/src/core/ext/filters/client_channel/lb_policy.c @@ -74,7 +74,7 @@ void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1); gpr_atm check = 1 << WEAK_REF_BITS; if ((old_val & mask) == check) { - grpc_closure_sched(exec_ctx, grpc_closure_create( + GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE( shutdown_locked, policy, grpc_combiner_scheduler(policy->combiner)), GRPC_ERROR_NONE); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c index 10e59a99de..52c6e38c87 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c @@ -53,7 +53,7 @@ static void on_complete_for_send(grpc_exec_ctx *exec_ctx, void *arg, if (error == GRPC_ERROR_NONE) { calld->send_initial_metadata_succeeded = true; } - grpc_closure_run(exec_ctx, calld->original_on_complete_for_send, + GRPC_CLOSURE_RUN(exec_ctx, calld->original_on_complete_for_send, GRPC_ERROR_REF(error)); } @@ -63,7 +63,7 @@ static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg, if (error == GRPC_ERROR_NONE) { calld->recv_initial_metadata_succeeded = true; } - grpc_closure_run(exec_ctx, calld->original_recv_initial_metadata_ready, + GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready, GRPC_ERROR_REF(error)); } @@ -104,7 +104,7 @@ static void start_transport_stream_op_batch( // Intercept send_initial_metadata. if (batch->send_initial_metadata) { calld->original_on_complete_for_send = batch->on_complete; - grpc_closure_init(&calld->on_complete_for_send, on_complete_for_send, calld, + GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send, calld, grpc_schedule_on_exec_ctx); batch->on_complete = &calld->on_complete_for_send; } @@ -112,7 +112,7 @@ static void start_transport_stream_op_batch( if (batch->recv_initial_metadata) { calld->original_recv_initial_metadata_ready = batch->payload->recv_initial_metadata.recv_initial_metadata_ready; - grpc_closure_init(&calld->recv_initial_metadata_ready, + GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, calld, grpc_schedule_on_exec_ctx); batch->payload->recv_initial_metadata.recv_initial_metadata_ready = diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index d3182f35fe..8d3c1c8e72 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -184,7 +184,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, wrapped_rr_closure_arg *wc_arg = arg; GPR_ASSERT(wc_arg->wrapped_closure != NULL); - grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); if (wc_arg->rr_policy != NULL) { /* if *target is NULL, no pick has been made by the RR policy (eg, all @@ -256,7 +256,7 @@ static void add_pending_pick(pending_pick **root, pp->wrapped_on_complete_arg.lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; pp->wrapped_on_complete_arg.free_when_done = pp; - grpc_closure_init(&pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_CLOSURE_INIT(&pp->wrapped_on_complete_arg.wrapper_closure, wrapped_rr_closure, &pp->wrapped_on_complete_arg, grpc_schedule_on_exec_ctx); *root = pp; @@ -275,7 +275,7 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) { pping->wrapped_notify_arg.wrapped_closure = notify; pping->wrapped_notify_arg.free_when_done = pping; pping->next = *root; - grpc_closure_init(&pping->wrapped_notify_arg.wrapper_closure, + GRPC_CLOSURE_INIT(&pping->wrapped_notify_arg.wrapper_closure, wrapped_rr_closure, &pping->wrapped_notify_arg, grpc_schedule_on_exec_ctx); *root = pping; @@ -635,7 +635,7 @@ static bool pick_from_internal_rr_locked( grpc_grpclb_client_stats_unref(wc_arg->client_stats); if (force_async) { GPR_ASSERT(wc_arg->wrapped_closure != NULL); - grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); gpr_free(wc_arg->free_when_done); return false; } @@ -663,7 +663,7 @@ static bool pick_from_internal_rr_locked( wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; if (force_async) { GPR_ASSERT(wc_arg->wrapped_closure != NULL); - grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); gpr_free(wc_arg->free_when_done); return false; } @@ -739,7 +739,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, * It'll be deallocated in glb_rr_connectivity_changed() */ rr_connectivity_data *rr_connectivity = gpr_zalloc(sizeof(rr_connectivity_data)); - grpc_closure_init(&rr_connectivity->on_change, + GRPC_CLOSURE_INIT(&rr_connectivity->on_change, glb_rr_connectivity_changed_locked, rr_connectivity, grpc_combiner_scheduler(glb_policy->base.combiner)); rr_connectivity->glb_policy = glb_policy; @@ -1004,7 +1004,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, return NULL; } - grpc_closure_init(&glb_policy->lb_channel_on_connectivity_changed, + GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed, glb_lb_channel_on_connectivity_changed_cb, glb_policy, grpc_combiner_scheduler(args->combiner)); grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner); @@ -1078,14 +1078,14 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_closure_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_NONE); pp = next; } while (pping != NULL) { pending_ping *next = pping->next; - grpc_closure_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, + GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, GRPC_ERROR_NONE); pping = next; } @@ -1101,7 +1101,7 @@ static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if (pp->target == target) { *target = NULL; - grpc_closure_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { @@ -1125,7 +1125,7 @@ static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx, pending_pick *next = pp->next; if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - grpc_closure_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { @@ -1160,7 +1160,7 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *on_complete) { if (pick_args->lb_token_mdelem_storage == NULL) { *target = NULL; - grpc_closure_sched(exec_ctx, on_complete, + GRPC_CLOSURE_SCHED(exec_ctx, on_complete, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "No mdelem storage for the LB token. Load reporting " "won't work without it. Failing")); @@ -1179,7 +1179,7 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, wrapped_rr_closure_arg *wc_arg = gpr_zalloc(sizeof(wrapped_rr_closure_arg)); - grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg, + GRPC_CLOSURE_INIT(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg, grpc_schedule_on_exec_ctx); wc_arg->rr_policy = glb_policy->rr_policy; wc_arg->target = target; @@ -1250,7 +1250,7 @@ static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx, const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); const gpr_timespec next_client_load_report_time = gpr_time_add(now, glb_policy->client_stats_report_interval); - grpc_closure_init(&glb_policy->client_load_report_closure, + GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure, send_client_load_report_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer, @@ -1278,7 +1278,7 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx, memset(&op, 0, sizeof(op)); op.op = GRPC_OP_SEND_MESSAGE; op.data.send_message.send_message = glb_policy->client_load_report_payload; - grpc_closure_init(&glb_policy->client_load_report_closure, + GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure, client_load_report_done_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_call_error call_error = grpc_call_start_batch_and_execute( @@ -1384,13 +1384,13 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, grpc_slice_unref_internal(exec_ctx, request_payload_slice); grpc_grpclb_request_destroy(request); - grpc_closure_init(&glb_policy->lb_on_sent_initial_request, + GRPC_CLOSURE_INIT(&glb_policy->lb_on_sent_initial_request, lb_on_sent_initial_request_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); - grpc_closure_init(&glb_policy->lb_on_server_status_received, + GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received, lb_on_server_status_received_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); - grpc_closure_init(&glb_policy->lb_on_response_received, + GRPC_CLOSURE_INIT(&glb_policy->lb_on_response_received, lb_on_response_received_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); @@ -1693,7 +1693,7 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, } } GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); - grpc_closure_init(&glb_policy->lb_on_call_retry, + GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); glb_policy->retry_timer_active = true; diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c index 0e2b4131f1..307e3bad67 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -118,7 +118,7 @@ static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); pp = next; } @@ -135,7 +135,7 @@ static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if (pp->target == target) { *target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); gpr_free(pp); @@ -160,7 +160,7 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - grpc_closure_sched(exec_ctx, pp->on_complete, + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); gpr_free(pp); @@ -258,7 +258,7 @@ static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (p->selected) { grpc_connected_subchannel_ping(exec_ctx, p->selected, closure); } else { - grpc_closure_sched(exec_ctx, closure, + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); } } @@ -557,7 +557,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, "Servicing pending pick with selected subchannel %p", (void *)p->selected); } - grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } grpc_connected_subchannel_notify_on_state_change( @@ -610,7 +610,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, @@ -654,7 +654,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p = gpr_zalloc(sizeof(*p)); pf_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); - grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p, + GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p, grpc_combiner_scheduler(args->combiner)); return &p->base; } diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index 9ecb0a39da..3c8520cc1c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -288,7 +288,7 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, pp->on_complete, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); gpr_free(pp); @@ -311,7 +311,7 @@ static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if (pp->target == target) { *target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); gpr_free(pp); @@ -336,7 +336,7 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { *pp->target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); gpr_free(pp); @@ -553,7 +553,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } } @@ -614,7 +614,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, (void *)selected->subchannel, (unsigned long)next_ready_index); } - grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } } @@ -655,7 +655,7 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel_ping(exec_ctx, target, closure); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked"); } else { - grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Round Robin not connected")); } } @@ -747,7 +747,7 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, subchannel_data *sd = &subchannel_list->subchannels[subchannel_index++]; sd->subchannel_list = subchannel_list; sd->subchannel = subchannel; - grpc_closure_init(&sd->connectivity_changed_closure, + GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure, rr_connectivity_changed_locked, sd, grpc_combiner_scheduler(args->combiner)); /* use some sentinel value outside of the range of diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c index 92f060b422..04a7852323 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c @@ -116,7 +116,7 @@ static void dns_ares_shutdown_locked(grpc_exec_ctx *exec_ctx, } if (r->next_completion != NULL) { *r->target_result = NULL; - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, r->next_completion, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resolver Shutdown")); r->next_completion = NULL; @@ -221,7 +221,7 @@ static void dns_ares_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, ? NULL : grpc_channel_args_copy(r->resolved_result); gpr_log(GPR_DEBUG, "dns_ares_maybe_finish_next_locked"); - grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = NULL; r->published_version = r->resolved_version; } @@ -266,10 +266,10 @@ static grpc_resolver *dns_ares_create(grpc_exec_ctx *exec_ctx, GRPC_DNS_RECONNECT_JITTER, GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); - grpc_closure_init(&r->dns_ares_on_retry_timer_locked, + GRPC_CLOSURE_INIT(&r->dns_ares_on_retry_timer_locked, dns_ares_on_retry_timer_locked, r, grpc_combiner_scheduler(r->base.combiner)); - grpc_closure_init(&r->dns_ares_on_resolved_locked, + GRPC_CLOSURE_INIT(&r->dns_ares_on_resolved_locked, dns_ares_on_resolved_locked, r, grpc_combiner_scheduler(r->base.combiner)); return &r->base; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c index 293e6b0252..4e79c44ba3 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c @@ -257,9 +257,9 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, fdn->readable_registered = false; fdn->writable_registered = false; gpr_mu_init(&fdn->mu); - grpc_closure_init(&fdn->read_closure, on_readable_cb, fdn, + GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_cb, fdn, grpc_schedule_on_exec_ctx); - grpc_closure_init(&fdn->write_closure, on_writable_cb, fdn, + GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_cb, fdn, grpc_schedule_on_exec_ctx); grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, fdn->grpc_fd); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c index 153287c30e..244b260dfa 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c @@ -107,10 +107,10 @@ static void grpc_ares_request_unref(grpc_exec_ctx *exec_ctx, acquire locks in on_done. ares_dns_resolver is using combiner to protect resources needed by on_done. */ grpc_exec_ctx new_exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_closure_sched(&new_exec_ctx, r->on_done, r->error); + GRPC_CLOSURE_SCHED(&new_exec_ctx, r->on_done, r->error); grpc_exec_ctx_finish(&new_exec_ctx); } else { - grpc_closure_sched(exec_ctx, r->on_done, r->error); + GRPC_CLOSURE_SCHED(exec_ctx, r->on_done, r->error); } gpr_mu_destroy(&r->mu); grpc_ares_ev_driver_destroy(r->ev_driver); @@ -370,7 +370,7 @@ static grpc_ares_request *grpc_dns_lookup_ares_impl( return r; error_cleanup: - grpc_closure_sched(exec_ctx, on_done, error); + GRPC_CLOSURE_SCHED(exec_ctx, on_done, error); gpr_free(host); gpr_free(port); return NULL; @@ -445,7 +445,7 @@ static void on_dns_lookup_done_cb(grpc_exec_ctx *exec_ctx, void *arg, &r->lb_addrs->addresses[i].address, sizeof(grpc_resolved_address)); } } - grpc_closure_sched(exec_ctx, r->on_resolve_address_done, + GRPC_CLOSURE_SCHED(exec_ctx, r->on_resolve_address_done, GRPC_ERROR_REF(error)); grpc_lb_addresses_destroy(exec_ctx, r->lb_addrs); gpr_free(r); @@ -461,7 +461,7 @@ static void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, gpr_zalloc(sizeof(grpc_resolve_address_ares_request)); r->addrs_out = addrs; r->on_resolve_address_done = on_done; - grpc_closure_init(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r, + GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r, grpc_schedule_on_exec_ctx); grpc_dns_lookup_ares(exec_ctx, NULL /* dns_server */, name, default_port, interested_parties, &r->on_dns_lookup_done, &r->lb_addrs, diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c index 5c15e3bdf5..b67636a3e4 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c @@ -1,33 +1,18 @@ /* * - * Copyright 2017, Google Inc. - * All rights reserved. + * Copyright 2016-2017 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c index 6c263c32f9..af3391a731 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c @@ -99,7 +99,7 @@ static void dns_shutdown_locked(grpc_exec_ctx *exec_ctx, } if (r->next_completion != NULL) { *r->target_result = NULL; - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, r->next_completion, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resolver Shutdown")); r->next_completion = NULL; @@ -178,7 +178,7 @@ static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, } else { gpr_log(GPR_DEBUG, "retrying immediately"); } - grpc_closure_init(&r->on_retry, dns_on_retry_timer_locked, r, + GRPC_CLOSURE_INIT(&r->on_retry, dns_on_retry_timer_locked, r, grpc_combiner_scheduler(r->base.combiner)); grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now); } @@ -200,7 +200,7 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, r->addresses = NULL; grpc_resolve_address( exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties, - grpc_closure_create(dns_on_resolved_locked, r, + GRPC_CLOSURE_CREATE(dns_on_resolved_locked, r, grpc_combiner_scheduler(r->base.combiner)), &r->addresses); } @@ -212,7 +212,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, *r->target_result = r->resolved_result == NULL ? NULL : grpc_channel_args_copy(r->resolved_result); - grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = NULL; r->published_version = r->resolved_version; } diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c index cbb0e628ed..8e73d606aa 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c @@ -74,7 +74,7 @@ static void fake_resolver_shutdown_locked(grpc_exec_ctx* exec_ctx, fake_resolver* r = (fake_resolver*)resolver; if (r->next_completion != NULL) { *r->target_result = NULL; - grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = NULL; } } @@ -85,7 +85,7 @@ static void fake_resolver_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx, *r->target_result = grpc_channel_args_union(r->next_results, r->channel_args); grpc_channel_args_destroy(exec_ctx, r->next_results); - grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = NULL; r->next_results = NULL; } @@ -157,8 +157,8 @@ void grpc_fake_resolver_response_generator_set_response( grpc_channel_args* next_response) { GPR_ASSERT(generator->resolver != NULL); generator->next_response = grpc_channel_args_copy(next_response); - grpc_closure_sched( - exec_ctx, grpc_closure_create(set_response_cb, generator, + GRPC_CLOSURE_SCHED( + exec_ctx, GRPC_CLOSURE_CREATE(set_response_cb, generator, grpc_combiner_scheduler( generator->resolver->base.combiner)), GRPC_ERROR_NONE); diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c index 641c8d3afe..b5d2e5c92b 100644 --- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c @@ -73,7 +73,7 @@ static void sockaddr_shutdown_locked(grpc_exec_ctx *exec_ctx, sockaddr_resolver *r = (sockaddr_resolver *)resolver; if (r->next_completion != NULL) { *r->target_result = NULL; - grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = NULL; } } @@ -103,7 +103,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, grpc_arg arg = grpc_lb_addresses_create_channel_arg(r->addresses); *r->target_result = grpc_channel_args_copy_and_add(r->channel_args, &arg, 1); - grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = NULL; } } diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c index 68a8bb8303..37dd96726e 100644 --- a/src/core/ext/filters/client_channel/subchannel.c +++ b/src/core/ext/filters/client_channel/subchannel.c @@ -283,7 +283,7 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, gpr_atm old_refs; old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); if (old_refs == 1) { - grpc_closure_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c, + GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(subchannel_destroy, c, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } @@ -333,7 +333,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args); c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; - grpc_closure_init(&c->connected, subchannel_connected, c, + GRPC_CLOSURE_INIT(&c->connected, subchannel_connected, c, grpc_schedule_on_exec_ctx); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); @@ -421,7 +421,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&w->subchannel->mu); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher"); gpr_free(w); - grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(exec_ctx, follow_up, GRPC_ERROR_REF(error)); } static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -488,7 +488,7 @@ static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx, gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds", time_til_next.tv_sec, time_til_next.tv_nsec); } - grpc_closure_init(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, &c->on_alarm, now); } } @@ -514,7 +514,7 @@ void grpc_subchannel_notify_on_state_change( w->subchannel = c; w->pollset_set = interested_parties; w->notify = notify; - grpc_closure_init(&w->closure, on_external_state_watcher_done, w, + GRPC_CLOSURE_INIT(&w->closure, on_external_state_watcher_done, w, grpc_schedule_on_exec_ctx); if (interested_parties != NULL) { grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set, @@ -635,7 +635,7 @@ static bool publish_transport_locked(grpc_exec_ctx *exec_ctx, sw_subchannel = gpr_malloc(sizeof(*sw_subchannel)); sw_subchannel->subchannel = c; sw_subchannel->connectivity_state = GRPC_CHANNEL_READY; - grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, + GRPC_CLOSURE_INIT(&sw_subchannel->closure, subchannel_on_child_state_changed, sw_subchannel, grpc_schedule_on_exec_ctx); if (c->disconnected) { diff --git a/src/core/ext/filters/deadline/deadline_filter.c b/src/core/ext/filters/deadline/deadline_filter.c index c02756a726..ced025e2e2 100644 --- a/src/core/ext/filters/deadline/deadline_filter.c +++ b/src/core/ext/filters/deadline/deadline_filter.c @@ -74,7 +74,7 @@ retry: // If we've already created and destroyed a timer, we always create a // new closure: we have no other guarantee that the inlined closure is // not in use (it may hold a pending call to timer_callback) - closure = grpc_closure_create(timer_callback, elem, + closure = GRPC_CLOSURE_CREATE(timer_callback, elem, grpc_schedule_on_exec_ctx); } else { goto retry; @@ -85,7 +85,7 @@ retry: GRPC_DEADLINE_STATE_INITIAL, GRPC_DEADLINE_STATE_PENDING)) { closure = - grpc_closure_init(&deadline_state->timer_callback, timer_callback, + GRPC_CLOSURE_INIT(&deadline_state->timer_callback, timer_callback, elem, grpc_schedule_on_exec_ctx); } else { goto retry; @@ -115,7 +115,7 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_deadline_state* deadline_state = arg; cancel_timer_if_needed(exec_ctx, deadline_state); // Invoke the next callback. - grpc_closure_run(exec_ctx, deadline_state->next_on_complete, + GRPC_CLOSURE_RUN(exec_ctx, deadline_state->next_on_complete, GRPC_ERROR_REF(error)); } @@ -123,7 +123,7 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { static void inject_on_complete_cb(grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) { deadline_state->next_on_complete = op->on_complete; - grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state, + GRPC_CLOSURE_INIT(&deadline_state->on_complete, on_complete, deadline_state, grpc_schedule_on_exec_ctx); op->on_complete = &deadline_state->on_complete; } @@ -161,9 +161,9 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state)); state->elem = elem; state->deadline = deadline; - grpc_closure_init(&state->closure, start_timer_after_init, state, + GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state, grpc_schedule_on_exec_ctx); - grpc_closure_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &state->closure, GRPC_ERROR_NONE); } } @@ -281,7 +281,7 @@ static void server_start_transport_stream_op_batch( op->payload->recv_initial_metadata.recv_initial_metadata_ready; calld->recv_initial_metadata = op->payload->recv_initial_metadata.recv_initial_metadata; - grpc_closure_init(&calld->recv_initial_metadata_ready, + GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); op->payload->recv_initial_metadata.recv_initial_metadata_ready = diff --git a/src/core/ext/filters/http/client/http_client_filter.c b/src/core/ext/filters/http/client/http_client_filter.c index fb2a5d10fe..90f0aed7a0 100644 --- a/src/core/ext/filters/http/client/http_client_filter.c +++ b/src/core/ext/filters/http/client/http_client_filter.c @@ -158,7 +158,7 @@ static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, } else { GRPC_ERROR_REF(error); } - grpc_closure_run(exec_ctx, calld->on_done_recv_initial_metadata, error); + GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_initial_metadata, error); } static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, @@ -171,7 +171,7 @@ static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, } else { GRPC_ERROR_REF(error); } - grpc_closure_run(exec_ctx, calld->on_done_recv_trailing_metadata, error); + GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_trailing_metadata, error); } static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, @@ -445,17 +445,17 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, calld->payload_bytes = NULL; calld->send_message_blocked = false; grpc_slice_buffer_init(&calld->slices); - grpc_closure_init(&calld->hc_on_recv_initial_metadata, + GRPC_CLOSURE_INIT(&calld->hc_on_recv_initial_metadata, hc_on_recv_initial_metadata, elem, grpc_schedule_on_exec_ctx); - grpc_closure_init(&calld->hc_on_recv_trailing_metadata, + GRPC_CLOSURE_INIT(&calld->hc_on_recv_trailing_metadata, hc_on_recv_trailing_metadata, elem, grpc_schedule_on_exec_ctx); - grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem, + GRPC_CLOSURE_INIT(&calld->hc_on_complete, hc_on_complete, elem, grpc_schedule_on_exec_ctx); - grpc_closure_init(&calld->got_slice, got_slice, elem, + GRPC_CLOSURE_INIT(&calld->got_slice, got_slice, elem, grpc_schedule_on_exec_ctx); - grpc_closure_init(&calld->send_done, send_done, elem, + GRPC_CLOSURE_INIT(&calld->send_done, send_done, elem, grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c index 4f753cef1c..04cb1d94f8 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.c +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c @@ -364,9 +364,9 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* initialize members */ grpc_slice_buffer_init(&calld->slices); - grpc_closure_init(&calld->got_slice, got_slice, elem, + GRPC_CLOSURE_INIT(&calld->got_slice, got_slice, elem, grpc_schedule_on_exec_ctx); - grpc_closure_init(&calld->send_done, send_done, elem, + GRPC_CLOSURE_INIT(&calld->send_done, send_done, elem, grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c index 113e07d249..b145f12aff 100644 --- a/src/core/ext/filters/http/server/http_server_filter.c +++ b/src/core/ext/filters/http/server/http_server_filter.c @@ -269,7 +269,7 @@ static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, } else { GRPC_ERROR_REF(err); } - grpc_closure_run(exec_ctx, calld->on_done_recv, err); + GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv, err); } static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, @@ -281,11 +281,11 @@ static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, *calld->pp_recv_message = calld->payload_bin_delivered ? NULL : (grpc_byte_stream *)&calld->read_stream; - grpc_closure_run(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); + GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); calld->recv_message_ready = NULL; calld->payload_bin_delivered = true; } - grpc_closure_run(exec_ctx, calld->on_complete, GRPC_ERROR_REF(err)); + GRPC_CLOSURE_RUN(exec_ctx, calld->on_complete, GRPC_ERROR_REF(err)); } static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data, @@ -296,7 +296,7 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data, /* do nothing. This is probably a GET request, and payload will be returned in hs_on_complete callback. */ } else { - grpc_closure_run(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); + GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); } } @@ -383,11 +383,11 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; /* initialize members */ - grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem, + GRPC_CLOSURE_INIT(&calld->hs_on_recv, hs_on_recv, elem, grpc_schedule_on_exec_ctx); - grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem, + GRPC_CLOSURE_INIT(&calld->hs_on_complete, hs_on_complete, elem, grpc_schedule_on_exec_ctx); - grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem, + GRPC_CLOSURE_INIT(&calld->hs_recv_message_ready, hs_recv_message_ready, elem, grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&calld->read_slice_buffer); return GRPC_ERROR_NONE; diff --git a/src/core/ext/filters/load_reporting/load_reporting_filter.c b/src/core/ext/filters/load_reporting/load_reporting_filter.c index 80446ca914..08474efb2e 100644 --- a/src/core/ext/filters/load_reporting/load_reporting_filter.c +++ b/src/core/ext/filters/load_reporting/load_reporting_filter.c @@ -90,7 +90,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, const grpc_call_element_args *args) { call_data *calld = elem->call_data; calld->id = (intptr_t)args->call_stack; - grpc_closure_init(&calld->on_initial_md_ready, on_initial_md_ready, elem, + GRPC_CLOSURE_INIT(&calld->on_initial_md_ready, on_initial_md_ready, elem, grpc_schedule_on_exec_ctx); /* TODO(dgq): do something with the data diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c index 604f74e751..35304f8150 100644 --- a/src/core/ext/filters/max_age/max_age_filter.c +++ b/src/core/ext/filters/max_age/max_age_filter.c @@ -329,23 +329,23 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, : gpr_time_from_millis(value, GPR_TIMESPAN); } } - grpc_closure_init(&chand->close_max_idle_channel, close_max_idle_channel, + GRPC_CLOSURE_INIT(&chand->close_max_idle_channel, close_max_idle_channel, chand, grpc_schedule_on_exec_ctx); - grpc_closure_init(&chand->close_max_age_channel, close_max_age_channel, chand, + GRPC_CLOSURE_INIT(&chand->close_max_age_channel, close_max_age_channel, chand, grpc_schedule_on_exec_ctx); - grpc_closure_init(&chand->force_close_max_age_channel, + GRPC_CLOSURE_INIT(&chand->force_close_max_age_channel, force_close_max_age_channel, chand, grpc_schedule_on_exec_ctx); - grpc_closure_init(&chand->start_max_idle_timer_after_init, + GRPC_CLOSURE_INIT(&chand->start_max_idle_timer_after_init, start_max_idle_timer_after_init, chand, grpc_schedule_on_exec_ctx); - grpc_closure_init(&chand->start_max_age_timer_after_init, + GRPC_CLOSURE_INIT(&chand->start_max_age_timer_after_init, start_max_age_timer_after_init, chand, grpc_schedule_on_exec_ctx); - grpc_closure_init(&chand->start_max_age_grace_timer_after_goaway_op, + GRPC_CLOSURE_INIT(&chand->start_max_age_grace_timer_after_goaway_op, start_max_age_grace_timer_after_goaway_op, chand, grpc_schedule_on_exec_ctx); - grpc_closure_init(&chand->channel_connectivity_changed, + GRPC_CLOSURE_INIT(&chand->channel_connectivity_changed, channel_connectivity_changed, chand, grpc_schedule_on_exec_ctx); @@ -360,7 +360,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, initialization is done. */ GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age start_max_age_timer_after_init"); - grpc_closure_sched(exec_ctx, &chand->start_max_age_timer_after_init, + GRPC_CLOSURE_SCHED(exec_ctx, &chand->start_max_age_timer_after_init, GRPC_ERROR_NONE); } @@ -371,7 +371,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, 0) { GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age start_max_idle_timer_after_init"); - grpc_closure_sched(exec_ctx, &chand->start_max_idle_timer_after_init, + GRPC_CLOSURE_SCHED(exec_ctx, &chand->start_max_idle_timer_after_init, GRPC_ERROR_NONE); } return GRPC_ERROR_NONE; diff --git a/src/core/ext/filters/message_size/message_size_filter.c b/src/core/ext/filters/message_size/message_size_filter.c index e68ba149f2..9bb565ed6d 100644 --- a/src/core/ext/filters/message_size/message_size_filter.c +++ b/src/core/ext/filters/message_size/message_size_filter.c @@ -110,7 +110,7 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, GRPC_ERROR_REF(error); } // Invoke the next callback. - grpc_closure_run(exec_ctx, calld->next_recv_message_ready, error); + GRPC_CLOSURE_RUN(exec_ctx, calld->next_recv_message_ready, error); } // Start transport stream op. @@ -152,7 +152,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, channel_data* chand = elem->channel_data; call_data* calld = elem->call_data; calld->next_recv_message_ready = NULL; - grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem, + GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, grpc_schedule_on_exec_ctx); // Get max sizes from channel data, then merge in per-method config values. // Note: Per-method config is only available on the client, so we diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c index 9095d6167c..8b3fff5fa3 100644 --- a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c +++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c @@ -67,7 +67,7 @@ static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, } // Invoke the next callback. - grpc_closure_run(exec_ctx, calld->next_recv_initial_metadata_ready, + GRPC_CLOSURE_RUN(exec_ctx, calld->next_recv_initial_metadata_ready, GRPC_ERROR_REF(error)); } @@ -106,7 +106,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, call_data* calld = elem->call_data; calld->next_recv_initial_metadata_ready = NULL; calld->workaround_active = false; - grpc_closure_init(&calld->recv_initial_metadata_ready, + GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; |