diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/subchannel.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/subchannel.cc | 245 |
1 files changed, 108 insertions, 137 deletions
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index ce061c9782..f07394d29b 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -143,8 +143,7 @@ struct grpc_subchannel_call { #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ (((grpc_subchannel_call*)(callstack)) - 1) -static void subchannel_connected(grpc_exec_ctx* exec_ctx, void* subchannel, - grpc_error* error); +static void subchannel_connected(void* subchannel, grpc_error* error); #ifndef NDEBUG #define REF_REASON reason @@ -161,10 +160,9 @@ static void subchannel_connected(grpc_exec_ctx* exec_ctx, void* subchannel, * connection implementation */ -static void connection_destroy(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void connection_destroy(void* arg, grpc_error* error) { grpc_connected_subchannel* c = (grpc_connected_subchannel*)arg; - grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c)); + grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c)); gpr_free(c); } @@ -174,26 +172,23 @@ grpc_connected_subchannel* grpc_connected_subchannel_ref( return c; } -void grpc_connected_subchannel_unref(grpc_exec_ctx* exec_ctx, - grpc_connected_subchannel* c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - GRPC_CHANNEL_STACK_UNREF(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c), - REF_REASON); +void grpc_connected_subchannel_unref( + grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + GRPC_CHANNEL_STACK_UNREF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); } /* * grpc_subchannel implementation */ -static void subchannel_destroy(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void subchannel_destroy(void* arg, grpc_error* error) { grpc_subchannel* c = (grpc_subchannel*)arg; gpr_free((void*)c->filters); - grpc_channel_args_destroy(exec_ctx, c->args); - grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); - grpc_connector_unref(exec_ctx, c->connector); - grpc_pollset_set_destroy(exec_ctx, c->pollset_set); - grpc_subchannel_key_destroy(exec_ctx, c->key); + grpc_channel_args_destroy(c->args); + grpc_connectivity_state_destroy(&c->state_tracker); + grpc_connector_unref(c->connector); + grpc_pollset_set_destroy(c->pollset_set); + grpc_subchannel_key_destroy(c->key); gpr_mu_destroy(&c->mu); gpr_free(c); } @@ -245,43 +240,39 @@ grpc_subchannel* grpc_subchannel_ref_from_weak_ref( } } -static void disconnect(grpc_exec_ctx* exec_ctx, grpc_subchannel* c) { +static void disconnect(grpc_subchannel* c) { grpc_connected_subchannel* con; - grpc_subchannel_index_unregister(exec_ctx, c->key, c); + grpc_subchannel_index_unregister(c->key, c); gpr_mu_lock(&c->mu); GPR_ASSERT(!c->disconnected); c->disconnected = true; - grpc_connector_shutdown( - exec_ctx, c->connector, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Subchannel disconnected")); + grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Subchannel disconnected")); con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); if (con != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "connection"); gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef); } gpr_mu_unlock(&c->mu); } -void grpc_subchannel_unref(grpc_exec_ctx* exec_ctx, - grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_subchannel_unref(grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; // add a weak ref and subtract a strong ref (atomically) old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), 1 REF_MUTATE_PURPOSE("STRONG_UNREF")); if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) { - disconnect(exec_ctx, c); + disconnect(c); } - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "strong-unref"); + GRPC_SUBCHANNEL_WEAK_UNREF(c, "strong-unref"); } -void grpc_subchannel_weak_unref(grpc_exec_ctx* exec_ctx, - grpc_subchannel* c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_subchannel_weak_unref( + grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); if (old_refs == 1) { GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_CREATE(subchannel_destroy, c, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } @@ -335,17 +326,16 @@ static void parse_args_for_backoff_values( .set_max_backoff(max_backoff_ms); } -grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, - grpc_connector* connector, +grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, const grpc_subchannel_args* args) { grpc_subchannel_key* key = grpc_subchannel_key_create(args); - grpc_subchannel* c = grpc_subchannel_index_find(exec_ctx, key); + grpc_subchannel* c = grpc_subchannel_index_find(key); if (c) { - grpc_subchannel_key_destroy(exec_ctx, key); + grpc_subchannel_key_destroy(key); return c; } - GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(exec_ctx); + GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(); c = (grpc_subchannel*)gpr_zalloc(sizeof(*c)); c->key = key; gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); @@ -363,10 +353,10 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, c->pollset_set = grpc_pollset_set_create(); grpc_resolved_address* addr = (grpc_resolved_address*)gpr_malloc(sizeof(*addr)); - grpc_get_subchannel_address_arg(exec_ctx, args->args, addr); + grpc_get_subchannel_address_arg(args->args, addr); grpc_resolved_address* new_address = nullptr; grpc_channel_args* new_args = nullptr; - if (grpc_proxy_mappers_map_address(exec_ctx, addr, args->args, &new_address, + if (grpc_proxy_mappers_map_address(addr, args->args, &new_address, &new_args)) { GPR_ASSERT(new_address != nullptr); gpr_free(addr); @@ -379,7 +369,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, new_args != nullptr ? new_args : args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1); gpr_free(new_arg.value.string); - if (new_args != nullptr) grpc_channel_args_destroy(exec_ctx, new_args); + if (new_args != nullptr) grpc_channel_args_destroy(new_args); c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; GRPC_CLOSURE_INIT(&c->connected, subchannel_connected, c, @@ -392,21 +382,19 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, c->backoff.Init(backoff_options); gpr_mu_init(&c->mu); - return grpc_subchannel_index_register(exec_ctx, key, c); + return grpc_subchannel_index_register(key, c); } -static void continue_connect_locked(grpc_exec_ctx* exec_ctx, - grpc_subchannel* c) { +static void continue_connect_locked(grpc_subchannel* c) { grpc_connect_in_args args; args.interested_parties = c->pollset_set; const grpc_millis min_deadline = - c->min_connect_timeout_ms + grpc_exec_ctx_now(exec_ctx); + c->min_connect_timeout_ms + grpc_core::ExecCtx::Get()->Now(); args.deadline = std::max(c->next_attempt_deadline, min_deadline); args.channel_args = c->args; - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, - GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, - "state_change"); - grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result, + grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING, + GRPC_ERROR_NONE, "state_change"); + grpc_connector_connect(c->connector, &args, &c->connecting_result, &c->connected); } @@ -419,24 +407,23 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c, return state; } -static void on_external_state_watcher_done(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void on_external_state_watcher_done(void* arg, grpc_error* error) { external_state_watcher* w = (external_state_watcher*)arg; grpc_closure* follow_up = w->notify; if (w->pollset_set != nullptr) { - grpc_pollset_set_del_pollset_set(exec_ctx, w->subchannel->pollset_set, + grpc_pollset_set_del_pollset_set(w->subchannel->pollset_set, w->pollset_set); } gpr_mu_lock(&w->subchannel->mu); w->next->prev = w->prev; w->prev->next = w->next; gpr_mu_unlock(&w->subchannel->mu); - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher"); + GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher"); gpr_free(w); - GRPC_CLOSURE_RUN(exec_ctx, follow_up, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error)); } -static void on_alarm(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +static void on_alarm(void* arg, grpc_error* error) { grpc_subchannel* c = (grpc_subchannel*)arg; gpr_mu_lock(&c->mu); c->have_alarm = false; @@ -448,18 +435,17 @@ static void on_alarm(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { } if (error == GRPC_ERROR_NONE) { gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); - c->next_attempt_deadline = c->backoff->Step(exec_ctx); - continue_connect_locked(exec_ctx, c); + c->next_attempt_deadline = c->backoff->Step(); + continue_connect_locked(c); gpr_mu_unlock(&c->mu); } else { gpr_mu_unlock(&c->mu); - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); + GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); } GRPC_ERROR_UNREF(error); } -static void maybe_start_connecting_locked(grpc_exec_ctx* exec_ctx, - grpc_subchannel* c) { +static void maybe_start_connecting_locked(grpc_subchannel* c) { if (c->disconnected) { /* Don't try to connect if we're already disconnected */ return; @@ -485,28 +471,26 @@ static void maybe_start_connecting_locked(grpc_exec_ctx* exec_ctx, if (!c->backoff_begun) { c->backoff_begun = true; - c->next_attempt_deadline = c->backoff->Begin(exec_ctx); - continue_connect_locked(exec_ctx, c); + c->next_attempt_deadline = c->backoff->Begin(); + continue_connect_locked(c); } else { GPR_ASSERT(!c->have_alarm); c->have_alarm = true; const grpc_millis time_til_next = - c->next_attempt_deadline - grpc_exec_ctx_now(exec_ctx); + c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now(); if (time_til_next <= 0) { gpr_log(GPR_INFO, "Retry immediately"); } else { gpr_log(GPR_INFO, "Retry in %" PRIdPTR " milliseconds", time_til_next); } GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt_deadline, - &c->on_alarm); + grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm); } } void grpc_subchannel_notify_on_state_change( - grpc_exec_ctx* exec_ctx, grpc_subchannel* c, - grpc_pollset_set* interested_parties, grpc_connectivity_state* state, - grpc_closure* notify) { + grpc_subchannel* c, grpc_pollset_set* interested_parties, + grpc_connectivity_state* state, grpc_closure* notify) { external_state_watcher* w; if (state == nullptr) { @@ -514,8 +498,8 @@ void grpc_subchannel_notify_on_state_change( for (w = c->root_external_state_watcher.next; w != &c->root_external_state_watcher; w = w->next) { if (w->notify == notify) { - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &c->state_tracker, nullptr, &w->closure); + grpc_connectivity_state_notify_on_state_change(&c->state_tracker, + nullptr, &w->closure); } } gpr_mu_unlock(&c->mu); @@ -527,31 +511,28 @@ void grpc_subchannel_notify_on_state_change( GRPC_CLOSURE_INIT(&w->closure, on_external_state_watcher_done, w, grpc_schedule_on_exec_ctx); if (interested_parties != nullptr) { - grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set, - interested_parties); + grpc_pollset_set_add_pollset_set(c->pollset_set, interested_parties); } GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); gpr_mu_lock(&c->mu); w->next = &c->root_external_state_watcher; w->prev = w->next->prev; w->next->prev = w->prev->next = w; - grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker, - state, &w->closure); - maybe_start_connecting_locked(exec_ctx, c); + grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, + &w->closure); + maybe_start_connecting_locked(c); gpr_mu_unlock(&c->mu); } } void grpc_connected_subchannel_process_transport_op( - grpc_exec_ctx* exec_ctx, grpc_connected_subchannel* con, - grpc_transport_op* op) { + grpc_connected_subchannel* con, grpc_transport_op* op) { grpc_channel_stack* channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); grpc_channel_element* top_elem = grpc_channel_stack_element(channel_stack, 0); - top_elem->filter->start_transport_op(exec_ctx, top_elem, op); + top_elem->filter->start_transport_op(top_elem, op); } -static void subchannel_on_child_state_changed(grpc_exec_ctx* exec_ctx, void* p, - grpc_error* error) { +static void subchannel_on_child_state_changed(void* p, grpc_error* error) { state_watcher* sw = (state_watcher*)p; grpc_subchannel* c = sw->subchannel; gpr_mu* mu = &c->mu; @@ -563,24 +544,22 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx* exec_ctx, void* p, /* any errors on a subchannel ==> we're done, create a new one */ sw->connectivity_state = GRPC_CHANNEL_SHUTDOWN; } - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, - sw->connectivity_state, GRPC_ERROR_REF(error), - "reflect_child"); + grpc_connectivity_state_set(&c->state_tracker, sw->connectivity_state, + GRPC_ERROR_REF(error), "reflect_child"); if (sw->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { grpc_connected_subchannel_notify_on_state_change( - exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), nullptr, + GET_CONNECTED_SUBCHANNEL(c, no_barrier), nullptr, &sw->connectivity_state, &sw->closure); GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); sw = nullptr; } gpr_mu_unlock(mu); - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "state_watcher"); + GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher"); gpr_free(sw); } -static void connected_subchannel_state_op(grpc_exec_ctx* exec_ctx, - grpc_connected_subchannel* con, +static void connected_subchannel_state_op(grpc_connected_subchannel* con, grpc_pollset_set* interested_parties, grpc_connectivity_state* state, grpc_closure* closure) { @@ -590,29 +569,27 @@ static void connected_subchannel_state_op(grpc_exec_ctx* exec_ctx, op->on_connectivity_state_change = closure; op->bind_pollset_set = interested_parties; elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(exec_ctx, elem, op); + elem->filter->start_transport_op(elem, op); } void grpc_connected_subchannel_notify_on_state_change( - grpc_exec_ctx* exec_ctx, grpc_connected_subchannel* con, - grpc_pollset_set* interested_parties, grpc_connectivity_state* state, - grpc_closure* closure) { - connected_subchannel_state_op(exec_ctx, con, interested_parties, state, - closure); + grpc_connected_subchannel* con, grpc_pollset_set* interested_parties, + grpc_connectivity_state* state, grpc_closure* closure) { + connected_subchannel_state_op(con, interested_parties, state, closure); } -void grpc_connected_subchannel_ping(grpc_exec_ctx* exec_ctx, - grpc_connected_subchannel* con, - grpc_closure* closure) { +void grpc_connected_subchannel_ping(grpc_connected_subchannel* con, + grpc_closure* on_initiate, + grpc_closure* on_ack) { grpc_transport_op* op = grpc_make_transport_op(nullptr); grpc_channel_element* elem; - op->send_ping = closure; + op->send_ping.on_initiate = on_initiate; + op->send_ping.on_ack = on_ack; elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(exec_ctx, elem, op); + elem->filter->start_transport_op(elem, op); } -static bool publish_transport_locked(grpc_exec_ctx* exec_ctx, - grpc_subchannel* c) { +static bool publish_transport_locked(grpc_subchannel* c) { grpc_connected_subchannel* con; grpc_channel_stack* stk; state_watcher* sw_subchannel; @@ -620,19 +597,18 @@ static bool publish_transport_locked(grpc_exec_ctx* exec_ctx, /* construct channel stack */ grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create(); grpc_channel_stack_builder_set_channel_arguments( - exec_ctx, builder, c->connecting_result.channel_args); + builder, c->connecting_result.channel_args); grpc_channel_stack_builder_set_transport(builder, c->connecting_result.transport); - if (!grpc_channel_init_create_stack(exec_ctx, builder, - GRPC_CLIENT_SUBCHANNEL)) { - grpc_channel_stack_builder_destroy(exec_ctx, builder); + if (!grpc_channel_init_create_stack(builder, GRPC_CLIENT_SUBCHANNEL)) { + grpc_channel_stack_builder_destroy(builder); return false; } grpc_error* error = grpc_channel_stack_builder_finish( - exec_ctx, builder, 0, 1, connection_destroy, nullptr, (void**)&con); + builder, 0, 1, connection_destroy, nullptr, (void**)&con); if (error != GRPC_ERROR_NONE) { - grpc_transport_destroy(exec_ctx, c->connecting_result.transport); + grpc_transport_destroy(c->connecting_result.transport); gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", grpc_error_string(error)); GRPC_ERROR_UNREF(error); @@ -650,7 +626,7 @@ static bool publish_transport_locked(grpc_exec_ctx* exec_ctx, if (c->disconnected) { gpr_free(sw_subchannel); - grpc_channel_stack_destroy(exec_ctx, stk); + grpc_channel_stack_destroy(stk); gpr_free(con); return false; } @@ -666,19 +642,18 @@ static bool publish_transport_locked(grpc_exec_ctx* exec_ctx, /* setup subchannel watching connected subchannel for changes; subchannel ref for connecting is donated to the state watcher */ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); + GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); grpc_connected_subchannel_notify_on_state_change( - exec_ctx, con, c->pollset_set, &sw_subchannel->connectivity_state, + con, c->pollset_set, &sw_subchannel->connectivity_state, &sw_subchannel->closure); /* signal completion */ - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY, + grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connected"); return true; } -static void subchannel_connected(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void subchannel_connected(void* arg, grpc_error* error) { grpc_subchannel* c = (grpc_subchannel*)arg; grpc_channel_args* delete_channel_args = c->connecting_result.channel_args; @@ -686,13 +661,13 @@ static void subchannel_connected(grpc_exec_ctx* exec_ctx, void* arg, gpr_mu_lock(&c->mu); c->connecting = false; if (c->connecting_result.transport != nullptr && - publish_transport_locked(exec_ctx, c)) { + publish_transport_locked(c)) { /* do nothing, transport was published */ } else if (c->disconnected) { - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); + GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); } else { grpc_connectivity_state_set( - exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Connect Failed", &error, 1), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), @@ -701,27 +676,26 @@ static void subchannel_connected(grpc_exec_ctx* exec_ctx, void* arg, const char* errmsg = grpc_error_string(error); gpr_log(GPR_INFO, "Connect failed: %s", errmsg); - maybe_start_connecting_locked(exec_ctx, c); - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); + maybe_start_connecting_locked(c); + GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); } gpr_mu_unlock(&c->mu); - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connected"); - grpc_channel_args_destroy(exec_ctx, delete_channel_args); + GRPC_SUBCHANNEL_WEAK_UNREF(c, "connected"); + grpc_channel_args_destroy(delete_channel_args); } /* * grpc_subchannel_call implementation */ -static void subchannel_call_destroy(grpc_exec_ctx* exec_ctx, void* call, - grpc_error* error) { +static void subchannel_call_destroy(void* call, grpc_error* error) { grpc_subchannel_call* c = (grpc_subchannel_call*)call; GPR_ASSERT(c->schedule_closure_after_destroy != nullptr); GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); grpc_connected_subchannel* connection = c->connection; - grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr, + grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr, c->schedule_closure_after_destroy); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(connection, "subchannel_call"); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } @@ -737,20 +711,18 @@ void grpc_subchannel_call_ref( GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); } -void grpc_subchannel_call_unref(grpc_exec_ctx* exec_ctx, - grpc_subchannel_call* c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - GRPC_CALL_STACK_UNREF(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); +void grpc_subchannel_call_unref( + grpc_subchannel_call* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); } -void grpc_subchannel_call_process_op(grpc_exec_ctx* exec_ctx, - grpc_subchannel_call* call, +void grpc_subchannel_call_process_op(grpc_subchannel_call* call, grpc_transport_stream_op_batch* batch) { GPR_TIMER_BEGIN("grpc_subchannel_call_process_op", 0); grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0); GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch); - top_elem->filter->start_transport_stream_op_batch(exec_ctx, top_elem, batch); + top_elem->filter->start_transport_stream_op_batch(top_elem, batch); GPR_TIMER_END("grpc_subchannel_call_process_op", 0); } @@ -765,7 +737,7 @@ const grpc_subchannel_key* grpc_subchannel_get_key( } grpc_error* grpc_connected_subchannel_create_call( - grpc_exec_ctx* exec_ctx, grpc_connected_subchannel* con, + grpc_connected_subchannel* con, const grpc_connected_subchannel_call_args* args, grpc_subchannel_call** call) { grpc_channel_stack* chanstk = CHANNEL_STACK_FROM_CONNECTION(con); @@ -783,14 +755,14 @@ grpc_error* grpc_connected_subchannel_create_call( args->arena, /* arena */ args->call_combiner /* call_combiner */ }; - grpc_error* error = grpc_call_stack_init( - exec_ctx, chanstk, 1, subchannel_call_destroy, *call, &call_args); + grpc_error* error = grpc_call_stack_init(chanstk, 1, subchannel_call_destroy, + *call, &call_args); if (error != GRPC_ERROR_NONE) { const char* error_string = grpc_error_string(error); gpr_log(GPR_ERROR, "error: %s", error_string); return error; } - grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, args->pollent); + grpc_call_stack_set_pollset_or_pollset_set(callstk, args->pollent); return GRPC_ERROR_NONE; } @@ -799,21 +771,20 @@ grpc_call_stack* grpc_subchannel_call_get_call_stack( return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); } -static void grpc_uri_to_sockaddr(grpc_exec_ctx* exec_ctx, const char* uri_str, +static void grpc_uri_to_sockaddr(const char* uri_str, grpc_resolved_address* addr) { - grpc_uri* uri = grpc_uri_parse(exec_ctx, uri_str, 0 /* suppress_errors */); + grpc_uri* uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */); GPR_ASSERT(uri != nullptr); if (!grpc_parse_uri(uri, addr)) memset(addr, 0, sizeof(*addr)); grpc_uri_destroy(uri); } -void grpc_get_subchannel_address_arg(grpc_exec_ctx* exec_ctx, - const grpc_channel_args* args, +void grpc_get_subchannel_address_arg(const grpc_channel_args* args, grpc_resolved_address* addr) { const char* addr_uri_str = grpc_get_subchannel_address_uri_arg(args); memset(addr, 0, sizeof(*addr)); if (*addr_uri_str != '\0') { - grpc_uri_to_sockaddr(exec_ctx, addr_uri_str, addr); + grpc_uri_to_sockaddr(addr_uri_str, addr); } } |