diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.cc | 409 |
1 files changed, 172 insertions, 237 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index aced9adf9f..ba82c88eb7 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -91,8 +91,7 @@ static void method_parameters_unref(method_parameters* method_params) { static void* method_parameters_ref_wrapper(void* value) { return method_parameters_ref((method_parameters*)value); } -static void method_parameters_unref_wrapper(grpc_exec_ctx* exec_ctx, - void* value) { +static void method_parameters_unref_wrapper(void* value) { method_parameters_unref((method_parameters*)value); } @@ -228,12 +227,11 @@ typedef struct { grpc_lb_policy* lb_policy; } lb_policy_connectivity_watcher; -static void watch_lb_policy_locked(grpc_exec_ctx* exec_ctx, channel_data* chand, +static void watch_lb_policy_locked(channel_data* chand, grpc_lb_policy* lb_policy, grpc_connectivity_state current_state); -static void set_channel_connectivity_state_locked(grpc_exec_ctx* exec_ctx, - channel_data* chand, +static void set_channel_connectivity_state_locked(channel_data* chand, grpc_connectivity_state state, grpc_error* error, const char* reason) { @@ -245,12 +243,12 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx* exec_ctx, if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { /* cancel picks with wait_for_ready=false */ grpc_lb_policy_cancel_picks_locked( - exec_ctx, chand->lb_policy, + chand->lb_policy, /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY, /* check= */ 0, GRPC_ERROR_REF(error)); } else if (state == GRPC_CHANNEL_SHUTDOWN) { /* cancel all picks */ - grpc_lb_policy_cancel_picks_locked(exec_ctx, chand->lb_policy, + grpc_lb_policy_cancel_picks_locked(chand->lb_policy, /* mask= */ 0, /* check= */ 0, GRPC_ERROR_REF(error)); } @@ -259,12 +257,10 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx* exec_ctx, gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand, grpc_connectivity_state_name(state)); } - grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error, - reason); + grpc_connectivity_state_set(&chand->state_tracker, state, error, reason); } -static void on_lb_policy_state_changed_locked(grpc_exec_ctx* exec_ctx, - void* arg, grpc_error* error) { +static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) { lb_policy_connectivity_watcher* w = (lb_policy_connectivity_watcher*)arg; /* check if the notification is for the latest policy */ if (w->lb_policy == w->chand->lb_policy) { @@ -272,17 +268,17 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx* exec_ctx, gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand, w->lb_policy, grpc_connectivity_state_name(w->state)); } - set_channel_connectivity_state_locked(exec_ctx, w->chand, w->state, + set_channel_connectivity_state_locked(w->chand, w->state, GRPC_ERROR_REF(error), "lb_changed"); if (w->state != GRPC_CHANNEL_SHUTDOWN) { - watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state); + watch_lb_policy_locked(w->chand, w->lb_policy, w->state); } } - GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy"); + GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "watch_lb_policy"); gpr_free(w); } -static void watch_lb_policy_locked(grpc_exec_ctx* exec_ctx, channel_data* chand, +static void watch_lb_policy_locked(channel_data* chand, grpc_lb_policy* lb_policy, grpc_connectivity_state current_state) { lb_policy_connectivity_watcher* w = @@ -293,19 +289,18 @@ static void watch_lb_policy_locked(grpc_exec_ctx* exec_ctx, channel_data* chand, grpc_combiner_scheduler(chand->combiner)); w->state = current_state; w->lb_policy = lb_policy; - grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state, + grpc_lb_policy_notify_on_state_change_locked(lb_policy, &w->state, &w->on_changed); } -static void start_resolving_locked(grpc_exec_ctx* exec_ctx, - channel_data* chand) { +static void start_resolving_locked(channel_data* chand) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand); } GPR_ASSERT(!chand->started_resolving); chand->started_resolving = true; GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next_locked(exec_ctx, chand->resolver, &chand->resolver_result, + grpc_resolver_next_locked(chand->resolver, &chand->resolver_result, &chand->on_resolver_result_changed); } @@ -369,29 +364,26 @@ static void parse_retry_throttle_params(const grpc_json* field, void* arg) { } } -static void request_reresolution_locked(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void request_reresolution_locked(void* arg, grpc_error* error) { reresolution_request_args* args = (reresolution_request_args*)arg; channel_data* chand = args->chand; // If this invocation is for a stale LB policy, treat it as an LB shutdown // signal. if (args->lb_policy != chand->lb_policy || error != GRPC_ERROR_NONE || chand->resolver == nullptr) { - GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "re-resolution"); + GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution"); gpr_free(args); return; } if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p: started name re-resolving", chand); } - grpc_resolver_channel_saw_error_locked(exec_ctx, chand->resolver); + grpc_resolver_channel_saw_error_locked(chand->resolver); // Give back the closure to the LB policy. - grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, chand->lb_policy, - &args->closure); + grpc_lb_policy_set_reresolve_closure_locked(chand->lb_policy, &args->closure); } -static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx, - void* arg, grpc_error* error) { +static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { channel_data* chand = (channel_data*)arg; if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand, @@ -458,12 +450,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx, if (chand->lb_policy != nullptr && !lb_policy_name_changed) { // Continue using the same LB policy. Update with new addresses. lb_policy_updated = true; - grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, - &lb_policy_args); + grpc_lb_policy_update_locked(chand->lb_policy, &lb_policy_args); } else { // Instantiate new LB policy. - new_lb_policy = - grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args); + new_lb_policy = grpc_lb_policy_create(lb_policy_name, &lb_policy_args); if (new_lb_policy == nullptr) { gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); @@ -475,7 +465,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx, GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args, grpc_combiner_scheduler(chand->combiner)); GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution"); - grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, new_lb_policy, + grpc_lb_policy_set_reresolve_closure_locked(new_lb_policy, &args->closure); } } @@ -492,8 +482,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx, GRPC_ARG_SERVER_URI); GPR_ASSERT(channel_arg != nullptr); GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); - grpc_uri* uri = - grpc_uri_parse(exec_ctx, channel_arg->value.string, true); + grpc_uri* uri = grpc_uri_parse(channel_arg->value.string, true); GPR_ASSERT(uri->path[0] != '\0'); service_config_parsing_state parsing_state; memset(&parsing_state, 0, sizeof(parsing_state)); @@ -504,7 +493,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx, grpc_uri_destroy(uri); retry_throttle_data = parsing_state.retry_throttle_data; method_params_table = grpc_service_config_create_method_config_table( - exec_ctx, service_config, method_parameters_create_from_json, + service_config, method_parameters_create_from_json, method_parameters_ref_wrapper, method_parameters_unref_wrapper); grpc_service_config_destroy(service_config); } @@ -514,7 +503,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx, // The copy will be saved in chand->lb_policy_name below. lb_policy_name_dup = gpr_strdup(lb_policy_name); } - grpc_channel_args_destroy(exec_ctx, chand->resolver_result); + grpc_channel_args_destroy(chand->resolver_result); chand->resolver_result = nullptr; } if (grpc_client_channel_trace.enabled()) { @@ -546,7 +535,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx, chand->retry_throttle_data = retry_throttle_data; // Swap out the method params table. if (chand->method_params_table != nullptr) { - grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); + grpc_slice_hash_table_unref(chand->method_params_table); } chand->method_params_table = method_params_table; // If we have a new LB policy or are shutting down (in which case @@ -562,10 +551,9 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx, gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand, chand->lb_policy); } - grpc_pollset_set_del_pollset_set(exec_ctx, - chand->lb_policy->interested_parties, + grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties, chand->interested_parties); - GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); + GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); } chand->lb_policy = new_lb_policy; } @@ -579,21 +567,20 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx, if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand); } - grpc_resolver_shutdown_locked(exec_ctx, chand->resolver); - GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); + grpc_resolver_shutdown_locked(chand->resolver); + GRPC_RESOLVER_UNREF(chand->resolver, "channel"); chand->resolver = nullptr; } set_channel_connectivity_state_locked( - exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN, + chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Got resolver result after disconnection", &error, 1), "resolver_gone"); - GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver"); + GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver"); grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Channel disconnected", &error, 1)); - GRPC_CLOSURE_LIST_SCHED(exec_ctx, - &chand->waiting_for_resolver_result_closures); + GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); } else { // Not shutting down. grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; grpc_error* state_error = @@ -603,33 +590,28 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx, gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand); } GRPC_ERROR_UNREF(state_error); - state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy, - &state_error); - grpc_pollset_set_add_pollset_set(exec_ctx, - new_lb_policy->interested_parties, + state = + grpc_lb_policy_check_connectivity_locked(new_lb_policy, &state_error); + grpc_pollset_set_add_pollset_set(new_lb_policy->interested_parties, chand->interested_parties); - GRPC_CLOSURE_LIST_SCHED(exec_ctx, - &chand->waiting_for_resolver_result_closures); + GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); if (chand->exit_idle_when_lb_policy_arrives) { - grpc_lb_policy_exit_idle_locked(exec_ctx, new_lb_policy); + grpc_lb_policy_exit_idle_locked(new_lb_policy); chand->exit_idle_when_lb_policy_arrives = false; } - watch_lb_policy_locked(exec_ctx, chand, new_lb_policy, state); + watch_lb_policy_locked(chand, new_lb_policy, state); } if (!lb_policy_updated) { - set_channel_connectivity_state_locked(exec_ctx, chand, state, - GRPC_ERROR_REF(state_error), - "new_lb+resolver"); + set_channel_connectivity_state_locked( + chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver"); } - grpc_resolver_next_locked(exec_ctx, chand->resolver, - &chand->resolver_result, + grpc_resolver_next_locked(chand->resolver, &chand->resolver_result, &chand->on_resolver_result_changed); GRPC_ERROR_UNREF(state_error); } } -static void start_transport_op_locked(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error_ignored) { +static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { grpc_transport_op* op = (grpc_transport_op*)arg; grpc_channel_element* elem = (grpc_channel_element*)op->handler_private.extra_arg; @@ -637,7 +619,7 @@ static void start_transport_op_locked(grpc_exec_ctx* exec_ctx, void* arg, if (op->on_connectivity_state_change != nullptr) { grpc_connectivity_state_notify_on_state_change( - exec_ctx, &chand->state_tracker, op->connectivity_state, + &chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change); op->on_connectivity_state_change = nullptr; op->connectivity_state = nullptr; @@ -645,11 +627,10 @@ static void start_transport_op_locked(grpc_exec_ctx* exec_ctx, void* arg, if (op->send_ping != nullptr) { if (chand->lb_policy == nullptr) { - GRPC_CLOSURE_SCHED( - exec_ctx, op->send_ping, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing")); + GRPC_CLOSURE_SCHED(op->send_ping, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Ping with no load balancing")); } else { - grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping); + grpc_lb_policy_ping_one_locked(chand->lb_policy, op->send_ping); op->bind_pollset = nullptr; } op->send_ping = nullptr; @@ -658,54 +639,48 @@ static void start_transport_op_locked(grpc_exec_ctx* exec_ctx, void* arg, if (op->disconnect_with_error != GRPC_ERROR_NONE) { if (chand->resolver != nullptr) { set_channel_connectivity_state_locked( - exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN, + chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(op->disconnect_with_error), "disconnect"); - grpc_resolver_shutdown_locked(exec_ctx, chand->resolver); - GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); + grpc_resolver_shutdown_locked(chand->resolver); + GRPC_RESOLVER_UNREF(chand->resolver, "channel"); chand->resolver = nullptr; if (!chand->started_resolving) { grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures, GRPC_ERROR_REF(op->disconnect_with_error)); - GRPC_CLOSURE_LIST_SCHED(exec_ctx, - &chand->waiting_for_resolver_result_closures); + GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); } if (chand->lb_policy != nullptr) { - grpc_pollset_set_del_pollset_set(exec_ctx, - chand->lb_policy->interested_parties, + grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties, chand->interested_parties); - GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); + GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); chand->lb_policy = nullptr; } } GRPC_ERROR_UNREF(op->disconnect_with_error); } - GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op"); + GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op"); - GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); } -static void cc_start_transport_op(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem, +static void cc_start_transport_op(grpc_channel_element* elem, grpc_transport_op* op) { channel_data* chand = (channel_data*)elem->channel_data; GPR_ASSERT(op->set_accept_stream == false); if (op->bind_pollset != nullptr) { - grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, - op->bind_pollset); + grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset); } op->handler_private.extra_arg = elem; GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op"); GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked, op, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } -static void cc_get_channel_info(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem, +static void cc_get_channel_info(grpc_channel_element* elem, const grpc_channel_info* info) { channel_data* chand = (channel_data*)elem->channel_data; gpr_mu_lock(&chand->info_mu); @@ -724,8 +699,7 @@ static void cc_get_channel_info(grpc_exec_ctx* exec_ctx, } /* Constructor for channel_data */ -static grpc_error* cc_init_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem, +static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, grpc_channel_element_args* args) { channel_data* chand = (channel_data*)elem->channel_data; GPR_ASSERT(args->is_last); @@ -746,7 +720,7 @@ static grpc_error* cc_init_channel_elem(grpc_exec_ctx* exec_ctx, chand->interested_parties = grpc_pollset_set_create(); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); - grpc_client_channel_start_backup_polling(exec_ctx, chand->interested_parties); + grpc_client_channel_start_backup_polling(chand->interested_parties); // Record client channel factory. const grpc_arg* arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_CLIENT_CHANNEL_FACTORY); @@ -774,15 +748,15 @@ static grpc_error* cc_init_channel_elem(grpc_exec_ctx* exec_ctx, } char* proxy_name = nullptr; grpc_channel_args* new_args = nullptr; - grpc_proxy_mappers_map_name(exec_ctx, arg->value.string, args->channel_args, + grpc_proxy_mappers_map_name(arg->value.string, args->channel_args, &proxy_name, &new_args); // Instantiate resolver. chand->resolver = grpc_resolver_create( - exec_ctx, proxy_name != nullptr ? proxy_name : arg->value.string, + proxy_name != nullptr ? proxy_name : arg->value.string, new_args != nullptr ? new_args : args->channel_args, chand->interested_parties, chand->combiner); if (proxy_name != nullptr) gpr_free(proxy_name); - if (new_args != nullptr) grpc_channel_args_destroy(exec_ctx, new_args); + if (new_args != nullptr) grpc_channel_args_destroy(new_args); if (chand->resolver == nullptr) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed"); } @@ -791,32 +765,28 @@ static grpc_error* cc_init_channel_elem(grpc_exec_ctx* exec_ctx, return GRPC_ERROR_NONE; } -static void shutdown_resolver_locked(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void shutdown_resolver_locked(void* arg, grpc_error* error) { grpc_resolver* resolver = (grpc_resolver*)arg; - grpc_resolver_shutdown_locked(exec_ctx, resolver); - GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel"); + grpc_resolver_shutdown_locked(resolver); + GRPC_RESOLVER_UNREF(resolver, "channel"); } /* Destructor for channel_data */ -static void cc_destroy_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem) { +static void cc_destroy_channel_elem(grpc_channel_element* elem) { channel_data* chand = (channel_data*)elem->channel_data; if (chand->resolver != nullptr) { GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } if (chand->client_channel_factory != nullptr) { - grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory); + grpc_client_channel_factory_unref(chand->client_channel_factory); } if (chand->lb_policy != nullptr) { - grpc_pollset_set_del_pollset_set(exec_ctx, - chand->lb_policy->interested_parties, + grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties, chand->interested_parties); - GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); + GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); } gpr_free(chand->info_lb_policy_name); gpr_free(chand->info_service_config_json); @@ -824,12 +794,12 @@ static void cc_destroy_channel_elem(grpc_exec_ctx* exec_ctx, grpc_server_retry_throttle_data_unref(chand->retry_throttle_data); } if (chand->method_params_table != nullptr) { - grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); + grpc_slice_hash_table_unref(chand->method_params_table); } - grpc_client_channel_stop_backup_polling(exec_ctx, chand->interested_parties); - grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); - grpc_pollset_set_destroy(exec_ctx, chand->interested_parties); - GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel"); + grpc_client_channel_stop_backup_polling(chand->interested_parties); + grpc_connectivity_state_destroy(&chand->state_tracker); + grpc_pollset_set_destroy(chand->interested_parties); + GRPC_COMBINER_UNREF(chand->combiner, "client_channel"); gpr_mu_destroy(&chand->info_mu); gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu); } @@ -916,21 +886,18 @@ static void waiting_for_pick_batches_add( } // This is called via the call combiner, so access to calld is synchronized. -static void fail_pending_batch_in_call_combiner(grpc_exec_ctx* exec_ctx, - void* arg, grpc_error* error) { +static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) { call_data* calld = (call_data*)arg; if (calld->waiting_for_pick_batches_count > 0) { --calld->waiting_for_pick_batches_count; grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count], GRPC_ERROR_REF(error), calld->call_combiner); } } // This is called via the call combiner, so access to calld is synchronized. -static void waiting_for_pick_batches_fail(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static void waiting_for_pick_batches_fail(grpc_call_element* elem, grpc_error* error) { call_data* calld = (call_data*)elem->call_data; if (grpc_client_channel_trace.enabled()) { @@ -943,37 +910,34 @@ static void waiting_for_pick_batches_fail(grpc_exec_ctx* exec_ctx, GRPC_CLOSURE_INIT(&calld->handle_pending_batch_in_call_combiner[i], fail_pending_batch_in_call_combiner, calld, grpc_schedule_on_exec_ctx); - GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner, - &calld->handle_pending_batch_in_call_combiner[i], - GRPC_ERROR_REF(error), - "waiting_for_pick_batches_fail"); + GRPC_CALL_COMBINER_START( + calld->call_combiner, &calld->handle_pending_batch_in_call_combiner[i], + GRPC_ERROR_REF(error), "waiting_for_pick_batches_fail"); } if (calld->initial_metadata_batch != nullptr) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->initial_metadata_batch, GRPC_ERROR_REF(error), + calld->initial_metadata_batch, GRPC_ERROR_REF(error), calld->call_combiner); } else { - GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner, + GRPC_CALL_COMBINER_STOP(calld->call_combiner, "waiting_for_pick_batches_fail"); } GRPC_ERROR_UNREF(error); } // This is called via the call combiner, so access to calld is synchronized. -static void run_pending_batch_in_call_combiner(grpc_exec_ctx* exec_ctx, - void* arg, grpc_error* ignored) { +static void run_pending_batch_in_call_combiner(void* arg, grpc_error* ignored) { call_data* calld = (call_data*)arg; if (calld->waiting_for_pick_batches_count > 0) { --calld->waiting_for_pick_batches_count; grpc_subchannel_call_process_op( - exec_ctx, calld->subchannel_call, + calld->subchannel_call, calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count]); } } // This is called via the call combiner, so access to calld is synchronized. -static void waiting_for_pick_batches_resume(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem) { +static void waiting_for_pick_batches_resume(grpc_call_element* elem) { channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; if (grpc_client_channel_trace.enabled()) { @@ -987,20 +951,18 @@ static void waiting_for_pick_batches_resume(grpc_exec_ctx* exec_ctx, GRPC_CLOSURE_INIT(&calld->handle_pending_batch_in_call_combiner[i], run_pending_batch_in_call_combiner, calld, grpc_schedule_on_exec_ctx); - GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner, - &calld->handle_pending_batch_in_call_combiner[i], - GRPC_ERROR_NONE, - "waiting_for_pick_batches_resume"); + GRPC_CALL_COMBINER_START( + calld->call_combiner, &calld->handle_pending_batch_in_call_combiner[i], + GRPC_ERROR_NONE, "waiting_for_pick_batches_resume"); } GPR_ASSERT(calld->initial_metadata_batch != nullptr); - grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, + grpc_subchannel_call_process_op(calld->subchannel_call, calld->initial_metadata_batch); } // Applies service config to the call. Must be invoked once we know // that the resolver has returned results to the channel. -static void apply_service_config_to_call_locked(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem) { +static void apply_service_config_to_call_locked(grpc_call_element* elem) { channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; if (grpc_client_channel_trace.enabled()) { @@ -1013,7 +975,7 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx* exec_ctx, } if (chand->method_params_table != nullptr) { calld->method_params = (method_parameters*)grpc_method_config_table_get( - exec_ctx, chand->method_params_table, calld->path); + chand->method_params_table, calld->path); if (calld->method_params != nullptr) { method_parameters_ref(calld->method_params); // If the deadline from the service config is shorter than the one @@ -1025,15 +987,14 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx* exec_ctx, calld->method_params->timeout; if (per_method_deadline < calld->deadline) { calld->deadline = per_method_deadline; - grpc_deadline_state_reset(exec_ctx, elem, calld->deadline); + grpc_deadline_state_reset(elem, calld->deadline); } } } } } -static void create_subchannel_call_locked(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static void create_subchannel_call_locked(grpc_call_element* elem, grpc_error* error) { channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; @@ -1047,24 +1008,22 @@ static void create_subchannel_call_locked(grpc_exec_ctx* exec_ctx, calld->call_combiner // call_combiner }; grpc_error* new_error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, &call_args, - &calld->subchannel_call); + calld->connected_subchannel, &call_args, &calld->subchannel_call); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s", chand, calld, calld->subchannel_call, grpc_error_string(new_error)); } if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); - waiting_for_pick_batches_fail(exec_ctx, elem, new_error); + waiting_for_pick_batches_fail(elem, new_error); } else { - waiting_for_pick_batches_resume(exec_ctx, elem); + waiting_for_pick_batches_resume(elem); } GRPC_ERROR_UNREF(error); } // Invoked when a pick is completed, on both success or failure. -static void pick_done_locked(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_error* error) { +static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; if (calld->connected_subchannel == nullptr) { @@ -1080,10 +1039,10 @@ static void pick_done_locked(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, "chand=%p calld=%p: failed to create subchannel: error=%s", chand, calld, grpc_error_string(calld->error)); } - waiting_for_pick_batches_fail(exec_ctx, elem, GRPC_ERROR_REF(calld->error)); + waiting_for_pick_batches_fail(elem, GRPC_ERROR_REF(calld->error)); } else { /* Create call on subchannel. */ - create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); + create_subchannel_call_locked(elem, GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); } @@ -1092,19 +1051,17 @@ static void pick_done_locked(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, // either (a) the pick was deferred pending a resolver result or (b) the // pick was done asynchronously. Removes the call's polling entity from // chand->interested_parties before invoking pick_done_locked(). -static void async_pick_done_locked(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, grpc_error* error) { +static void async_pick_done_locked(grpc_call_element* elem, grpc_error* error) { channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; - grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, + grpc_polling_entity_del_from_pollset_set(calld->pollent, chand->interested_parties); - pick_done_locked(exec_ctx, elem, error); + pick_done_locked(elem, error); } // Note: This runs under the client_channel combiner, but will NOT be // holding the call combiner. -static void pick_callback_cancel_locked(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void pick_callback_cancel_locked(void* arg, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; @@ -1113,17 +1070,15 @@ static void pick_callback_cancel_locked(grpc_exec_ctx* exec_ctx, void* arg, gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", chand, calld, calld->lb_policy); } - grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy, - &calld->connected_subchannel, - GRPC_ERROR_REF(error)); + grpc_lb_policy_cancel_pick_locked( + calld->lb_policy, &calld->connected_subchannel, GRPC_ERROR_REF(error)); } - GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_callback_cancel"); + GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel"); } // Callback invoked by grpc_lb_policy_pick_locked() for async picks. // Unrefs the LB policy and invokes async_pick_done_locked(). -static void pick_callback_done_locked(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void pick_callback_done_locked(void* arg, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; @@ -1132,23 +1087,22 @@ static void pick_callback_done_locked(grpc_exec_ctx* exec_ctx, void* arg, chand, calld); } GPR_ASSERT(calld->lb_policy != nullptr); - GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel"); + GRPC_LB_POLICY_UNREF(calld->lb_policy, "pick_subchannel"); calld->lb_policy = nullptr; - async_pick_done_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); + async_pick_done_locked(elem, GRPC_ERROR_REF(error)); } // Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked(). // If the pick was completed synchronously, unrefs the LB policy and // returns true. -static bool pick_callback_start_locked(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem) { +static bool pick_callback_start_locked(grpc_call_element* elem) { channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p", chand, calld, chand->lb_policy); } - apply_service_config_to_call_locked(exec_ctx, elem); + apply_service_config_to_call_locked(elem); // If the application explicitly set wait_for_ready, use that. // Otherwise, if the service config specified a value for this // method, use that. @@ -1178,7 +1132,7 @@ static bool pick_callback_start_locked(grpc_exec_ctx* exec_ctx, GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem, grpc_combiner_scheduler(chand->combiner)); const bool pick_done = grpc_lb_policy_pick_locked( - exec_ctx, chand->lb_policy, &inputs, &calld->connected_subchannel, + chand->lb_policy, &inputs, &calld->connected_subchannel, calld->subchannel_call_context, nullptr, &calld->lb_pick_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ @@ -1186,12 +1140,12 @@ static bool pick_callback_start_locked(grpc_exec_ctx* exec_ctx, gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously", chand, calld); } - GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel"); + GRPC_LB_POLICY_UNREF(calld->lb_policy, "pick_subchannel"); calld->lb_policy = nullptr; } else { GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); grpc_call_combiner_set_notify_on_cancel( - exec_ctx, calld->call_combiner, + calld->call_combiner, GRPC_CLOSURE_INIT(&calld->lb_pick_cancel_closure, pick_callback_cancel_locked, elem, grpc_combiner_scheduler(chand->combiner))); @@ -1208,8 +1162,7 @@ typedef struct { // Note: This runs under the client_channel combiner, but will NOT be // holding the call combiner. -static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx* exec_ctx, - void* arg, +static void pick_after_resolver_result_cancel_locked(void* arg, grpc_error* error) { pick_after_resolver_result_args* args = (pick_after_resolver_result_args*)arg; if (args->finished) { @@ -1237,16 +1190,13 @@ static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx* exec_ctx, // it's safe to call async_pick_done_locked() here -- we are // essentially calling it here instead of calling it in // pick_after_resolver_result_done_locked(). - async_pick_done_locked(exec_ctx, elem, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick cancelled", &error, 1)); + async_pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick cancelled", &error, 1)); } -static void pick_after_resolver_result_start_locked(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem); +static void pick_after_resolver_result_start_locked(grpc_call_element* elem); -static void pick_after_resolver_result_done_locked(grpc_exec_ctx* exec_ctx, - void* arg, +static void pick_after_resolver_result_done_locked(void* arg, grpc_error* error) { pick_after_resolver_result_args* args = (pick_after_resolver_result_args*)arg; if (args->finished) { @@ -1266,19 +1216,19 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx* exec_ctx, gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data", chand, calld); } - async_pick_done_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); + async_pick_done_locked(elem, GRPC_ERROR_REF(error)); } else if (chand->lb_policy != nullptr) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick", chand, calld); } - if (pick_callback_start_locked(exec_ctx, elem)) { + if (pick_callback_start_locked(elem)) { // Even if the LB policy returns a result synchronously, we have // already added our polling entity to chand->interested_parties // in order to wait for the resolver result, so we need to // remove it here. Therefore, we call async_pick_done_locked() // instead of pick_done_locked(). - async_pick_done_locked(exec_ctx, elem, GRPC_ERROR_NONE); + async_pick_done_locked(elem, GRPC_ERROR_NONE); } } // TODO(roth): It should be impossible for chand->lb_policy to be NULL @@ -1296,19 +1246,18 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx* exec_ctx, "trying again", chand, calld); } - pick_after_resolver_result_start_locked(exec_ctx, elem); + pick_after_resolver_result_start_locked(elem); } else { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver disconnected", chand, calld); } async_pick_done_locked( - exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); + elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); } } -static void pick_after_resolver_result_start_locked(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem) { +static void pick_after_resolver_result_start_locked(grpc_call_element* elem) { channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; if (grpc_client_channel_trace.enabled()) { @@ -1324,47 +1273,46 @@ static void pick_after_resolver_result_start_locked(grpc_exec_ctx* exec_ctx, grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, &args->closure, GRPC_ERROR_NONE); grpc_call_combiner_set_notify_on_cancel( - exec_ctx, calld->call_combiner, + calld->call_combiner, GRPC_CLOSURE_INIT(&args->cancel_closure, pick_after_resolver_result_cancel_locked, args, grpc_combiner_scheduler(chand->combiner))); } -static void start_pick_locked(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* ignored) { +static void start_pick_locked(void* arg, grpc_error* ignored) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; GPR_ASSERT(calld->connected_subchannel == nullptr); if (chand->lb_policy != nullptr) { // We already have an LB policy, so ask it for a pick. - if (pick_callback_start_locked(exec_ctx, elem)) { + if (pick_callback_start_locked(elem)) { // Pick completed synchronously. - pick_done_locked(exec_ctx, elem, GRPC_ERROR_NONE); + pick_done_locked(elem, GRPC_ERROR_NONE); return; } } else { // We do not yet have an LB policy, so wait for a resolver result. if (chand->resolver == nullptr) { - pick_done_locked(exec_ctx, elem, + pick_done_locked(elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); return; } if (!chand->started_resolving) { - start_resolving_locked(exec_ctx, chand); + start_resolving_locked(chand); } - pick_after_resolver_result_start_locked(exec_ctx, elem); + pick_after_resolver_result_start_locked(elem); } // We need to wait for either a resolver result or for an async result // from the LB policy. Add the polling entity from call_data to the // channel_data's interested_parties, so that the I/O of the LB policy // and resolver can be done under it. The polling entity will be // removed in async_pick_done_locked(). - grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, + grpc_polling_entity_add_to_pollset_set(calld->pollent, chand->interested_parties); } -static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +static void on_complete(void* arg, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; if (calld->retry_throttle_data != nullptr) { @@ -1380,18 +1328,15 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { calld->retry_throttle_data); } } - GRPC_CLOSURE_RUN(exec_ctx, calld->original_on_complete, - GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(calld->original_on_complete, GRPC_ERROR_REF(error)); } static void cc_start_transport_stream_op_batch( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op_batch* batch) { + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; if (chand->deadline_checking_enabled) { - grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, - batch); + grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch); } GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0); // If we've previously been cancelled, immediately fail any new batches. @@ -1401,7 +1346,7 @@ static void cc_start_transport_stream_op_batch( chand, calld, grpc_error_string(calld->error)); } grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, batch, GRPC_ERROR_REF(calld->error), calld->call_combiner); + batch, GRPC_ERROR_REF(calld->error), calld->call_combiner); goto done; } if (batch->cancel_stream) { @@ -1419,11 +1364,10 @@ static void cc_start_transport_stream_op_batch( // If we have a subchannel call, send the cancellation batch down. // Otherwise, fail all pending batches. if (calld->subchannel_call != nullptr) { - grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, batch); + grpc_subchannel_call_process_op(calld->subchannel_call, batch); } else { waiting_for_pick_batches_add(calld, batch); - waiting_for_pick_batches_fail(exec_ctx, elem, - GRPC_ERROR_REF(calld->error)); + waiting_for_pick_batches_fail(elem, GRPC_ERROR_REF(calld->error)); } goto done; } @@ -1446,7 +1390,7 @@ static void cc_start_transport_stream_op_batch( "chand=%p calld=%p: sending batch to subchannel_call=%p", chand, calld, calld->subchannel_call); } - grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, batch); + grpc_subchannel_call_process_op(calld->subchannel_call, batch); goto done; } // We do not yet have a subchannel call. @@ -1460,7 +1404,6 @@ static void cc_start_transport_stream_op_batch( chand, calld); } GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked, elem, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); @@ -1471,7 +1414,7 @@ static void cc_start_transport_stream_op_batch( "chand=%p calld=%p: saved batch, yeilding call combiner", chand, calld); } - GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner, + GRPC_CALL_COMBINER_STOP(calld->call_combiner, "batch does not include send_initial_metadata"); } done: @@ -1479,8 +1422,7 @@ done: } /* Constructor for call_data */ -static grpc_error* cc_init_call_elem(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static grpc_error* cc_init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; @@ -1492,23 +1434,22 @@ static grpc_error* cc_init_call_elem(grpc_exec_ctx* exec_ctx, calld->owning_call = args->call_stack; calld->call_combiner = args->call_combiner; if (chand->deadline_checking_enabled) { - grpc_deadline_state_init(exec_ctx, elem, args->call_stack, - args->call_combiner, calld->deadline); + grpc_deadline_state_init(elem, args->call_stack, args->call_combiner, + calld->deadline); } return GRPC_ERROR_NONE; } /* Destructor for call_data */ -static void cc_destroy_call_elem(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static void cc_destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* then_schedule_closure) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; if (chand->deadline_checking_enabled) { - grpc_deadline_state_destroy(exec_ctx, elem); + grpc_deadline_state_destroy(elem); } - grpc_slice_unref_internal(exec_ctx, calld->path); + grpc_slice_unref_internal(calld->path); if (calld->method_params != nullptr) { method_parameters_unref(calld->method_params); } @@ -1517,14 +1458,13 @@ static void cc_destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call, then_schedule_closure); then_schedule_closure = nullptr; - GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, calld->subchannel_call, + GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call, "client_channel_destroy_call"); } GPR_ASSERT(calld->lb_policy == nullptr); GPR_ASSERT(calld->waiting_for_pick_batches_count == 0); if (calld->connected_subchannel != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel, - "picked"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(calld->connected_subchannel, "picked"); } for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { if (calld->subchannel_call_context[i].value != nullptr) { @@ -1532,11 +1472,10 @@ static void cc_destroy_call_elem(grpc_exec_ctx* exec_ctx, calld->subchannel_call_context[i].value); } } - GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); } -static void cc_set_pollset_or_pollset_set(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static void cc_set_pollset_or_pollset_set(grpc_call_element* elem, grpc_polling_entity* pollent) { call_data* calld = (call_data*)elem->call_data; calld->pollent = pollent; @@ -1560,29 +1499,27 @@ const grpc_channel_filter grpc_client_channel_filter = { "client-channel", }; -static void try_to_connect_locked(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error_ignored) { +static void try_to_connect_locked(void* arg, grpc_error* error_ignored) { channel_data* chand = (channel_data*)arg; if (chand->lb_policy != nullptr) { - grpc_lb_policy_exit_idle_locked(exec_ctx, chand->lb_policy); + grpc_lb_policy_exit_idle_locked(chand->lb_policy); } else { chand->exit_idle_when_lb_policy_arrives = true; if (!chand->started_resolving && chand->resolver != nullptr) { - start_resolving_locked(exec_ctx, chand); + start_resolving_locked(chand); } } - GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect"); + GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect"); } grpc_connectivity_state grpc_client_channel_check_connectivity_state( - grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, int try_to_connect) { + grpc_channel_element* elem, int try_to_connect) { channel_data* chand = (channel_data*)elem->channel_data; grpc_connectivity_state out = grpc_connectivity_state_check(&chand->state_tracker); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_CREATE(try_to_connect_locked, chand, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); @@ -1663,50 +1600,49 @@ int grpc_client_channel_num_external_connectivity_watchers( return count; } -static void on_external_watch_complete_locked(grpc_exec_ctx* exec_ctx, - void* arg, grpc_error* error) { +static void on_external_watch_complete_locked(void* arg, grpc_error* error) { external_connectivity_watcher* w = (external_connectivity_watcher*)arg; grpc_closure* follow_up = w->on_complete; - grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent, + grpc_polling_entity_del_from_pollset_set(&w->pollent, w->chand->interested_parties); - GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, + GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "external_connectivity_watcher"); external_connectivity_watcher_list_remove(w->chand, w); gpr_free(w); - GRPC_CLOSURE_RUN(exec_ctx, follow_up, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error)); } -static void watch_connectivity_state_locked(grpc_exec_ctx* exec_ctx, void* arg, +static void watch_connectivity_state_locked(void* arg, grpc_error* error_ignored) { external_connectivity_watcher* w = (external_connectivity_watcher*)arg; external_connectivity_watcher* found = nullptr; if (w->state != nullptr) { external_connectivity_watcher_list_append(w->chand, w); - GRPC_CLOSURE_RUN(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE); + GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE); GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w, grpc_combiner_scheduler(w->chand->combiner)); - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure); + grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker, + w->state, &w->my_closure); } else { GPR_ASSERT(w->watcher_timer_init == nullptr); found = lookup_external_connectivity_watcher(w->chand, w->on_complete); if (found) { GPR_ASSERT(found->on_complete == w->on_complete); grpc_connectivity_state_notify_on_state_change( - exec_ctx, &found->chand->state_tracker, nullptr, &found->my_closure); + &found->chand->state_tracker, nullptr, &found->my_closure); } - grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent, + grpc_polling_entity_del_from_pollset_set(&w->pollent, w->chand->interested_parties); - GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, + GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "external_connectivity_watcher"); gpr_free(w); } } void grpc_client_channel_watch_connectivity_state( - grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, - grpc_polling_entity pollent, grpc_connectivity_state* state, - grpc_closure* closure, grpc_closure* watcher_timer_init) { + grpc_channel_element* elem, grpc_polling_entity pollent, + grpc_connectivity_state* state, grpc_closure* closure, + grpc_closure* watcher_timer_init) { channel_data* chand = (channel_data*)elem->channel_data; external_connectivity_watcher* w = (external_connectivity_watcher*)gpr_zalloc(sizeof(*w)); @@ -1715,12 +1651,11 @@ void grpc_client_channel_watch_connectivity_state( w->on_complete = closure; w->state = state; w->watcher_timer_init = watcher_timer_init; - grpc_polling_entity_add_to_pollset_set(exec_ctx, &w->pollent, + grpc_polling_entity_add_to_pollset_set(&w->pollent, chand->interested_parties); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); |