diff options
author | Jan Tattermusch <jtattermusch@users.noreply.github.com> | 2018-01-19 08:17:29 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-01-19 08:17:29 +0100 |
commit | 461cf30159c0ada954e8e2a4e6591694f617809c (patch) | |
tree | 6c0f2b9cfe328d0ed724700449f39e709915b3bd /src/core/ext/filters/client_channel/subchannel.cc | |
parent | 471a5dc18b4465b051cddf4e02dbdf44336f80ce (diff) | |
parent | bb2f7e28edc3e3dd663ad308aed7ed632a0a17bf (diff) |
Merge branch 'master' into cmake-export-fix
Diffstat (limited to 'src/core/ext/filters/client_channel/subchannel.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/subchannel.cc | 299 |
1 files changed, 148 insertions, 151 deletions
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index a604c55c58..fe4fcbbb7d 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -37,11 +37,12 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/debug/stats.h" +#include "src/core/lib/gpr++/debug_location.h" +#include "src/core/lib/gpr++/manual_constructor.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/connectivity_state.h" @@ -55,10 +56,6 @@ #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 -#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ - ((grpc_connected_subchannel*)(gpr_atm_##barrier##_load( \ - &(subchannel)->connected_subchannel))) - namespace { struct state_watcher { grpc_closure closure; @@ -98,7 +95,7 @@ struct grpc_subchannel { grpc_connect_out_args connecting_result; /** callback for connection finishing */ - grpc_closure connected; + grpc_closure on_connected; /** callback for our alarm */ grpc_closure on_alarm; @@ -107,12 +104,13 @@ struct grpc_subchannel { being setup */ grpc_pollset_set* pollset_set; - /** active connection, or null; of type grpc_connected_subchannel */ - gpr_atm connected_subchannel; - /** mutex protecting remaining elements */ gpr_mu mu; + /** active connection, or null; of type grpc_core::ConnectedSubchannel + */ + grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel; + /** have we seen a disconnection? */ bool disconnected; /** are we connecting */ @@ -136,16 +134,15 @@ struct grpc_subchannel { }; struct grpc_subchannel_call { - grpc_connected_subchannel* connection; + grpc_core::ConnectedSubchannel* connection; grpc_closure* schedule_closure_after_destroy; }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack*)((call) + 1)) -#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack*)(con)) #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ (((grpc_subchannel_call*)(callstack)) - 1) -static void subchannel_connected(void* subchannel, grpc_error* error); +static void on_subchannel_connected(void* subchannel, grpc_error* error); #ifndef NDEBUG #define REF_REASON reason @@ -163,20 +160,9 @@ static void subchannel_connected(void* subchannel, grpc_error* error); */ static void connection_destroy(void* arg, grpc_error* error) { - grpc_connected_subchannel* c = (grpc_connected_subchannel*)arg; - grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c)); - gpr_free(c); -} - -grpc_connected_subchannel* grpc_connected_subchannel_ref( - grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); - return c; -} - -void grpc_connected_subchannel_unref( - grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - GRPC_CHANNEL_STACK_UNREF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); + grpc_channel_stack* stk = (grpc_channel_stack*)arg; + grpc_channel_stack_destroy(stk); + gpr_free(stk); } /* @@ -243,18 +229,13 @@ grpc_subchannel* grpc_subchannel_ref_from_weak_ref( } static void disconnect(grpc_subchannel* c) { - grpc_connected_subchannel* con; grpc_subchannel_index_unregister(c->key, c); gpr_mu_lock(&c->mu); GPR_ASSERT(!c->disconnected); c->disconnected = true; grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Subchannel disconnected")); - con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); - if (con != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "connection"); - gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef); - } + c->connected_subchannel.reset(); gpr_mu_unlock(&c->mu); } @@ -374,7 +355,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, if (new_args != nullptr) grpc_channel_args_destroy(new_args); c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; - GRPC_CLOSURE_INIT(&c->connected, subchannel_connected, c, + GRPC_CLOSURE_INIT(&c->on_connected, on_subchannel_connected, c, grpc_schedule_on_exec_ctx); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); @@ -397,7 +378,7 @@ static void continue_connect_locked(grpc_subchannel* c) { grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "state_change"); grpc_connector_connect(c->connector, &args, &c->connecting_result, - &c->connected); + &c->on_connected); } grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c, @@ -458,7 +439,7 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { return; } - if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != nullptr) { + if (c->connected_subchannel != nullptr) { /* Already connected: don't restart */ return; } @@ -481,9 +462,10 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { const grpc_millis time_til_next = c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now(); if (time_til_next <= 0) { - gpr_log(GPR_INFO, "Retry immediately"); + gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", c); } else { - gpr_log(GPR_INFO, "Retry in %" PRIdPTR " milliseconds", time_til_next); + gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRIdPTR " milliseconds", c, + time_til_next); } GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm); @@ -527,75 +509,56 @@ void grpc_subchannel_notify_on_state_change( } } -void grpc_connected_subchannel_process_transport_op( - grpc_connected_subchannel* con, grpc_transport_op* op) { - grpc_channel_stack* channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); - grpc_channel_element* top_elem = grpc_channel_stack_element(channel_stack, 0); - top_elem->filter->start_transport_op(top_elem, op); -} - -static void subchannel_on_child_state_changed(void* p, grpc_error* error) { - state_watcher* sw = (state_watcher*)p; - grpc_subchannel* c = sw->subchannel; +static void on_connected_subchannel_connectivity_changed(void* p, + grpc_error* error) { + state_watcher* connected_subchannel_watcher = (state_watcher*)p; + grpc_subchannel* c = connected_subchannel_watcher->subchannel; gpr_mu* mu = &c->mu; gpr_mu_lock(mu); - /* if we failed just leave this closure */ - if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - /* any errors on a subchannel ==> we're done, create a new one */ - sw->connectivity_state = GRPC_CHANNEL_SHUTDOWN; - } - grpc_connectivity_state_set(&c->state_tracker, sw->connectivity_state, - GRPC_ERROR_REF(error), "reflect_child"); - if (sw->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { - grpc_connected_subchannel_notify_on_state_change( - GET_CONNECTED_SUBCHANNEL(c, no_barrier), nullptr, - &sw->connectivity_state, &sw->closure); - GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); - sw = nullptr; + switch (connected_subchannel_watcher->connectivity_state) { + case GRPC_CHANNEL_TRANSIENT_FAILURE: + case GRPC_CHANNEL_SHUTDOWN: { + if (!c->disconnected && c->connected_subchannel != nullptr) { + if (grpc_trace_stream_refcount.enabled()) { + gpr_log(GPR_INFO, + "Connected subchannel %p of subchannel %p has gone into %s. " + "Attempting to reconnect.", + c->connected_subchannel.get(), c, + grpc_connectivity_state_name( + connected_subchannel_watcher->connectivity_state)); + } + c->connected_subchannel.reset(); + grpc_connectivity_state_set(&c->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "reflect_child"); + c->backoff_begun = false; + c->backoff->Reset(); + maybe_start_connecting_locked(c); + } else { + connected_subchannel_watcher->connectivity_state = + GRPC_CHANNEL_SHUTDOWN; + } + break; + } + default: { + grpc_connectivity_state_set( + &c->state_tracker, connected_subchannel_watcher->connectivity_state, + GRPC_ERROR_REF(error), "reflect_child"); + GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); + c->connected_subchannel->NotifyOnStateChange( + nullptr, &connected_subchannel_watcher->connectivity_state, + &connected_subchannel_watcher->closure); + connected_subchannel_watcher = nullptr; + } } - gpr_mu_unlock(mu); GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher"); - gpr_free(sw); -} - -static void connected_subchannel_state_op(grpc_connected_subchannel* con, - grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, - grpc_closure* closure) { - grpc_transport_op* op = grpc_make_transport_op(nullptr); - grpc_channel_element* elem; - op->connectivity_state = state; - op->on_connectivity_state_change = closure; - op->bind_pollset_set = interested_parties; - elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(elem, op); -} - -void grpc_connected_subchannel_notify_on_state_change( - grpc_connected_subchannel* con, grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, grpc_closure* closure) { - connected_subchannel_state_op(con, interested_parties, state, closure); -} - -void grpc_connected_subchannel_ping(grpc_connected_subchannel* con, - grpc_closure* on_initiate, - grpc_closure* on_ack) { - grpc_transport_op* op = grpc_make_transport_op(nullptr); - grpc_channel_element* elem; - op->send_ping.on_initiate = on_initiate; - op->send_ping.on_ack = on_ack; - elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(elem, op); + gpr_free(connected_subchannel_watcher); } static bool publish_transport_locked(grpc_subchannel* c) { - grpc_connected_subchannel* con; - grpc_channel_stack* stk; - state_watcher* sw_subchannel; - /* construct channel stack */ grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create(); grpc_channel_stack_builder_set_channel_arguments( @@ -607,8 +570,9 @@ static bool publish_transport_locked(grpc_subchannel* c) { grpc_channel_stack_builder_destroy(builder); return false; } + grpc_channel_stack* stk; grpc_error* error = grpc_channel_stack_builder_finish( - builder, 0, 1, connection_destroy, nullptr, (void**)&con); + builder, 0, 1, connection_destroy, nullptr, (void**)&stk); if (error != GRPC_ERROR_NONE) { grpc_transport_destroy(c->connecting_result.transport); gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", @@ -616,38 +580,37 @@ static bool publish_transport_locked(grpc_subchannel* c) { GRPC_ERROR_UNREF(error); return false; } - stk = CHANNEL_STACK_FROM_CONNECTION(con); memset(&c->connecting_result, 0, sizeof(c->connecting_result)); /* initialize state watcher */ - sw_subchannel = (state_watcher*)gpr_malloc(sizeof(*sw_subchannel)); - sw_subchannel->subchannel = c; - sw_subchannel->connectivity_state = GRPC_CHANNEL_READY; - GRPC_CLOSURE_INIT(&sw_subchannel->closure, subchannel_on_child_state_changed, - sw_subchannel, grpc_schedule_on_exec_ctx); + state_watcher* connected_subchannel_watcher = + (state_watcher*)gpr_zalloc(sizeof(*connected_subchannel_watcher)); + connected_subchannel_watcher->subchannel = c; + connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_READY; + GRPC_CLOSURE_INIT(&connected_subchannel_watcher->closure, + on_connected_subchannel_connectivity_changed, + connected_subchannel_watcher, grpc_schedule_on_exec_ctx); if (c->disconnected) { - gpr_free(sw_subchannel); + gpr_free(connected_subchannel_watcher); grpc_channel_stack_destroy(stk); - gpr_free(con); + gpr_free(stk); return false; } /* publish */ - /* TODO(ctiller): this full barrier seems to clear up a TSAN failure. - I'd have expected the rel_cas below to be enough, but - seemingly it's not. - Re-evaluate if we really need this. */ - gpr_atm_full_barrier(); - GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con)); + c->connected_subchannel.reset( + grpc_core::New<grpc_core::ConnectedSubchannel>(stk)); + gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", + c->connected_subchannel.get(), c); /* setup subchannel watching connected subchannel for changes; subchannel ref for connecting is donated to the state watcher */ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); - grpc_connected_subchannel_notify_on_state_change( - con, c->pollset_set, &sw_subchannel->connectivity_state, - &sw_subchannel->closure); + c->connected_subchannel->NotifyOnStateChange( + c->pollset_set, &connected_subchannel_watcher->connectivity_state, + &connected_subchannel_watcher->closure); /* signal completion */ grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_READY, @@ -655,11 +618,11 @@ static bool publish_transport_locked(grpc_subchannel* c) { return true; } -static void subchannel_connected(void* arg, grpc_error* error) { +static void on_subchannel_connected(void* arg, grpc_error* error) { grpc_subchannel* c = (grpc_subchannel*)arg; grpc_channel_args* delete_channel_args = c->connecting_result.channel_args; - GRPC_SUBCHANNEL_WEAK_REF(c, "connected"); + GRPC_SUBCHANNEL_WEAK_REF(c, "on_subchannel_connected"); gpr_mu_lock(&c->mu); c->connecting = false; if (c->connecting_result.transport != nullptr && @@ -694,10 +657,10 @@ static void subchannel_call_destroy(void* call, grpc_error* error) { grpc_subchannel_call* c = (grpc_subchannel_call*)call; GPR_ASSERT(c->schedule_closure_after_destroy != nullptr); GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); - grpc_connected_subchannel* connection = c->connection; + grpc_core::ConnectedSubchannel* connection = c->connection; grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr, c->schedule_closure_after_destroy); - GRPC_CONNECTED_SUBCHANNEL_UNREF(connection, "subchannel_call"); + connection->Unref(DEBUG_LOCATION, "subchannel_call"); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } @@ -728,9 +691,12 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call* call, GPR_TIMER_END("grpc_subchannel_call_process_op", 0); } -grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel( - grpc_subchannel* c) { - return GET_CONNECTED_SUBCHANNEL(c, acq); +grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> +grpc_subchannel_get_connected_subchannel(grpc_subchannel* c) { + gpr_mu_lock(&c->mu); + auto copy = c->connected_subchannel; + gpr_mu_unlock(&c->mu); + return copy; } const grpc_subchannel_key* grpc_subchannel_get_key( @@ -738,36 +704,6 @@ const grpc_subchannel_key* grpc_subchannel_get_key( return subchannel->key; } -grpc_error* grpc_connected_subchannel_create_call( - grpc_connected_subchannel* con, - const grpc_connected_subchannel_call_args* args, - grpc_subchannel_call** call) { - grpc_channel_stack* chanstk = CHANNEL_STACK_FROM_CONNECTION(con); - *call = (grpc_subchannel_call*)gpr_arena_alloc( - args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size); - grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); - (*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); - const grpc_call_element_args call_args = { - callstk, /* call_stack */ - nullptr, /* server_transport_data */ - args->context, /* context */ - args->path, /* path */ - args->start_time, /* start_time */ - args->deadline, /* deadline */ - args->arena, /* arena */ - args->call_combiner /* call_combiner */ - }; - grpc_error* error = grpc_call_stack_init(chanstk, 1, subchannel_call_destroy, - *call, &call_args); - if (error != GRPC_ERROR_NONE) { - const char* error_string = grpc_error_string(error); - gpr_log(GPR_ERROR, "error: %s", error_string); - return error; - } - grpc_call_stack_set_pollset_or_pollset_set(callstk, args->pollent); - return GRPC_ERROR_NONE; -} - grpc_call_stack* grpc_subchannel_call_get_call_stack( grpc_subchannel_call* subchannel_call) { return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); @@ -803,3 +739,64 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { (char*)GRPC_ARG_SUBCHANNEL_ADDRESS, addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup("")); } + +namespace grpc_core { +ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack) + : grpc_core::RefCountedWithTracing(&grpc_trace_stream_refcount), + channel_stack_(channel_stack) {} + +ConnectedSubchannel::~ConnectedSubchannel() { + GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); +} + +void ConnectedSubchannel::NotifyOnStateChange( + grpc_pollset_set* interested_parties, grpc_connectivity_state* state, + grpc_closure* closure) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + grpc_channel_element* elem; + op->connectivity_state = state; + op->on_connectivity_state_change = closure; + op->bind_pollset_set = interested_parties; + elem = grpc_channel_stack_element(channel_stack_, 0); + elem->filter->start_transport_op(elem, op); +} + +void ConnectedSubchannel::Ping(grpc_closure* on_initiate, + grpc_closure* on_ack) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + grpc_channel_element* elem; + op->send_ping.on_initiate = on_initiate; + op->send_ping.on_ack = on_ack; + elem = grpc_channel_stack_element(channel_stack_, 0); + elem->filter->start_transport_op(elem, op); +} + +grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args, + grpc_subchannel_call** call) { + *call = (grpc_subchannel_call*)gpr_arena_alloc( + args.arena, + sizeof(grpc_subchannel_call) + channel_stack_->call_stack_size); + grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); + Ref(DEBUG_LOCATION, "subchannel_call"); + (*call)->connection = this; + const grpc_call_element_args call_args = { + callstk, /* call_stack */ + nullptr, /* server_transport_data */ + args.context, /* context */ + args.path, /* path */ + args.start_time, /* start_time */ + args.deadline, /* deadline */ + args.arena, /* arena */ + args.call_combiner /* call_combiner */ + }; + grpc_error* error = grpc_call_stack_init( + channel_stack_, 1, subchannel_call_destroy, *call, &call_args); + if (error != GRPC_ERROR_NONE) { + const char* error_string = grpc_error_string(error); + gpr_log(GPR_ERROR, "error: %s", error_string); + return error; + } + grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); + return GRPC_ERROR_NONE; +} +} // namespace grpc_core |