diff options
author | Craig Tiller <ctiller@google.com> | 2017-01-04 10:17:31 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-01-04 10:17:31 -0800 |
commit | 17f6904baded526404e1bb4c023ab1eb91054813 (patch) | |
tree | 09f4edd558cf7094a60fe1970c94377304f09fc5 /src/core/ext/client_channel/client_channel.c | |
parent | 00a2d18234269e901deb7da22063c7fda210d7ee (diff) | |
parent | 33dfad1fbc14f9e0157b31e3bc33eda1a0cd4e1c (diff) |
Merge branch 'slice_with_exec_ctx_and_new_closures' into metadata_filter_and_new_closures
Diffstat (limited to 'src/core/ext/client_channel/client_channel.c')
-rw-r--r-- | src/core/ext/client_channel/client_channel.c | 50 |
1 files changed, 26 insertions, 24 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 16de03d003..74350d9fee 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -254,7 +254,8 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); w->chand = chand; - grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w); + grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w, + grpc_schedule_on_exec_ctx); w->state = current_state; w->lb_policy = lb_policy; grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state, @@ -366,14 +367,12 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, } chand->method_params_table = method_params_table; if (lb_policy != NULL) { - grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, - NULL); + grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); } else if (chand->resolver == NULL /* disconnected */) { grpc_closure_list_fail_all( &chand->waiting_for_config_closures, GRPC_ERROR_CREATE_REFERENCING("Channel disconnected", &error, 1)); - grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, - NULL); + grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); } if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); @@ -430,7 +429,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport_op *op) { channel_data *chand = elem->channel_data; - grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); GPR_ASSERT(op->set_accept_stream == false); if (op->bind_pollset != NULL) { @@ -449,9 +448,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->send_ping != NULL) { if (chand->lb_policy == NULL) { - grpc_exec_ctx_sched(exec_ctx, op->send_ping, - GRPC_ERROR_CREATE("Ping with no load balancing"), - NULL); + grpc_closure_sched(exec_ctx, op->send_ping, + GRPC_ERROR_CREATE("Ping with no load balancing")); } else { grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping); op->bind_pollset = NULL; @@ -470,8 +468,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, if (!chand->started_resolving) { grpc_closure_list_fail_all(&chand->waiting_for_config_closures, GRPC_ERROR_REF(op->disconnect_with_error)); - grpc_exec_ctx_enqueue_list(exec_ctx, - &chand->waiting_for_config_closures, NULL); + grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, @@ -516,7 +513,8 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, gpr_mu_init(&chand->mu); chand->owning_stack = args->channel_stack; grpc_closure_init(&chand->on_resolver_result_changed, - on_resolver_result_changed, chand); + on_resolver_result_changed, chand, + grpc_schedule_on_exec_ctx); chand->interested_parties = grpc_pollset_set_create(); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); @@ -683,8 +681,9 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { calld->waiting_ops_count = 0; calld->waiting_ops_capacity = 0; GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); - grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(retry_ops, a), - GRPC_ERROR_NONE, NULL); + grpc_closure_sched( + exec_ctx, grpc_closure_create(retry_ops, a, grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); } static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, @@ -766,14 +765,14 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (error != GRPC_ERROR_NONE) { - grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL); + grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error)); } else { call_data *calld = cpa->elem->call_data; gpr_mu_lock(&calld->mu); if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->initial_metadata_flags, cpa->connected_subchannel, cpa->on_ready, GRPC_ERROR_NONE)) { - grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE); } gpr_mu_unlock(&calld->mu); } @@ -805,9 +804,9 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, cpa = closure->cb_arg; if (cpa->connected_subchannel == connected_subchannel) { cpa->connected_subchannel = NULL; - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, cpa->on_ready, - GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL); + GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1)); } } gpr_mu_unlock(&chand->mu); @@ -858,12 +857,12 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, cpa->connected_subchannel = connected_subchannel; cpa->on_ready = on_ready; cpa->elem = elem; - grpc_closure_init(&cpa->closure, continue_picking, cpa); + grpc_closure_init(&cpa->closure, continue_picking, cpa, + grpc_schedule_on_exec_ctx); grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, GRPC_ERROR_NONE); } else { - grpc_exec_ctx_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"), - NULL); + grpc_closure_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected")); } gpr_mu_unlock(&chand->mu); @@ -948,7 +947,8 @@ retry: calld->connected_subchannel == NULL && op->send_initial_metadata != NULL) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; - grpc_closure_init(&calld->next_step, subchannel_ready, elem); + grpc_closure_init(&calld->next_step, subchannel_ready, elem, + grpc_schedule_on_exec_ctx); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); /* If a subchannel is not available immediately, the polling entity from call_data should be provided to channel_data's interested_parties, so @@ -1094,7 +1094,8 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, // get the service config data once the resolver returns. // Take a reference to the call stack to be owned by the callback. GRPC_CALL_STACK_REF(calld->owning_call, "read_service_config"); - grpc_closure_init(&calld->read_service_config, read_service_config, elem); + grpc_closure_init(&calld->read_service_config, read_service_config, elem, + grpc_schedule_on_exec_ctx); grpc_closure_list_append(&chand->waiting_for_config_closures, &calld->read_service_config, GRPC_ERROR_NONE); gpr_mu_unlock(&chand->mu); @@ -1207,7 +1208,8 @@ void grpc_client_channel_watch_connectivity_state( w->pollset = pollset; w->on_complete = on_complete; grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); - grpc_closure_init(&w->my_closure, on_external_watch_complete, w); + grpc_closure_init(&w->my_closure, on_external_watch_complete, w, + grpc_schedule_on_exec_ctx); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); gpr_mu_lock(&chand->mu); |