aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/subchannel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/subchannel.cc')
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc115
1 files changed, 57 insertions, 58 deletions
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 25615b6326..2f1662e63b 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -56,8 +56,8 @@
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
-#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
- ((grpc_connected_subchannel*)(gpr_atm_##barrier##_load( \
+#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
+ ((grpc_core::ConnectedSubchannel*)(gpr_atm_##barrier##_load( \
&(subchannel)->connected_subchannel)))
typedef struct {
@@ -106,7 +106,8 @@ struct grpc_subchannel {
being setup */
grpc_pollset_set* pollset_set;
- /** active connection, or null; of type grpc_connected_subchannel */
+ /** active connection, or null; of type grpc_core::ConnectedSubchannel
+ */
gpr_atm connected_subchannel;
/** mutex protecting remaining elements */
@@ -135,7 +136,7 @@ struct grpc_subchannel {
};
struct grpc_subchannel_call {
- grpc_connected_subchannel* connection;
+ grpc_core::ConnectedSubchannel* connection;
grpc_closure* schedule_closure_after_destroy;
};
@@ -166,14 +167,14 @@ static void connection_destroy(void* arg, grpc_error* error) {
gpr_free(stk);
}
-grpc_connected_subchannel* grpc_connected_subchannel_ref(
- grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+grpc_core::ConnectedSubchannel* ConnectedSubchannel_ref(
+ grpc_core::ConnectedSubchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
c->Ref(DEBUG_LOCATION, REF_REASON);
return c;
}
-void grpc_connected_subchannel_unref(
- grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+void ConnectedSubchannel_unref(
+ grpc_core::ConnectedSubchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
c->Unref(DEBUG_LOCATION, REF_REASON);
}
@@ -247,7 +248,7 @@ static void disconnect(grpc_subchannel* c) {
c->disconnected = true;
grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Subchannel disconnected"));
- grpc_connected_subchannel* con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
+ grpc_core::ConnectedSubchannel* con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
if (con != nullptr) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "disconnect");
gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef);
@@ -535,11 +536,15 @@ static void on_connected_subchannel_connectivity_changed(void* p,
auto* con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
/* if we failed just leave this closure */
- if (connected_subchannel_watcher->connectivity_state ==
+ if (connected_subchannel_watcher->connectivity_state >=
GRPC_CHANNEL_TRANSIENT_FAILURE) {
if (!c->disconnected && con != nullptr) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "transient_failure");
gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm) nullptr);
+ gpr_log(
+ GPR_INFO,
+ "LOL FORMER Connected subchannel %p of subchannel %p is now NULL.",
+ con, c);
grpc_connectivity_state_set(&c->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "reflect_child");
@@ -547,28 +552,28 @@ static void on_connected_subchannel_connectivity_changed(void* p,
c->backoff->Reset();
if (grpc_trace_stream_refcount.enabled()) {
gpr_log(GPR_INFO,
- "Connected subchannel %p of subchannel %p has gone into "
- "TRANSIENT_FAILURE. Attempting to reconnect.",
- con, c);
+ "Connected subchannel %p of subchannel %p has gone into %s. "
+ "Attempting to reconnect.",
+ con, c, grpc_connectivity_state_name(
+ connected_subchannel_watcher->connectivity_state));
}
maybe_start_connecting_locked(c);
- goto done;
} else {
connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_SHUTDOWN;
}
+ } else {
+ grpc_connectivity_state_set(
+ &c->state_tracker, connected_subchannel_watcher->connectivity_state,
+ GRPC_ERROR_REF(error), "reflect_child");
+ if (connected_subchannel_watcher->connectivity_state <
+ GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
+ con->NotifyOnStateChange(
+ nullptr, &connected_subchannel_watcher->connectivity_state,
+ &connected_subchannel_watcher->closure);
+ connected_subchannel_watcher = nullptr;
+ }
}
- grpc_connectivity_state_set(&c->state_tracker,
- connected_subchannel_watcher->connectivity_state,
- GRPC_ERROR_REF(error), "reflect_child");
- if (connected_subchannel_watcher->connectivity_state <
- GRPC_CHANNEL_TRANSIENT_FAILURE) {
- GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
- con->NotifyOnStateChange(nullptr,
- &connected_subchannel_watcher->connectivity_state,
- &connected_subchannel_watcher->closure);
- connected_subchannel_watcher = nullptr;
- }
-done:
gpr_mu_unlock(mu);
GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher");
gpr_free(connected_subchannel_watcher);
@@ -619,8 +624,8 @@ static bool publish_transport_locked(grpc_subchannel* c) {
I'd have expected the rel_cas below to be enough, but
seemingly it's not.
Re-evaluate if we really need this. */
- grpc_connected_subchannel* con =
- grpc_core::New<grpc_connected_subchannel>(stk);
+ grpc_core::ConnectedSubchannel* con =
+ grpc_core::New<grpc_core::ConnectedSubchannel>(stk);
gpr_atm_full_barrier();
GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
@@ -677,7 +682,7 @@ static void subchannel_call_destroy(void* call, grpc_error* error) {
grpc_subchannel_call* c = (grpc_subchannel_call*)call;
GPR_ASSERT(c->schedule_closure_after_destroy != nullptr);
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
- grpc_connected_subchannel* connection = c->connection;
+ grpc_core::ConnectedSubchannel* connection = c->connection;
grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr,
c->schedule_closure_after_destroy);
GRPC_CONNECTED_SUBCHANNEL_UNREF(connection, "subchannel_call");
@@ -711,7 +716,7 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call* call,
GPR_TIMER_END("grpc_subchannel_call_process_op", 0);
}
-grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel(
+grpc_core::ConnectedSubchannel* grpc_subchannel_get_connected_subchannel(
grpc_subchannel* c) {
return GET_CONNECTED_SUBCHANNEL(c, acq);
}
@@ -757,24 +762,16 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) {
addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup(""));
}
-grpc_connected_subchannel::grpc_connected_subchannel(
- grpc_channel_stack* channel_stack)
+namespace grpc_core {
+ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack)
: grpc_core::RefCountedWithTracing(&grpc_trace_stream_refcount),
channel_stack_(channel_stack) {}
-grpc_connected_subchannel* grpc_connected_subchannel::Ref(
- const grpc_core::DebugLocation& location, const char* reason) {
- GRPC_CHANNEL_STACK_REF(channel_stack_, REF_REASON);
- grpc_core::RefCountedWithTracing::Ref(location, reason);
- return this;
-}
-void grpc_connected_subchannel::Unref(const grpc_core::DebugLocation& location,
- const char* reason) {
- GRPC_CHANNEL_STACK_UNREF(channel_stack_, REF_REASON);
- grpc_core::RefCountedWithTracing::Unref(location, reason);
+ConnectedSubchannel::~ConnectedSubchannel() {
+ GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
}
-void grpc_connected_subchannel::NotifyOnStateChange(
+void ConnectedSubchannel::NotifyOnStateChange(
grpc_pollset_set* interested_parties, grpc_connectivity_state* state,
grpc_closure* closure) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
@@ -786,8 +783,8 @@ void grpc_connected_subchannel::NotifyOnStateChange(
elem->filter->start_transport_op(elem, op);
}
-void grpc_connected_subchannel::Ping(grpc_closure* on_initiate,
- grpc_closure* on_ack) {
+void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
+ grpc_closure* on_ack) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
grpc_channel_element* elem;
op->send_ping.on_initiate = on_initiate;
@@ -796,22 +793,23 @@ void grpc_connected_subchannel::Ping(grpc_closure* on_initiate,
elem->filter->start_transport_op(elem, op);
}
-grpc_error* grpc_connected_subchannel::CreateCall(const CallArgs* args,
- grpc_subchannel_call** call) {
+grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
+ grpc_subchannel_call** call) {
*call = (grpc_subchannel_call*)gpr_arena_alloc(
- args->arena,
+ args.arena,
sizeof(grpc_subchannel_call) + channel_stack_->call_stack_size);
grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
- (*call)->connection = Ref(DEBUG_LOCATION, "subchannel_call");
+ Ref(DEBUG_LOCATION, "subchannel_call");
+ (*call)->connection = this;
const grpc_call_element_args call_args = {
- callstk, /* call_stack */
- nullptr, /* server_transport_data */
- args->context, /* context */
- args->path, /* path */
- args->start_time, /* start_time */
- args->deadline, /* deadline */
- args->arena, /* arena */
- args->call_combiner /* call_combiner */
+ callstk, /* call_stack */
+ nullptr, /* server_transport_data */
+ args.context, /* context */
+ args.path, /* path */
+ args.start_time, /* start_time */
+ args.deadline, /* deadline */
+ args.arena, /* arena */
+ args.call_combiner /* call_combiner */
};
grpc_error* error = grpc_call_stack_init(
channel_stack_, 1, subchannel_call_destroy, *call, &call_args);
@@ -820,6 +818,7 @@ grpc_error* grpc_connected_subchannel::CreateCall(const CallArgs* args,
gpr_log(GPR_ERROR, "error: %s", error_string);
return error;
}
- grpc_call_stack_set_pollset_or_pollset_set(callstk, args->pollent);
+ grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
return GRPC_ERROR_NONE;
}
+} // namespace grpc_core