diff options
Diffstat (limited to 'src/core/ext/client_channel/client_channel.c')
-rw-r--r-- | src/core/ext/client_channel/client_channel.c | 78 |
1 files changed, 42 insertions, 36 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 9d46338428..2f25fef9a7 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -83,8 +83,12 @@ static void *method_parameters_copy(void *value) { return new_value; } +static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *p) { + gpr_free(p); +} + static const grpc_mdstr_hash_table_vtable method_parameters_vtable = { - gpr_free, method_parameters_copy}; + method_parameters_free, method_parameters_copy}; static void *method_parameters_create_from_json(const grpc_json *json) { wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET; @@ -249,7 +253,8 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); w->chand = chand; - grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w); + grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w, + grpc_schedule_on_exec_ctx); w->state = current_state; w->lb_policy = lb_policy; grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state, @@ -327,7 +332,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_service_config_create(service_config_json); if (service_config != NULL) { method_params_table = grpc_service_config_create_method_config_table( - service_config, method_parameters_create_from_json, + exec_ctx, service_config, method_parameters_create_from_json, &method_parameters_vtable); grpc_service_config_destroy(service_config); } @@ -336,7 +341,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, // be pointing to data inside chand->resolver_result. // The copy will be saved in chand->lb_policy_name below. lb_policy_name = gpr_strdup(lb_policy_name); - grpc_channel_args_destroy(chand->resolver_result); + grpc_channel_args_destroy(exec_ctx, chand->resolver_result); chand->resolver_result = NULL; } @@ -357,18 +362,16 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->service_config_json = service_config_json; } if (chand->method_params_table != NULL) { - grpc_mdstr_hash_table_unref(chand->method_params_table); + grpc_mdstr_hash_table_unref(exec_ctx, chand->method_params_table); } chand->method_params_table = method_params_table; if (lb_policy != NULL) { - grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, - NULL); + grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); } else if (chand->resolver == NULL /* disconnected */) { grpc_closure_list_fail_all( &chand->waiting_for_config_closures, GRPC_ERROR_CREATE_REFERENCING("Channel disconnected", &error, 1)); - grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, - NULL); + grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); } if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); @@ -425,7 +428,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport_op *op) { channel_data *chand = elem->channel_data; - grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); GPR_ASSERT(op->set_accept_stream == false); if (op->bind_pollset != NULL) { @@ -444,9 +447,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->send_ping != NULL) { if (chand->lb_policy == NULL) { - grpc_exec_ctx_sched(exec_ctx, op->send_ping, - GRPC_ERROR_CREATE("Ping with no load balancing"), - NULL); + grpc_closure_sched(exec_ctx, op->send_ping, + GRPC_ERROR_CREATE("Ping with no load balancing")); } else { grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping); op->bind_pollset = NULL; @@ -465,8 +467,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, if (!chand->started_resolving) { grpc_closure_list_fail_all(&chand->waiting_for_config_closures, GRPC_ERROR_REF(op->disconnect_with_error)); - grpc_exec_ctx_enqueue_list(exec_ctx, - &chand->waiting_for_config_closures, NULL); + grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, @@ -511,7 +512,8 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, gpr_mu_init(&chand->mu); chand->owning_stack = args->channel_stack; grpc_closure_init(&chand->on_resolver_result_changed, - on_resolver_result_changed, chand); + on_resolver_result_changed, chand, + grpc_schedule_on_exec_ctx); chand->interested_parties = grpc_pollset_set_create(); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); @@ -556,7 +558,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, gpr_free(chand->lb_policy_name); gpr_free(chand->service_config_json); if (chand->method_params_table != NULL) { - grpc_mdstr_hash_table_unref(chand->method_params_table); + grpc_mdstr_hash_table_unref(exec_ctx, chand->method_params_table); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); grpc_pollset_set_destroy(chand->interested_parties); @@ -678,8 +680,9 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { calld->waiting_ops_count = 0; calld->waiting_ops_capacity = 0; GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); - grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(retry_ops, a), - GRPC_ERROR_NONE, NULL); + grpc_closure_sched( + exec_ctx, grpc_closure_create(retry_ops, a, grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); } static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, @@ -761,14 +764,14 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (error != GRPC_ERROR_NONE) { - grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL); + grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error)); } else { call_data *calld = cpa->elem->call_data; gpr_mu_lock(&calld->mu); if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->initial_metadata_flags, cpa->connected_subchannel, cpa->on_ready, GRPC_ERROR_NONE)) { - grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE); } gpr_mu_unlock(&calld->mu); } @@ -800,9 +803,9 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, cpa = closure->cb_arg; if (cpa->connected_subchannel == connected_subchannel) { cpa->connected_subchannel = NULL; - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, cpa->on_ready, - GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL); + GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1)); } } gpr_mu_unlock(&chand->mu); @@ -853,12 +856,12 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, cpa->connected_subchannel = connected_subchannel; cpa->on_ready = on_ready; cpa->elem = elem; - grpc_closure_init(&cpa->closure, continue_picking, cpa); + grpc_closure_init(&cpa->closure, continue_picking, cpa, + grpc_schedule_on_exec_ctx); grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, GRPC_ERROR_NONE); } else { - grpc_exec_ctx_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"), - NULL); + grpc_closure_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected")); } gpr_mu_unlock(&chand->mu); @@ -943,7 +946,8 @@ retry: calld->connected_subchannel == NULL && op->send_initial_metadata != NULL) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; - grpc_closure_init(&calld->next_step, subchannel_ready, elem); + grpc_closure_init(&calld->next_step, subchannel_ready, elem, + grpc_schedule_on_exec_ctx); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); /* If a subchannel is not available immediately, the polling entity from call_data should be provided to channel_data's interested_parties, so @@ -1001,8 +1005,8 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&chand->mu); // If the method config table was present, use it. if (method_params_table != NULL) { - const method_parameters *method_params = - grpc_method_config_table_get(method_params_table, calld->path); + const method_parameters *method_params = grpc_method_config_table_get( + exec_ctx, method_params_table, calld->path); if (method_params != NULL) { const bool have_method_timeout = gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0; @@ -1025,7 +1029,7 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&calld->mu); } } - grpc_mdstr_hash_table_unref(method_params_table); + grpc_mdstr_hash_table_unref(exec_ctx, method_params_table); } } GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config"); @@ -1066,8 +1070,8 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_mdstr_hash_table *method_params_table = grpc_mdstr_hash_table_ref(chand->method_params_table); gpr_mu_unlock(&chand->mu); - method_parameters *method_params = - grpc_method_config_table_get(method_params_table, args->path); + method_parameters *method_params = grpc_method_config_table_get( + exec_ctx, method_params_table, args->path); if (method_params != NULL) { if (gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_CLOCK_MONOTONIC)) != 0) { @@ -1080,7 +1084,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, method_params->wait_for_ready; } } - grpc_mdstr_hash_table_unref(method_params_table); + grpc_mdstr_hash_table_unref(exec_ctx, method_params_table); } else { gpr_mu_unlock(&chand->mu); } @@ -1089,7 +1093,8 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, // get the service config data once the resolver returns. // Take a reference to the call stack to be owned by the callback. GRPC_CALL_STACK_REF(calld->owning_call, "read_service_config"); - grpc_closure_init(&calld->read_service_config, read_service_config, elem); + grpc_closure_init(&calld->read_service_config, read_service_config, elem, + grpc_schedule_on_exec_ctx); grpc_closure_list_append(&chand->waiting_for_config_closures, &calld->read_service_config, GRPC_ERROR_NONE); gpr_mu_unlock(&chand->mu); @@ -1108,7 +1113,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, void *and_free_memory) { call_data *calld = elem->call_data; grpc_deadline_state_destroy(exec_ctx, elem); - GRPC_MDSTR_UNREF(calld->path); + GRPC_MDSTR_UNREF(exec_ctx, calld->path); GRPC_ERROR_UNREF(calld->cancel_error); grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { @@ -1202,7 +1207,8 @@ void grpc_client_channel_watch_connectivity_state( w->pollset = pollset; w->on_complete = on_complete; grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); - grpc_closure_init(&w->my_closure, on_external_watch_complete, w); + grpc_closure_init(&w->my_closure, on_external_watch_complete, w, + grpc_schedule_on_exec_ctx); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); gpr_mu_lock(&chand->mu); |