diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/subchannel.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/subchannel.cc | 115 |
1 files changed, 57 insertions, 58 deletions
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 25615b6326..2f1662e63b 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -56,8 +56,8 @@ #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 -#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ - ((grpc_connected_subchannel*)(gpr_atm_##barrier##_load( \ +#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ + ((grpc_core::ConnectedSubchannel*)(gpr_atm_##barrier##_load( \ &(subchannel)->connected_subchannel))) typedef struct { @@ -106,7 +106,8 @@ struct grpc_subchannel { being setup */ grpc_pollset_set* pollset_set; - /** active connection, or null; of type grpc_connected_subchannel */ + /** active connection, or null; of type grpc_core::ConnectedSubchannel + */ gpr_atm connected_subchannel; /** mutex protecting remaining elements */ @@ -135,7 +136,7 @@ struct grpc_subchannel { }; struct grpc_subchannel_call { - grpc_connected_subchannel* connection; + grpc_core::ConnectedSubchannel* connection; grpc_closure* schedule_closure_after_destroy; }; @@ -166,14 +167,14 @@ static void connection_destroy(void* arg, grpc_error* error) { gpr_free(stk); } -grpc_connected_subchannel* grpc_connected_subchannel_ref( - grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_core::ConnectedSubchannel* ConnectedSubchannel_ref( + grpc_core::ConnectedSubchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { c->Ref(DEBUG_LOCATION, REF_REASON); return c; } -void grpc_connected_subchannel_unref( - grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void ConnectedSubchannel_unref( + grpc_core::ConnectedSubchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { c->Unref(DEBUG_LOCATION, REF_REASON); } @@ -247,7 +248,7 @@ static void disconnect(grpc_subchannel* c) { c->disconnected = true; grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Subchannel disconnected")); - grpc_connected_subchannel* con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); + grpc_core::ConnectedSubchannel* con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); if (con != nullptr) { GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "disconnect"); gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef); @@ -535,11 +536,15 @@ static void on_connected_subchannel_connectivity_changed(void* p, auto* con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); /* if we failed just leave this closure */ - if (connected_subchannel_watcher->connectivity_state == + if (connected_subchannel_watcher->connectivity_state >= GRPC_CHANNEL_TRANSIENT_FAILURE) { if (!c->disconnected && con != nullptr) { GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "transient_failure"); gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm) nullptr); + gpr_log( + GPR_INFO, + "LOL FORMER Connected subchannel %p of subchannel %p is now NULL.", + con, c); grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "reflect_child"); @@ -547,28 +552,28 @@ static void on_connected_subchannel_connectivity_changed(void* p, c->backoff->Reset(); if (grpc_trace_stream_refcount.enabled()) { gpr_log(GPR_INFO, - "Connected subchannel %p of subchannel %p has gone into " - "TRANSIENT_FAILURE. Attempting to reconnect.", - con, c); + "Connected subchannel %p of subchannel %p has gone into %s. " + "Attempting to reconnect.", + con, c, grpc_connectivity_state_name( + connected_subchannel_watcher->connectivity_state)); } maybe_start_connecting_locked(c); - goto done; } else { connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_SHUTDOWN; } + } else { + grpc_connectivity_state_set( + &c->state_tracker, connected_subchannel_watcher->connectivity_state, + GRPC_ERROR_REF(error), "reflect_child"); + if (connected_subchannel_watcher->connectivity_state < + GRPC_CHANNEL_TRANSIENT_FAILURE) { + GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); + con->NotifyOnStateChange( + nullptr, &connected_subchannel_watcher->connectivity_state, + &connected_subchannel_watcher->closure); + connected_subchannel_watcher = nullptr; + } } - grpc_connectivity_state_set(&c->state_tracker, - connected_subchannel_watcher->connectivity_state, - GRPC_ERROR_REF(error), "reflect_child"); - if (connected_subchannel_watcher->connectivity_state < - GRPC_CHANNEL_TRANSIENT_FAILURE) { - GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); - con->NotifyOnStateChange(nullptr, - &connected_subchannel_watcher->connectivity_state, - &connected_subchannel_watcher->closure); - connected_subchannel_watcher = nullptr; - } -done: gpr_mu_unlock(mu); GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher"); gpr_free(connected_subchannel_watcher); @@ -619,8 +624,8 @@ static bool publish_transport_locked(grpc_subchannel* c) { I'd have expected the rel_cas below to be enough, but seemingly it's not. Re-evaluate if we really need this. */ - grpc_connected_subchannel* con = - grpc_core::New<grpc_connected_subchannel>(stk); + grpc_core::ConnectedSubchannel* con = + grpc_core::New<grpc_core::ConnectedSubchannel>(stk); gpr_atm_full_barrier(); GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con)); @@ -677,7 +682,7 @@ static void subchannel_call_destroy(void* call, grpc_error* error) { grpc_subchannel_call* c = (grpc_subchannel_call*)call; GPR_ASSERT(c->schedule_closure_after_destroy != nullptr); GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); - grpc_connected_subchannel* connection = c->connection; + grpc_core::ConnectedSubchannel* connection = c->connection; grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr, c->schedule_closure_after_destroy); GRPC_CONNECTED_SUBCHANNEL_UNREF(connection, "subchannel_call"); @@ -711,7 +716,7 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call* call, GPR_TIMER_END("grpc_subchannel_call_process_op", 0); } -grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel( +grpc_core::ConnectedSubchannel* grpc_subchannel_get_connected_subchannel( grpc_subchannel* c) { return GET_CONNECTED_SUBCHANNEL(c, acq); } @@ -757,24 +762,16 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup("")); } -grpc_connected_subchannel::grpc_connected_subchannel( - grpc_channel_stack* channel_stack) +namespace grpc_core { +ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack) : grpc_core::RefCountedWithTracing(&grpc_trace_stream_refcount), channel_stack_(channel_stack) {} -grpc_connected_subchannel* grpc_connected_subchannel::Ref( - const grpc_core::DebugLocation& location, const char* reason) { - GRPC_CHANNEL_STACK_REF(channel_stack_, REF_REASON); - grpc_core::RefCountedWithTracing::Ref(location, reason); - return this; -} -void grpc_connected_subchannel::Unref(const grpc_core::DebugLocation& location, - const char* reason) { - GRPC_CHANNEL_STACK_UNREF(channel_stack_, REF_REASON); - grpc_core::RefCountedWithTracing::Unref(location, reason); +ConnectedSubchannel::~ConnectedSubchannel() { + GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); } -void grpc_connected_subchannel::NotifyOnStateChange( +void ConnectedSubchannel::NotifyOnStateChange( grpc_pollset_set* interested_parties, grpc_connectivity_state* state, grpc_closure* closure) { grpc_transport_op* op = grpc_make_transport_op(nullptr); @@ -786,8 +783,8 @@ void grpc_connected_subchannel::NotifyOnStateChange( elem->filter->start_transport_op(elem, op); } -void grpc_connected_subchannel::Ping(grpc_closure* on_initiate, - grpc_closure* on_ack) { +void ConnectedSubchannel::Ping(grpc_closure* on_initiate, + grpc_closure* on_ack) { grpc_transport_op* op = grpc_make_transport_op(nullptr); grpc_channel_element* elem; op->send_ping.on_initiate = on_initiate; @@ -796,22 +793,23 @@ void grpc_connected_subchannel::Ping(grpc_closure* on_initiate, elem->filter->start_transport_op(elem, op); } -grpc_error* grpc_connected_subchannel::CreateCall(const CallArgs* args, - grpc_subchannel_call** call) { +grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args, + grpc_subchannel_call** call) { *call = (grpc_subchannel_call*)gpr_arena_alloc( - args->arena, + args.arena, sizeof(grpc_subchannel_call) + channel_stack_->call_stack_size); grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); - (*call)->connection = Ref(DEBUG_LOCATION, "subchannel_call"); + Ref(DEBUG_LOCATION, "subchannel_call"); + (*call)->connection = this; const grpc_call_element_args call_args = { - callstk, /* call_stack */ - nullptr, /* server_transport_data */ - args->context, /* context */ - args->path, /* path */ - args->start_time, /* start_time */ - args->deadline, /* deadline */ - args->arena, /* arena */ - args->call_combiner /* call_combiner */ + callstk, /* call_stack */ + nullptr, /* server_transport_data */ + args.context, /* context */ + args.path, /* path */ + args.start_time, /* start_time */ + args.deadline, /* deadline */ + args.arena, /* arena */ + args.call_combiner /* call_combiner */ }; grpc_error* error = grpc_call_stack_init( channel_stack_, 1, subchannel_call_destroy, *call, &call_args); @@ -820,6 +818,7 @@ grpc_error* grpc_connected_subchannel::CreateCall(const CallArgs* args, gpr_log(GPR_ERROR, "error: %s", error_string); return error; } - grpc_call_stack_set_pollset_or_pollset_set(callstk, args->pollent); + grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); return GRPC_ERROR_NONE; } +} // namespace grpc_core |