Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.cc')
-rw-r--r--  src/core/ext/filters/client_channel/client_channel.cc  409
1 file changed, 172 insertions(+), 237 deletions(-)
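Every hunk below applies the same mechanical change: the explicit grpc_exec_ctx* exec_ctx argument is dropped from function signatures and from each call site. A minimal sketch of that pattern, assuming the execution context instead becomes reachable through thread-local state (hypothetical names, not gRPC's actual API):

// Hedged sketch: illustrates the parameter-removal pattern in this diff,
// assuming the context is installed once per thread rather than threaded
// through every call. Names here are illustrative, not gRPC's real API.
#include <cstdio>

struct ExecCtx {
  int closures_scheduled = 0;
};

// Before: the context is passed explicitly down every call chain.
static void schedule_closure_old(ExecCtx* ctx) { ++ctx->closures_scheduled; }

// After: the context is looked up implicitly from thread-local state.
static thread_local ExecCtx* g_exec_ctx = nullptr;
static void schedule_closure_new() { ++g_exec_ctx->closures_scheduled; }

int main() {
  ExecCtx ctx;
  schedule_closure_old(&ctx);  // old style: one extra argument at every call site
  g_exec_ctx = &ctx;           // new style: set once per thread, drop the parameter
  schedule_closure_new();
  std::printf("closures scheduled: %d\n", ctx.closures_scheduled);
  return 0;
}

The follow-on effect accounts for most of the 172 insertions and 237 deletions: each GRPC_CLOSURE_SCHED, *_UNREF, and *_locked invocation loses its first argument, and the surrounding lines are re-wrapped accordingly.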
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index aced9adf9f..ba82c88eb7 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -91,8 +91,7 @@ static void method_parameters_unref(method_parameters* method_params) {
static void* method_parameters_ref_wrapper(void* value) {
return method_parameters_ref((method_parameters*)value);
}
-static void method_parameters_unref_wrapper(grpc_exec_ctx* exec_ctx,
- void* value) {
+static void method_parameters_unref_wrapper(void* value) {
method_parameters_unref((method_parameters*)value);
}
@@ -228,12 +227,11 @@ typedef struct {
grpc_lb_policy* lb_policy;
} lb_policy_connectivity_watcher;
-static void watch_lb_policy_locked(grpc_exec_ctx* exec_ctx, channel_data* chand,
+static void watch_lb_policy_locked(channel_data* chand,
grpc_lb_policy* lb_policy,
grpc_connectivity_state current_state);
-static void set_channel_connectivity_state_locked(grpc_exec_ctx* exec_ctx,
- channel_data* chand,
+static void set_channel_connectivity_state_locked(channel_data* chand,
grpc_connectivity_state state,
grpc_error* error,
const char* reason) {
@@ -245,12 +243,12 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx* exec_ctx,
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
/* cancel picks with wait_for_ready=false */
grpc_lb_policy_cancel_picks_locked(
- exec_ctx, chand->lb_policy,
+ chand->lb_policy,
/* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
/* check= */ 0, GRPC_ERROR_REF(error));
} else if (state == GRPC_CHANNEL_SHUTDOWN) {
/* cancel all picks */
- grpc_lb_policy_cancel_picks_locked(exec_ctx, chand->lb_policy,
+ grpc_lb_policy_cancel_picks_locked(chand->lb_policy,
/* mask= */ 0, /* check= */ 0,
GRPC_ERROR_REF(error));
}
@@ -259,12 +257,10 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx* exec_ctx,
gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand,
grpc_connectivity_state_name(state));
}
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
- reason);
+ grpc_connectivity_state_set(&chand->state_tracker, state, error, reason);
}
-static void on_lb_policy_state_changed_locked(grpc_exec_ctx* exec_ctx,
- void* arg, grpc_error* error) {
+static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) {
lb_policy_connectivity_watcher* w = (lb_policy_connectivity_watcher*)arg;
/* check if the notification is for the latest policy */
if (w->lb_policy == w->chand->lb_policy) {
@@ -272,17 +268,17 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx* exec_ctx,
gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand,
w->lb_policy, grpc_connectivity_state_name(w->state));
}
- set_channel_connectivity_state_locked(exec_ctx, w->chand, w->state,
+ set_channel_connectivity_state_locked(w->chand, w->state,
GRPC_ERROR_REF(error), "lb_changed");
if (w->state != GRPC_CHANNEL_SHUTDOWN) {
- watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
+ watch_lb_policy_locked(w->chand, w->lb_policy, w->state);
}
}
- GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
+ GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "watch_lb_policy");
gpr_free(w);
}
-static void watch_lb_policy_locked(grpc_exec_ctx* exec_ctx, channel_data* chand,
+static void watch_lb_policy_locked(channel_data* chand,
grpc_lb_policy* lb_policy,
grpc_connectivity_state current_state) {
lb_policy_connectivity_watcher* w =
@@ -293,19 +289,18 @@ static void watch_lb_policy_locked(grpc_exec_ctx* exec_ctx, channel_data* chand,
grpc_combiner_scheduler(chand->combiner));
w->state = current_state;
w->lb_policy = lb_policy;
- grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state,
+ grpc_lb_policy_notify_on_state_change_locked(lb_policy, &w->state,
&w->on_changed);
}
-static void start_resolving_locked(grpc_exec_ctx* exec_ctx,
- channel_data* chand) {
+static void start_resolving_locked(channel_data* chand) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand);
}
GPR_ASSERT(!chand->started_resolving);
chand->started_resolving = true;
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- grpc_resolver_next_locked(exec_ctx, chand->resolver, &chand->resolver_result,
+ grpc_resolver_next_locked(chand->resolver, &chand->resolver_result,
&chand->on_resolver_result_changed);
}
@@ -369,29 +364,26 @@ static void parse_retry_throttle_params(const grpc_json* field, void* arg) {
}
}
-static void request_reresolution_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void request_reresolution_locked(void* arg, grpc_error* error) {
reresolution_request_args* args = (reresolution_request_args*)arg;
channel_data* chand = args->chand;
// If this invocation is for a stale LB policy, treat it as an LB shutdown
// signal.
if (args->lb_policy != chand->lb_policy || error != GRPC_ERROR_NONE ||
chand->resolver == nullptr) {
- GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "re-resolution");
+ GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution");
gpr_free(args);
return;
}
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p: started name re-resolving", chand);
}
- grpc_resolver_channel_saw_error_locked(exec_ctx, chand->resolver);
+ grpc_resolver_channel_saw_error_locked(chand->resolver);
// Give back the closure to the LB policy.
- grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, chand->lb_policy,
- &args->closure);
+ grpc_lb_policy_set_reresolve_closure_locked(chand->lb_policy, &args->closure);
}
-static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx,
- void* arg, grpc_error* error) {
+static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
channel_data* chand = (channel_data*)arg;
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand,
@@ -458,12 +450,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx,
if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
// Continue using the same LB policy. Update with new addresses.
lb_policy_updated = true;
- grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy,
- &lb_policy_args);
+ grpc_lb_policy_update_locked(chand->lb_policy, &lb_policy_args);
} else {
// Instantiate new LB policy.
- new_lb_policy =
- grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
+ new_lb_policy = grpc_lb_policy_create(lb_policy_name, &lb_policy_args);
if (new_lb_policy == nullptr) {
gpr_log(GPR_ERROR, "could not create LB policy \"%s\"",
lb_policy_name);
@@ -475,7 +465,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx,
GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
grpc_combiner_scheduler(chand->combiner));
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
- grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, new_lb_policy,
+ grpc_lb_policy_set_reresolve_closure_locked(new_lb_policy,
&args->closure);
}
}
@@ -492,8 +482,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx,
GRPC_ARG_SERVER_URI);
GPR_ASSERT(channel_arg != nullptr);
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
- grpc_uri* uri =
- grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
+ grpc_uri* uri = grpc_uri_parse(channel_arg->value.string, true);
GPR_ASSERT(uri->path[0] != '\0');
service_config_parsing_state parsing_state;
memset(&parsing_state, 0, sizeof(parsing_state));
@@ -504,7 +493,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx,
grpc_uri_destroy(uri);
retry_throttle_data = parsing_state.retry_throttle_data;
method_params_table = grpc_service_config_create_method_config_table(
- exec_ctx, service_config, method_parameters_create_from_json,
+ service_config, method_parameters_create_from_json,
method_parameters_ref_wrapper, method_parameters_unref_wrapper);
grpc_service_config_destroy(service_config);
}
@@ -514,7 +503,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx,
// The copy will be saved in chand->lb_policy_name below.
lb_policy_name_dup = gpr_strdup(lb_policy_name);
}
- grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
+ grpc_channel_args_destroy(chand->resolver_result);
chand->resolver_result = nullptr;
}
if (grpc_client_channel_trace.enabled()) {
@@ -546,7 +535,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx,
chand->retry_throttle_data = retry_throttle_data;
// Swap out the method params table.
if (chand->method_params_table != nullptr) {
- grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
+ grpc_slice_hash_table_unref(chand->method_params_table);
}
chand->method_params_table = method_params_table;
// If we have a new LB policy or are shutting down (in which case
@@ -562,10 +551,9 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx,
gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand,
chand->lb_policy);
}
- grpc_pollset_set_del_pollset_set(exec_ctx,
- chand->lb_policy->interested_parties,
+ grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties,
chand->interested_parties);
- GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
+ GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
}
chand->lb_policy = new_lb_policy;
}
@@ -579,21 +567,20 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx,
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand);
}
- grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
- GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
+ grpc_resolver_shutdown_locked(chand->resolver);
+ GRPC_RESOLVER_UNREF(chand->resolver, "channel");
chand->resolver = nullptr;
}
set_channel_connectivity_state_locked(
- exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
+ chand, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Got resolver result after disconnection", &error, 1),
"resolver_gone");
- GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
+ GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Channel disconnected", &error, 1));
- GRPC_CLOSURE_LIST_SCHED(exec_ctx,
- &chand->waiting_for_resolver_result_closures);
+ GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
} else { // Not shutting down.
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
grpc_error* state_error =
@@ -603,33 +590,28 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx,
gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand);
}
GRPC_ERROR_UNREF(state_error);
- state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy,
- &state_error);
- grpc_pollset_set_add_pollset_set(exec_ctx,
- new_lb_policy->interested_parties,
+ state =
+ grpc_lb_policy_check_connectivity_locked(new_lb_policy, &state_error);
+ grpc_pollset_set_add_pollset_set(new_lb_policy->interested_parties,
chand->interested_parties);
- GRPC_CLOSURE_LIST_SCHED(exec_ctx,
- &chand->waiting_for_resolver_result_closures);
+ GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
if (chand->exit_idle_when_lb_policy_arrives) {
- grpc_lb_policy_exit_idle_locked(exec_ctx, new_lb_policy);
+ grpc_lb_policy_exit_idle_locked(new_lb_policy);
chand->exit_idle_when_lb_policy_arrives = false;
}
- watch_lb_policy_locked(exec_ctx, chand, new_lb_policy, state);
+ watch_lb_policy_locked(chand, new_lb_policy, state);
}
if (!lb_policy_updated) {
- set_channel_connectivity_state_locked(exec_ctx, chand, state,
- GRPC_ERROR_REF(state_error),
- "new_lb+resolver");
+ set_channel_connectivity_state_locked(
+ chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
}
- grpc_resolver_next_locked(exec_ctx, chand->resolver,
- &chand->resolver_result,
+ grpc_resolver_next_locked(chand->resolver, &chand->resolver_result,
&chand->on_resolver_result_changed);
GRPC_ERROR_UNREF(state_error);
}
}
-static void start_transport_op_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error_ignored) {
+static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
grpc_transport_op* op = (grpc_transport_op*)arg;
grpc_channel_element* elem =
(grpc_channel_element*)op->handler_private.extra_arg;
@@ -637,7 +619,7 @@ static void start_transport_op_locked(grpc_exec_ctx* exec_ctx, void* arg,
if (op->on_connectivity_state_change != nullptr) {
grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &chand->state_tracker, op->connectivity_state,
+ &chand->state_tracker, op->connectivity_state,
op->on_connectivity_state_change);
op->on_connectivity_state_change = nullptr;
op->connectivity_state = nullptr;
@@ -645,11 +627,10 @@ static void start_transport_op_locked(grpc_exec_ctx* exec_ctx, void* arg,
if (op->send_ping != nullptr) {
if (chand->lb_policy == nullptr) {
- GRPC_CLOSURE_SCHED(
- exec_ctx, op->send_ping,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
+ GRPC_CLOSURE_SCHED(op->send_ping, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Ping with no load balancing"));
} else {
- grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping);
+ grpc_lb_policy_ping_one_locked(chand->lb_policy, op->send_ping);
op->bind_pollset = nullptr;
}
op->send_ping = nullptr;
@@ -658,54 +639,48 @@ static void start_transport_op_locked(grpc_exec_ctx* exec_ctx, void* arg,
if (op->disconnect_with_error != GRPC_ERROR_NONE) {
if (chand->resolver != nullptr) {
set_channel_connectivity_state_locked(
- exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
+ chand, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
- grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
- GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
+ grpc_resolver_shutdown_locked(chand->resolver);
+ GRPC_RESOLVER_UNREF(chand->resolver, "channel");
chand->resolver = nullptr;
if (!chand->started_resolving) {
grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
GRPC_ERROR_REF(op->disconnect_with_error));
- GRPC_CLOSURE_LIST_SCHED(exec_ctx,
- &chand->waiting_for_resolver_result_closures);
+ GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
}
if (chand->lb_policy != nullptr) {
- grpc_pollset_set_del_pollset_set(exec_ctx,
- chand->lb_policy->interested_parties,
+ grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties,
chand->interested_parties);
- GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
+ GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
chand->lb_policy = nullptr;
}
}
GRPC_ERROR_UNREF(op->disconnect_with_error);
}
- GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op");
+ GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");
- GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}
-static void cc_start_transport_op(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem,
+static void cc_start_transport_op(grpc_channel_element* elem,
grpc_transport_op* op) {
channel_data* chand = (channel_data*)elem->channel_data;
GPR_ASSERT(op->set_accept_stream == false);
if (op->bind_pollset != nullptr) {
- grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
- op->bind_pollset);
+ grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset);
}
op->handler_private.extra_arg = elem;
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
GRPC_CLOSURE_SCHED(
- exec_ctx,
GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
op, grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
}
-static void cc_get_channel_info(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem,
+static void cc_get_channel_info(grpc_channel_element* elem,
const grpc_channel_info* info) {
channel_data* chand = (channel_data*)elem->channel_data;
gpr_mu_lock(&chand->info_mu);
@@ -724,8 +699,7 @@ static void cc_get_channel_info(grpc_exec_ctx* exec_ctx,
}
/* Constructor for channel_data */
-static grpc_error* cc_init_channel_elem(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem,
+static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
grpc_channel_element_args* args) {
channel_data* chand = (channel_data*)elem->channel_data;
GPR_ASSERT(args->is_last);
@@ -746,7 +720,7 @@ static grpc_error* cc_init_channel_elem(grpc_exec_ctx* exec_ctx,
chand->interested_parties = grpc_pollset_set_create();
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
- grpc_client_channel_start_backup_polling(exec_ctx, chand->interested_parties);
+ grpc_client_channel_start_backup_polling(chand->interested_parties);
// Record client channel factory.
const grpc_arg* arg = grpc_channel_args_find(args->channel_args,
GRPC_ARG_CLIENT_CHANNEL_FACTORY);
@@ -774,15 +748,15 @@ static grpc_error* cc_init_channel_elem(grpc_exec_ctx* exec_ctx,
}
char* proxy_name = nullptr;
grpc_channel_args* new_args = nullptr;
- grpc_proxy_mappers_map_name(exec_ctx, arg->value.string, args->channel_args,
+ grpc_proxy_mappers_map_name(arg->value.string, args->channel_args,
&proxy_name, &new_args);
// Instantiate resolver.
chand->resolver = grpc_resolver_create(
- exec_ctx, proxy_name != nullptr ? proxy_name : arg->value.string,
+ proxy_name != nullptr ? proxy_name : arg->value.string,
new_args != nullptr ? new_args : args->channel_args,
chand->interested_parties, chand->combiner);
if (proxy_name != nullptr) gpr_free(proxy_name);
- if (new_args != nullptr) grpc_channel_args_destroy(exec_ctx, new_args);
+ if (new_args != nullptr) grpc_channel_args_destroy(new_args);
if (chand->resolver == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
}
@@ -791,32 +765,28 @@ static grpc_error* cc_init_channel_elem(grpc_exec_ctx* exec_ctx,
return GRPC_ERROR_NONE;
}
-static void shutdown_resolver_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void shutdown_resolver_locked(void* arg, grpc_error* error) {
grpc_resolver* resolver = (grpc_resolver*)arg;
- grpc_resolver_shutdown_locked(exec_ctx, resolver);
- GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel");
+ grpc_resolver_shutdown_locked(resolver);
+ GRPC_RESOLVER_UNREF(resolver, "channel");
}
/* Destructor for channel_data */
-static void cc_destroy_channel_elem(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem) {
+static void cc_destroy_channel_elem(grpc_channel_element* elem) {
channel_data* chand = (channel_data*)elem->channel_data;
if (chand->resolver != nullptr) {
GRPC_CLOSURE_SCHED(
- exec_ctx,
GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver,
grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
}
if (chand->client_channel_factory != nullptr) {
- grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
+ grpc_client_channel_factory_unref(chand->client_channel_factory);
}
if (chand->lb_policy != nullptr) {
- grpc_pollset_set_del_pollset_set(exec_ctx,
- chand->lb_policy->interested_parties,
+ grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties,
chand->interested_parties);
- GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
+ GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
}
gpr_free(chand->info_lb_policy_name);
gpr_free(chand->info_service_config_json);
@@ -824,12 +794,12 @@ static void cc_destroy_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
}
if (chand->method_params_table != nullptr) {
- grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
+ grpc_slice_hash_table_unref(chand->method_params_table);
}
- grpc_client_channel_stop_backup_polling(exec_ctx, chand->interested_parties);
- grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
- grpc_pollset_set_destroy(exec_ctx, chand->interested_parties);
- GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");
+ grpc_client_channel_stop_backup_polling(chand->interested_parties);
+ grpc_connectivity_state_destroy(&chand->state_tracker);
+ grpc_pollset_set_destroy(chand->interested_parties);
+ GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
gpr_mu_destroy(&chand->info_mu);
gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
}
@@ -916,21 +886,18 @@ static void waiting_for_pick_batches_add(
}
// This is called via the call combiner, so access to calld is synchronized.
-static void fail_pending_batch_in_call_combiner(grpc_exec_ctx* exec_ctx,
- void* arg, grpc_error* error) {
+static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) {
call_data* calld = (call_data*)arg;
if (calld->waiting_for_pick_batches_count > 0) {
--calld->waiting_for_pick_batches_count;
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx,
calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count],
GRPC_ERROR_REF(error), calld->call_combiner);
}
}
// This is called via the call combiner, so access to calld is synchronized.
-static void waiting_for_pick_batches_fail(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem,
+static void waiting_for_pick_batches_fail(grpc_call_element* elem,
grpc_error* error) {
call_data* calld = (call_data*)elem->call_data;
if (grpc_client_channel_trace.enabled()) {
@@ -943,37 +910,34 @@ static void waiting_for_pick_batches_fail(grpc_exec_ctx* exec_ctx,
GRPC_CLOSURE_INIT(&calld->handle_pending_batch_in_call_combiner[i],
fail_pending_batch_in_call_combiner, calld,
grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
- &calld->handle_pending_batch_in_call_combiner[i],
- GRPC_ERROR_REF(error),
- "waiting_for_pick_batches_fail");
+ GRPC_CALL_COMBINER_START(
+ calld->call_combiner, &calld->handle_pending_batch_in_call_combiner[i],
+ GRPC_ERROR_REF(error), "waiting_for_pick_batches_fail");
}
if (calld->initial_metadata_batch != nullptr) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->initial_metadata_batch, GRPC_ERROR_REF(error),
+ calld->initial_metadata_batch, GRPC_ERROR_REF(error),
calld->call_combiner);
} else {
- GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner,
"waiting_for_pick_batches_fail");
}
GRPC_ERROR_UNREF(error);
}
// This is called via the call combiner, so access to calld is synchronized.
-static void run_pending_batch_in_call_combiner(grpc_exec_ctx* exec_ctx,
- void* arg, grpc_error* ignored) {
+static void run_pending_batch_in_call_combiner(void* arg, grpc_error* ignored) {
call_data* calld = (call_data*)arg;
if (calld->waiting_for_pick_batches_count > 0) {
--calld->waiting_for_pick_batches_count;
grpc_subchannel_call_process_op(
- exec_ctx, calld->subchannel_call,
+ calld->subchannel_call,
calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count]);
}
}
// This is called via the call combiner, so access to calld is synchronized.
-static void waiting_for_pick_batches_resume(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem) {
+static void waiting_for_pick_batches_resume(grpc_call_element* elem) {
channel_data* chand = (channel_data*)elem->channel_data;
call_data* calld = (call_data*)elem->call_data;
if (grpc_client_channel_trace.enabled()) {
@@ -987,20 +951,18 @@ static void waiting_for_pick_batches_resume(grpc_exec_ctx* exec_ctx,
GRPC_CLOSURE_INIT(&calld->handle_pending_batch_in_call_combiner[i],
run_pending_batch_in_call_combiner, calld,
grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
- &calld->handle_pending_batch_in_call_combiner[i],
- GRPC_ERROR_NONE,
- "waiting_for_pick_batches_resume");
+ GRPC_CALL_COMBINER_START(
+ calld->call_combiner, &calld->handle_pending_batch_in_call_combiner[i],
+ GRPC_ERROR_NONE, "waiting_for_pick_batches_resume");
}
GPR_ASSERT(calld->initial_metadata_batch != nullptr);
- grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call,
+ grpc_subchannel_call_process_op(calld->subchannel_call,
calld->initial_metadata_batch);
}
// Applies service config to the call. Must be invoked once we know
// that the resolver has returned results to the channel.
-static void apply_service_config_to_call_locked(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem) {
+static void apply_service_config_to_call_locked(grpc_call_element* elem) {
channel_data* chand = (channel_data*)elem->channel_data;
call_data* calld = (call_data*)elem->call_data;
if (grpc_client_channel_trace.enabled()) {
@@ -1013,7 +975,7 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx* exec_ctx,
}
if (chand->method_params_table != nullptr) {
calld->method_params = (method_parameters*)grpc_method_config_table_get(
- exec_ctx, chand->method_params_table, calld->path);
+ chand->method_params_table, calld->path);
if (calld->method_params != nullptr) {
method_parameters_ref(calld->method_params);
// If the deadline from the service config is shorter than the one
@@ -1025,15 +987,14 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx* exec_ctx,
calld->method_params->timeout;
if (per_method_deadline < calld->deadline) {
calld->deadline = per_method_deadline;
- grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
+ grpc_deadline_state_reset(elem, calld->deadline);
}
}
}
}
}
-static void create_subchannel_call_locked(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem,
+static void create_subchannel_call_locked(grpc_call_element* elem,
grpc_error* error) {
channel_data* chand = (channel_data*)elem->channel_data;
call_data* calld = (call_data*)elem->call_data;
@@ -1047,24 +1008,22 @@ static void create_subchannel_call_locked(grpc_exec_ctx* exec_ctx,
calld->call_combiner // call_combiner
};
grpc_error* new_error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, &call_args,
- &calld->subchannel_call);
+ calld->connected_subchannel, &call_args, &calld->subchannel_call);
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
chand, calld, calld->subchannel_call, grpc_error_string(new_error));
}
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
- waiting_for_pick_batches_fail(exec_ctx, elem, new_error);
+ waiting_for_pick_batches_fail(elem, new_error);
} else {
- waiting_for_pick_batches_resume(exec_ctx, elem);
+ waiting_for_pick_batches_resume(elem);
}
GRPC_ERROR_UNREF(error);
}
// Invoked when a pick is completed, on both success or failure.
-static void pick_done_locked(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- grpc_error* error) {
+static void pick_done_locked(grpc_call_element* elem, grpc_error* error) {
call_data* calld = (call_data*)elem->call_data;
channel_data* chand = (channel_data*)elem->channel_data;
if (calld->connected_subchannel == nullptr) {
@@ -1080,10 +1039,10 @@ static void pick_done_locked(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
"chand=%p calld=%p: failed to create subchannel: error=%s", chand,
calld, grpc_error_string(calld->error));
}
- waiting_for_pick_batches_fail(exec_ctx, elem, GRPC_ERROR_REF(calld->error));
+ waiting_for_pick_batches_fail(elem, GRPC_ERROR_REF(calld->error));
} else {
/* Create call on subchannel. */
- create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
+ create_subchannel_call_locked(elem, GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
}
@@ -1092,19 +1051,17 @@ static void pick_done_locked(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
// either (a) the pick was deferred pending a resolver result or (b) the
// pick was done asynchronously. Removes the call's polling entity from
// chand->interested_parties before invoking pick_done_locked().
-static void async_pick_done_locked(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem, grpc_error* error) {
+static void async_pick_done_locked(grpc_call_element* elem, grpc_error* error) {
channel_data* chand = (channel_data*)elem->channel_data;
call_data* calld = (call_data*)elem->call_data;
- grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
+ grpc_polling_entity_del_from_pollset_set(calld->pollent,
chand->interested_parties);
- pick_done_locked(exec_ctx, elem, error);
+ pick_done_locked(elem, error);
}
// Note: This runs under the client_channel combiner, but will NOT be
// holding the call combiner.
-static void pick_callback_cancel_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void pick_callback_cancel_locked(void* arg, grpc_error* error) {
grpc_call_element* elem = (grpc_call_element*)arg;
channel_data* chand = (channel_data*)elem->channel_data;
call_data* calld = (call_data*)elem->call_data;
@@ -1113,17 +1070,15 @@ static void pick_callback_cancel_locked(grpc_exec_ctx* exec_ctx, void* arg,
gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
chand, calld, calld->lb_policy);
}
- grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy,
- &calld->connected_subchannel,
- GRPC_ERROR_REF(error));
+ grpc_lb_policy_cancel_pick_locked(
+ calld->lb_policy, &calld->connected_subchannel, GRPC_ERROR_REF(error));
}
- GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_callback_cancel");
+ GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
}
// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
// Unrefs the LB policy and invokes async_pick_done_locked().
-static void pick_callback_done_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void pick_callback_done_locked(void* arg, grpc_error* error) {
grpc_call_element* elem = (grpc_call_element*)arg;
channel_data* chand = (channel_data*)elem->channel_data;
call_data* calld = (call_data*)elem->call_data;
@@ -1132,23 +1087,22 @@ static void pick_callback_done_locked(grpc_exec_ctx* exec_ctx, void* arg,
chand, calld);
}
GPR_ASSERT(calld->lb_policy != nullptr);
- GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
+ GRPC_LB_POLICY_UNREF(calld->lb_policy, "pick_subchannel");
calld->lb_policy = nullptr;
- async_pick_done_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
+ async_pick_done_locked(elem, GRPC_ERROR_REF(error));
}
// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
// If the pick was completed synchronously, unrefs the LB policy and
// returns true.
-static bool pick_callback_start_locked(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem) {
+static bool pick_callback_start_locked(grpc_call_element* elem) {
channel_data* chand = (channel_data*)elem->channel_data;
call_data* calld = (call_data*)elem->call_data;
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p",
chand, calld, chand->lb_policy);
}
- apply_service_config_to_call_locked(exec_ctx, elem);
+ apply_service_config_to_call_locked(elem);
// If the application explicitly set wait_for_ready, use that.
// Otherwise, if the service config specified a value for this
// method, use that.
@@ -1178,7 +1132,7 @@ static bool pick_callback_start_locked(grpc_exec_ctx* exec_ctx,
GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem,
grpc_combiner_scheduler(chand->combiner));
const bool pick_done = grpc_lb_policy_pick_locked(
- exec_ctx, chand->lb_policy, &inputs, &calld->connected_subchannel,
+ chand->lb_policy, &inputs, &calld->connected_subchannel,
calld->subchannel_call_context, nullptr, &calld->lb_pick_closure);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
@@ -1186,12 +1140,12 @@ static bool pick_callback_start_locked(grpc_exec_ctx* exec_ctx,
gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously",
chand, calld);
}
- GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
+ GRPC_LB_POLICY_UNREF(calld->lb_policy, "pick_subchannel");
calld->lb_policy = nullptr;
} else {
GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
grpc_call_combiner_set_notify_on_cancel(
- exec_ctx, calld->call_combiner,
+ calld->call_combiner,
GRPC_CLOSURE_INIT(&calld->lb_pick_cancel_closure,
pick_callback_cancel_locked, elem,
grpc_combiner_scheduler(chand->combiner)));
@@ -1208,8 +1162,7 @@ typedef struct {
// Note: This runs under the client_channel combiner, but will NOT be
// holding the call combiner.
-static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx* exec_ctx,
- void* arg,
+static void pick_after_resolver_result_cancel_locked(void* arg,
grpc_error* error) {
pick_after_resolver_result_args* args = (pick_after_resolver_result_args*)arg;
if (args->finished) {
@@ -1237,16 +1190,13 @@ static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx* exec_ctx,
// it's safe to call async_pick_done_locked() here -- we are
// essentially calling it here instead of calling it in
// pick_after_resolver_result_done_locked().
- async_pick_done_locked(exec_ctx, elem,
- GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Pick cancelled", &error, 1));
+ async_pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Pick cancelled", &error, 1));
}
-static void pick_after_resolver_result_start_locked(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem);
+static void pick_after_resolver_result_start_locked(grpc_call_element* elem);
-static void pick_after_resolver_result_done_locked(grpc_exec_ctx* exec_ctx,
- void* arg,
+static void pick_after_resolver_result_done_locked(void* arg,
grpc_error* error) {
pick_after_resolver_result_args* args = (pick_after_resolver_result_args*)arg;
if (args->finished) {
@@ -1266,19 +1216,19 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx* exec_ctx,
gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data",
chand, calld);
}
- async_pick_done_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
+ async_pick_done_locked(elem, GRPC_ERROR_REF(error));
} else if (chand->lb_policy != nullptr) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick",
chand, calld);
}
- if (pick_callback_start_locked(exec_ctx, elem)) {
+ if (pick_callback_start_locked(elem)) {
// Even if the LB policy returns a result synchronously, we have
// already added our polling entity to chand->interested_parties
// in order to wait for the resolver result, so we need to
// remove it here. Therefore, we call async_pick_done_locked()
// instead of pick_done_locked().
- async_pick_done_locked(exec_ctx, elem, GRPC_ERROR_NONE);
+ async_pick_done_locked(elem, GRPC_ERROR_NONE);
}
}
// TODO(roth): It should be impossible for chand->lb_policy to be NULL
@@ -1296,19 +1246,18 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx* exec_ctx,
"trying again",
chand, calld);
}
- pick_after_resolver_result_start_locked(exec_ctx, elem);
+ pick_after_resolver_result_start_locked(elem);
} else {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver disconnected", chand,
calld);
}
async_pick_done_locked(
- exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
+ elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
}
}
-static void pick_after_resolver_result_start_locked(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem) {
+static void pick_after_resolver_result_start_locked(grpc_call_element* elem) {
channel_data* chand = (channel_data*)elem->channel_data;
call_data* calld = (call_data*)elem->call_data;
if (grpc_client_channel_trace.enabled()) {
@@ -1324,47 +1273,46 @@ static void pick_after_resolver_result_start_locked(grpc_exec_ctx* exec_ctx,
grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
&args->closure, GRPC_ERROR_NONE);
grpc_call_combiner_set_notify_on_cancel(
- exec_ctx, calld->call_combiner,
+ calld->call_combiner,
GRPC_CLOSURE_INIT(&args->cancel_closure,
pick_after_resolver_result_cancel_locked, args,
grpc_combiner_scheduler(chand->combiner)));
}
-static void start_pick_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* ignored) {
+static void start_pick_locked(void* arg, grpc_error* ignored) {
grpc_call_element* elem = (grpc_call_element*)arg;
call_data* calld = (call_data*)elem->call_data;
channel_data* chand = (channel_data*)elem->channel_data;
GPR_ASSERT(calld->connected_subchannel == nullptr);
if (chand->lb_policy != nullptr) {
// We already have an LB policy, so ask it for a pick.
- if (pick_callback_start_locked(exec_ctx, elem)) {
+ if (pick_callback_start_locked(elem)) {
// Pick completed synchronously.
- pick_done_locked(exec_ctx, elem, GRPC_ERROR_NONE);
+ pick_done_locked(elem, GRPC_ERROR_NONE);
return;
}
} else {
// We do not yet have an LB policy, so wait for a resolver result.
if (chand->resolver == nullptr) {
- pick_done_locked(exec_ctx, elem,
+ pick_done_locked(elem,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
return;
}
if (!chand->started_resolving) {
- start_resolving_locked(exec_ctx, chand);
+ start_resolving_locked(chand);
}
- pick_after_resolver_result_start_locked(exec_ctx, elem);
+ pick_after_resolver_result_start_locked(elem);
}
// We need to wait for either a resolver result or for an async result
// from the LB policy. Add the polling entity from call_data to the
// channel_data's interested_parties, so that the I/O of the LB policy
// and resolver can be done under it. The polling entity will be
// removed in async_pick_done_locked().
- grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
+ grpc_polling_entity_add_to_pollset_set(calld->pollent,
chand->interested_parties);
}
-static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+static void on_complete(void* arg, grpc_error* error) {
grpc_call_element* elem = (grpc_call_element*)arg;
call_data* calld = (call_data*)elem->call_data;
if (calld->retry_throttle_data != nullptr) {
@@ -1380,18 +1328,15 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
calld->retry_throttle_data);
}
}
- GRPC_CLOSURE_RUN(exec_ctx, calld->original_on_complete,
- GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_RUN(calld->original_on_complete, GRPC_ERROR_REF(error));
}
static void cc_start_transport_stream_op_batch(
- grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- grpc_transport_stream_op_batch* batch) {
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
call_data* calld = (call_data*)elem->call_data;
channel_data* chand = (channel_data*)elem->channel_data;
if (chand->deadline_checking_enabled) {
- grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
- batch);
+ grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
}
GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
// If we've previously been cancelled, immediately fail any new batches.
@@ -1401,7 +1346,7 @@ static void cc_start_transport_stream_op_batch(
chand, calld, grpc_error_string(calld->error));
}
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, batch, GRPC_ERROR_REF(calld->error), calld->call_combiner);
+ batch, GRPC_ERROR_REF(calld->error), calld->call_combiner);
goto done;
}
if (batch->cancel_stream) {
@@ -1419,11 +1364,10 @@ static void cc_start_transport_stream_op_batch(
// If we have a subchannel call, send the cancellation batch down.
// Otherwise, fail all pending batches.
if (calld->subchannel_call != nullptr) {
- grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, batch);
+ grpc_subchannel_call_process_op(calld->subchannel_call, batch);
} else {
waiting_for_pick_batches_add(calld, batch);
- waiting_for_pick_batches_fail(exec_ctx, elem,
- GRPC_ERROR_REF(calld->error));
+ waiting_for_pick_batches_fail(elem, GRPC_ERROR_REF(calld->error));
}
goto done;
}
@@ -1446,7 +1390,7 @@ static void cc_start_transport_stream_op_batch(
"chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
calld, calld->subchannel_call);
}
- grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, batch);
+ grpc_subchannel_call_process_op(calld->subchannel_call, batch);
goto done;
}
// We do not yet have a subchannel call.
@@ -1460,7 +1404,6 @@ static void cc_start_transport_stream_op_batch(
chand, calld);
}
GRPC_CLOSURE_SCHED(
- exec_ctx,
GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
elem, grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
@@ -1471,7 +1414,7 @@ static void cc_start_transport_stream_op_batch(
"chand=%p calld=%p: saved batch, yeilding call combiner", chand,
calld);
}
- GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner,
"batch does not include send_initial_metadata");
}
done:
@@ -1479,8 +1422,7 @@ done:
}
/* Constructor for call_data */
-static grpc_error* cc_init_call_elem(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem,
+static grpc_error* cc_init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
call_data* calld = (call_data*)elem->call_data;
channel_data* chand = (channel_data*)elem->channel_data;
@@ -1492,23 +1434,22 @@ static grpc_error* cc_init_call_elem(grpc_exec_ctx* exec_ctx,
calld->owning_call = args->call_stack;
calld->call_combiner = args->call_combiner;
if (chand->deadline_checking_enabled) {
- grpc_deadline_state_init(exec_ctx, elem, args->call_stack,
- args->call_combiner, calld->deadline);
+ grpc_deadline_state_init(elem, args->call_stack, args->call_combiner,
+ calld->deadline);
}
return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
-static void cc_destroy_call_elem(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem,
+static void cc_destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* then_schedule_closure) {
call_data* calld = (call_data*)elem->call_data;
channel_data* chand = (channel_data*)elem->channel_data;
if (chand->deadline_checking_enabled) {
- grpc_deadline_state_destroy(exec_ctx, elem);
+ grpc_deadline_state_destroy(elem);
}
- grpc_slice_unref_internal(exec_ctx, calld->path);
+ grpc_slice_unref_internal(calld->path);
if (calld->method_params != nullptr) {
method_parameters_unref(calld->method_params);
}
@@ -1517,14 +1458,13 @@ static void cc_destroy_call_elem(grpc_exec_ctx* exec_ctx,
grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
then_schedule_closure);
then_schedule_closure = nullptr;
- GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, calld->subchannel_call,
+ GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
"client_channel_destroy_call");
}
GPR_ASSERT(calld->lb_policy == nullptr);
GPR_ASSERT(calld->waiting_for_pick_batches_count == 0);
if (calld->connected_subchannel != nullptr) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
- "picked");
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(calld->connected_subchannel, "picked");
}
for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
if (calld->subchannel_call_context[i].value != nullptr) {
@@ -1532,11 +1472,10 @@ static void cc_destroy_call_elem(grpc_exec_ctx* exec_ctx,
calld->subchannel_call_context[i].value);
}
}
- GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}
-static void cc_set_pollset_or_pollset_set(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem,
+static void cc_set_pollset_or_pollset_set(grpc_call_element* elem,
grpc_polling_entity* pollent) {
call_data* calld = (call_data*)elem->call_data;
calld->pollent = pollent;
@@ -1560,29 +1499,27 @@ const grpc_channel_filter grpc_client_channel_filter = {
"client-channel",
};
-static void try_to_connect_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error_ignored) {
+static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
channel_data* chand = (channel_data*)arg;
if (chand->lb_policy != nullptr) {
- grpc_lb_policy_exit_idle_locked(exec_ctx, chand->lb_policy);
+ grpc_lb_policy_exit_idle_locked(chand->lb_policy);
} else {
chand->exit_idle_when_lb_policy_arrives = true;
if (!chand->started_resolving && chand->resolver != nullptr) {
- start_resolving_locked(exec_ctx, chand);
+ start_resolving_locked(chand);
}
}
- GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect");
+ GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
}
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
- grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, int try_to_connect) {
+ grpc_channel_element* elem, int try_to_connect) {
channel_data* chand = (channel_data*)elem->channel_data;
grpc_connectivity_state out =
grpc_connectivity_state_check(&chand->state_tracker);
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
GRPC_CLOSURE_SCHED(
- exec_ctx,
GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
@@ -1663,50 +1600,49 @@ int grpc_client_channel_num_external_connectivity_watchers(
return count;
}
-static void on_external_watch_complete_locked(grpc_exec_ctx* exec_ctx,
- void* arg, grpc_error* error) {
+static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
external_connectivity_watcher* w = (external_connectivity_watcher*)arg;
grpc_closure* follow_up = w->on_complete;
- grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
+ grpc_polling_entity_del_from_pollset_set(&w->pollent,
w->chand->interested_parties);
- GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
+ GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
"external_connectivity_watcher");
external_connectivity_watcher_list_remove(w->chand, w);
gpr_free(w);
- GRPC_CLOSURE_RUN(exec_ctx, follow_up, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error));
}
-static void watch_connectivity_state_locked(grpc_exec_ctx* exec_ctx, void* arg,
+static void watch_connectivity_state_locked(void* arg,
grpc_error* error_ignored) {
external_connectivity_watcher* w = (external_connectivity_watcher*)arg;
external_connectivity_watcher* found = nullptr;
if (w->state != nullptr) {
external_connectivity_watcher_list_append(w->chand, w);
- GRPC_CLOSURE_RUN(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
grpc_combiner_scheduler(w->chand->combiner));
- grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
+ grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker,
+ w->state, &w->my_closure);
} else {
GPR_ASSERT(w->watcher_timer_init == nullptr);
found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
if (found) {
GPR_ASSERT(found->on_complete == w->on_complete);
grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &found->chand->state_tracker, nullptr, &found->my_closure);
+ &found->chand->state_tracker, nullptr, &found->my_closure);
}
- grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
+ grpc_polling_entity_del_from_pollset_set(&w->pollent,
w->chand->interested_parties);
- GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
+ GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
"external_connectivity_watcher");
gpr_free(w);
}
}
void grpc_client_channel_watch_connectivity_state(
- grpc_exec_ctx* exec_ctx, grpc_channel_element* elem,
- grpc_polling_entity pollent, grpc_connectivity_state* state,
- grpc_closure* closure, grpc_closure* watcher_timer_init) {
+ grpc_channel_element* elem, grpc_polling_entity pollent,
+ grpc_connectivity_state* state, grpc_closure* closure,
+ grpc_closure* watcher_timer_init) {
channel_data* chand = (channel_data*)elem->channel_data;
external_connectivity_watcher* w =
(external_connectivity_watcher*)gpr_zalloc(sizeof(*w));
@@ -1715,12 +1651,11 @@ void grpc_client_channel_watch_connectivity_state(
w->on_complete = closure;
w->state = state;
w->watcher_timer_init = watcher_timer_init;
- grpc_polling_entity_add_to_pollset_set(exec_ctx, &w->pollent,
+ grpc_polling_entity_add_to_pollset_set(&w->pollent,
chand->interested_parties);
GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
"external_connectivity_watcher");
GRPC_CLOSURE_SCHED(
- exec_ctx,
GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);