aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar David G. Quintas <dgq@google.com>2018-01-17 16:11:54 -0800
committerGravatar GitHub <noreply@github.com>2018-01-17 16:11:54 -0800
commita8891634d32ad9556921faed2707fb304304c900 (patch)
tree4a9cf3ca03dd29b152ec618e4c89b29ad165fa50 /src
parent47fe8507a1905c20a86df09f97c3f972d643dda5 (diff)
parent4adad9997164a901cd2e5d1b7a636d940a86d581 (diff)
Merge pull request #13932 from dgquintas/conn_subchannel
Connected subchannel refactoring
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc8
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h8
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc4
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc75
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc64
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc5
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h3
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc297
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h79
9 files changed, 248 insertions, 295 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 3f3334d44a..bb29f65af9 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -1003,7 +1003,7 @@ static void create_subchannel_call_locked(grpc_call_element* elem,
grpc_error* error) {
channel_data* chand = (channel_data*)elem->channel_data;
call_data* calld = (call_data*)elem->call_data;
- const grpc_connected_subchannel_call_args call_args = {
+ const grpc_core::ConnectedSubchannel::CallArgs call_args = {
calld->pollent, // pollent
calld->path, // path
calld->call_start_time, // start_time
@@ -1012,8 +1012,8 @@ static void create_subchannel_call_locked(grpc_call_element* elem,
calld->pick.subchannel_call_context, // context
calld->call_combiner // call_combiner
};
- grpc_error* new_error = grpc_connected_subchannel_create_call(
- calld->pick.connected_subchannel, &call_args, &calld->subchannel_call);
+ grpc_error* new_error = calld->pick.connected_subchannel->CreateCall(
+ call_args, &calld->subchannel_call);
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
chand, calld, calld->subchannel_call, grpc_error_string(new_error));
@@ -1463,7 +1463,7 @@ static void cc_destroy_call_elem(grpc_call_element* elem,
}
GPR_ASSERT(calld->waiting_for_pick_batches_count == 0);
if (calld->pick.connected_subchannel != nullptr) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(calld->pick.connected_subchannel, "picked");
+ calld->pick.connected_subchannel.reset();
}
for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
if (calld->pick.subchannel_call_context[i].value != nullptr) {
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 1176a05b78..0cc0cb59c3 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -21,6 +21,7 @@
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/support/ref_counted_ptr.h"
#include "src/core/lib/transport/connectivity_state.h"
/** A load balancing policy: specified by a vtable and a struct (which
@@ -54,9 +55,9 @@ typedef struct grpc_lb_policy_pick_state {
grpc_linked_mdelem lb_token_mdelem_storage;
/// Closure to run when pick is complete, if not completed synchronously.
grpc_closure* on_complete;
- /// Will be set to the selected subchannel, or NULL on failure or when
+ /// Will be set to the selected subchannel, or nullptr on failure or when
/// the LB policy decides to drop the call.
- grpc_connected_subchannel* connected_subchannel;
+ grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
/// Will be populated with context to pass to the subchannel call, if needed.
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
/// Upon success, \a *user_data will be set to whatever opaque information
@@ -152,7 +153,8 @@ void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy,
int grpc_lb_policy_pick_locked(grpc_lb_policy* policy,
grpc_lb_policy_pick_state* pick);
-/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
+/** Perform a connected subchannel ping (see \a
+ grpc_core::ConnectedSubchannel::Ping)
against one of the connected subchannels managed by \a policy. */
void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy,
grpc_closure* on_initiate,
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 272b3617b2..a8c5fb9c1b 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -939,7 +939,7 @@ static void glb_shutdown_locked(grpc_lb_policy* pol,
}
gpr_free(pp);
} else {
- pp->pick->connected_subchannel = nullptr;
+ pp->pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
}
pp = next;
@@ -976,7 +976,7 @@ static void glb_cancel_pick_locked(grpc_lb_policy* pol,
while (pp != nullptr) {
pending_pick* next = pp->next;
if (pp->pick == pick) {
- pick->connected_subchannel = nullptr;
+ pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(&pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 60385272cf..725b78d478 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -81,7 +81,7 @@ static void pf_shutdown_locked(grpc_lb_policy* pol,
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
} else {
- pick->connected_subchannel = nullptr;
+ pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
}
}
@@ -111,7 +111,7 @@ static void pf_cancel_pick_locked(grpc_lb_policy* pol,
while (pp != nullptr) {
grpc_lb_policy_pick_state* next = pp->next;
if (pp == pick) {
- pick->connected_subchannel = nullptr;
+ pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
@@ -176,8 +176,7 @@ static int pf_pick_locked(grpc_lb_policy* pol,
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
// If we have a selected subchannel already, return synchronously.
if (p->selected != nullptr) {
- pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- p->selected->connected_subchannel, "picked");
+ pick->connected_subchannel = p->selected->connected_subchannel;
return 1;
}
// No subchannel selected yet, so handle asynchronously.
@@ -217,8 +216,7 @@ static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
grpc_closure* on_ack) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
if (p->selected) {
- grpc_connected_subchannel_ping(p->selected->connected_subchannel,
- on_initiate, on_ack);
+ p->selected->connected_subchannel->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
@@ -297,8 +295,7 @@ static void pf_update_locked(grpc_lb_policy* policy,
subchannel_list->num_subchannels);
}
if (p->selected->connected_subchannel != nullptr) {
- sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- p->selected->connected_subchannel, "pf_update_includes_selected");
+ sd->connected_subchannel = p->selected->connected_subchannel;
}
p->selected = sd;
if (p->subchannel_list != nullptr) {
@@ -410,8 +407,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
// re-resolution is introduced. But we need to investigate whether we
// really want to take any action instead of waiting for the selected
// subchannel reconnecting.
- if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN ||
- sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected channel goes bad, request a re-resolution.
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
@@ -419,20 +416,19 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
p->started_picking = false;
grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
GRPC_ERROR_NONE);
+ // in transient failure. Rely on re-resolution to recover.
+ p->selected = nullptr;
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ sd->subchannel_list, "pf_selected_shutdown");
+ grpc_lb_subchannel_data_unref_subchannel(
+ sd, "pf_selected_shutdown"); // Unrefs connected subchannel
} else {
grpc_connectivity_state_set(&p->state_tracker,
sd->curr_connectivity_state,
GRPC_ERROR_REF(error), "selected_changed");
- }
- if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(sd);
- } else {
- p->selected = nullptr;
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "pf_selected_shutdown");
- grpc_lb_subchannel_data_unref_subchannel(sd, "pf_selected_shutdown");
}
}
return;
@@ -450,6 +446,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list to
// p->subchannel_list.
+ sd->connected_subchannel =
+ grpc_subchannel_get_connected_subchannel(sd->subchannel);
if (sd->subchannel_list == p->latest_pending_subchannel_list) {
GPR_ASSERT(p->subchannel_list != nullptr);
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
@@ -460,9 +458,6 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
// Cases 1 and 2.
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "connecting_ready");
- sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(sd->subchannel),
- "connected");
p->selected = sd;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p,
@@ -474,8 +469,7 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_policy_pick_state* pick;
while ((pick = p->pending_picks)) {
p->pending_picks = pick->next;
- pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- p->selected->connected_subchannel, "picked");
+ pick->connected_subchannel = p->selected->connected_subchannel;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Servicing pending pick with selected subchannel %p",
@@ -520,39 +514,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_subchannel_data_start_connectivity_watch(sd);
break;
}
- case GRPC_CHANNEL_SHUTDOWN: {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "pf_candidate_shutdown");
- // Advance to next subchannel and check its state.
- grpc_lb_subchannel_data* original_sd = sd;
- do {
- sd->subchannel_list->checking_subchannel =
- (sd->subchannel_list->checking_subchannel + 1) %
- sd->subchannel_list->num_subchannels;
- sd = &sd->subchannel_list
- ->subchannels[sd->subchannel_list->checking_subchannel];
- } while (sd->subchannel == nullptr && sd != original_sd);
- if (sd == original_sd) {
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "pf_exhausted_subchannels");
- if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
- GRPC_ERROR_NONE,
- "exhausted_subchannels+reresolve");
- p->started_picking = false;
- grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
- GRPC_ERROR_NONE);
- }
- } else {
- if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(
- &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "subchannel_failed");
- }
- // Reuses the connectivity refs from the previous watch.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
- }
- }
+ case GRPC_CHANNEL_SHUTDOWN:
+ GPR_UNREACHABLE_CODE(break);
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 92c7d5bd5d..dca345566a 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -36,6 +36,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/support/ref_counted_ptr.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -127,7 +128,7 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p,
(void*)p, (unsigned long)last_ready_index,
(void*)p->subchannel_list->subchannels[last_ready_index].subchannel,
(void*)p->subchannel_list->subchannels[last_ready_index]
- .connected_subchannel);
+ .connected_subchannel.get());
}
}
@@ -162,7 +163,7 @@ static void rr_shutdown_locked(grpc_lb_policy* pol,
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
} else {
- pick->connected_subchannel = nullptr;
+ pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
}
}
@@ -192,7 +193,7 @@ static void rr_cancel_pick_locked(grpc_lb_policy* pol,
while (pp != nullptr) {
grpc_lb_policy_pick_state* next = pp->next;
if (pp == pick) {
- pick->connected_subchannel = nullptr;
+ pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
@@ -216,7 +217,7 @@ static void rr_cancel_picks_locked(grpc_lb_policy* pol,
grpc_lb_policy_pick_state* next = pick->next;
if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- pick->connected_subchannel = nullptr;
+ pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
@@ -262,8 +263,7 @@ static int rr_pick_locked(grpc_lb_policy* pol,
/* readily available, report right away */
grpc_lb_subchannel_data* sd =
&p->subchannel_list->subchannels[next_ready_index];
- pick->connected_subchannel =
- GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked");
+ pick->connected_subchannel = sd->connected_subchannel;
if (pick->user_data != nullptr) {
*pick->user_data = sd->user_data;
}
@@ -272,8 +272,8 @@ static int rr_pick_locked(grpc_lb_policy* pol,
GPR_DEBUG,
"[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
"index %" PRIuPTR ")",
- p, sd->subchannel, pick->connected_subchannel, sd->subchannel_list,
- next_ready_index);
+ p, sd->subchannel, pick->connected_subchannel.get(),
+ sd->subchannel_list, next_ready_index);
}
/* only advance the last picked pointer if the selection was used */
update_last_ready_subchannel_index_locked(p, next_ready_index);
@@ -291,15 +291,14 @@ static int rr_pick_locked(grpc_lb_policy* pol,
static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
+ GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
+ GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
GPR_ASSERT(subchannel_list->num_ready > 0);
--subchannel_list->num_ready;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(subchannel_list->num_transient_failures > 0);
--subchannel_list->num_transient_failures;
- } else if (sd->prev_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- GPR_ASSERT(subchannel_list->num_shutdown > 0);
- --subchannel_list->num_shutdown;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
GPR_ASSERT(subchannel_list->num_idle > 0);
--subchannel_list->num_idle;
@@ -309,8 +308,6 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
++subchannel_list->num_ready;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
++subchannel_list->num_transient_failures;
- } else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- ++subchannel_list->num_shutdown;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
++subchannel_list->num_idle;
}
@@ -410,6 +407,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
// either the current or latest pending subchannel lists.
GPR_ASSERT(sd->subchannel_list == p->subchannel_list ||
sd->subchannel_list == p->latest_pending_subchannel_list);
+ GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN);
// Now that we're inside the combiner, copy the pending connectivity
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
@@ -417,18 +415,17 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
// Update state counters and new overall state.
update_state_counters_locked(sd);
update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
- // If the sd's new state is SHUTDOWN, unref the subchannel.
- if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "rr_connectivity_shutdown");
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "rr_connectivity_shutdown");
- } else { // sd not in SHUTDOWN
- if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
+ // If the sd's new state is TRANSIENT_FAILURE, unref the *connected*
+ // subchannel, if any.
+ switch (sd->curr_connectivity_state) {
+ case GRPC_CHANNEL_TRANSIENT_FAILURE: {
+ sd->connected_subchannel.reset();
+ break;
+ }
+ case GRPC_CHANNEL_READY: {
if (sd->connected_subchannel == nullptr) {
- sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(sd->subchannel),
- "connected");
+ sd->connected_subchannel =
+ grpc_subchannel_get_connected_subchannel(sd->subchannel);
}
if (sd->subchannel_list != p->subchannel_list) {
// promote sd->subchannel_list to p->subchannel_list.
@@ -471,8 +468,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_policy_pick_state* pick;
while ((pick = p->pending_picks)) {
p->pending_picks = pick->next;
- pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- selected->connected_subchannel, "rr_picked");
+ pick->connected_subchannel = selected->connected_subchannel;
if (pick->user_data != nullptr) {
*pick->user_data = selected->user_data;
}
@@ -485,10 +481,15 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
}
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
+ break;
}
- // Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ case GRPC_CHANNEL_SHUTDOWN:
+ GPR_UNREACHABLE_CODE();
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_IDLE:; // fallthrough
}
+ // Renew notification.
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
static grpc_connectivity_state rr_check_connectivity_locked(
@@ -512,10 +513,9 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
if (next_ready_index < p->subchannel_list->num_subchannels) {
grpc_lb_subchannel_data* selected =
&p->subchannel_list->subchannels[next_ready_index];
- grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF(
- selected->connected_subchannel, "rr_ping");
- grpc_connected_subchannel_ping(target, on_initiate, on_ack);
- GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping");
+ grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> target =
+ selected->connected_subchannel;
+ target->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Round Robin not connected"));
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
index 5ce1298afc..fa2ffcc796 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
@@ -42,10 +42,7 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd,
}
GRPC_SUBCHANNEL_UNREF(sd->subchannel, reason);
sd->subchannel = nullptr;
- if (sd->connected_subchannel != nullptr) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(sd->connected_subchannel, reason);
- sd->connected_subchannel = nullptr;
- }
+ sd->connected_subchannel.reset();
if (sd->user_data != nullptr) {
GPR_ASSERT(sd->user_data_vtable != nullptr);
sd->user_data_vtable->destroy(sd->user_data);
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 0f8cea9347..f146c724e0 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -22,6 +22,7 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/support/ref_counted_ptr.h"
#include "src/core/lib/transport/connectivity_state.h"
// TODO(roth): This code is intended to be shared between pick_first and
@@ -43,7 +44,7 @@ typedef struct {
grpc_lb_subchannel_list* subchannel_list;
/** subchannel itself */
grpc_subchannel* subchannel;
- grpc_connected_subchannel* connected_subchannel;
+ grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
/** Is a connectivity notification pending? */
bool connectivity_notification_pending;
/** notification that connectivity has changed on subchannel */
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index a604c55c58..5b8a14b9e7 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -41,6 +41,7 @@
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/support/debug_location.h"
#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
@@ -55,10 +56,6 @@
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
-#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
- ((grpc_connected_subchannel*)(gpr_atm_##barrier##_load( \
- &(subchannel)->connected_subchannel)))
-
namespace {
struct state_watcher {
grpc_closure closure;
@@ -98,7 +95,7 @@ struct grpc_subchannel {
grpc_connect_out_args connecting_result;
/** callback for connection finishing */
- grpc_closure connected;
+ grpc_closure on_connected;
/** callback for our alarm */
grpc_closure on_alarm;
@@ -107,12 +104,13 @@ struct grpc_subchannel {
being setup */
grpc_pollset_set* pollset_set;
- /** active connection, or null; of type grpc_connected_subchannel */
- gpr_atm connected_subchannel;
-
/** mutex protecting remaining elements */
gpr_mu mu;
+ /** active connection, or null; of type grpc_core::ConnectedSubchannel
+ */
+ grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
+
/** have we seen a disconnection? */
bool disconnected;
/** are we connecting */
@@ -136,16 +134,15 @@ struct grpc_subchannel {
};
struct grpc_subchannel_call {
- grpc_connected_subchannel* connection;
+ grpc_core::ConnectedSubchannel* connection;
grpc_closure* schedule_closure_after_destroy;
};
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack*)((call) + 1))
-#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack*)(con))
#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \
(((grpc_subchannel_call*)(callstack)) - 1)
-static void subchannel_connected(void* subchannel, grpc_error* error);
+static void on_subchannel_connected(void* subchannel, grpc_error* error);
#ifndef NDEBUG
#define REF_REASON reason
@@ -163,20 +160,9 @@ static void subchannel_connected(void* subchannel, grpc_error* error);
*/
static void connection_destroy(void* arg, grpc_error* error) {
- grpc_connected_subchannel* c = (grpc_connected_subchannel*)arg;
- grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c));
- gpr_free(c);
-}
-
-grpc_connected_subchannel* grpc_connected_subchannel_ref(
- grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
- return c;
-}
-
-void grpc_connected_subchannel_unref(
- grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- GRPC_CHANNEL_STACK_UNREF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
+ grpc_channel_stack* stk = (grpc_channel_stack*)arg;
+ grpc_channel_stack_destroy(stk);
+ gpr_free(stk);
}
/*
@@ -243,18 +229,13 @@ grpc_subchannel* grpc_subchannel_ref_from_weak_ref(
}
static void disconnect(grpc_subchannel* c) {
- grpc_connected_subchannel* con;
grpc_subchannel_index_unregister(c->key, c);
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->disconnected);
c->disconnected = true;
grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Subchannel disconnected"));
- con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
- if (con != nullptr) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "connection");
- gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef);
- }
+ c->connected_subchannel.reset();
gpr_mu_unlock(&c->mu);
}
@@ -374,7 +355,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
if (new_args != nullptr) grpc_channel_args_destroy(new_args);
c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
&c->root_external_state_watcher;
- GRPC_CLOSURE_INIT(&c->connected, subchannel_connected, c,
+ GRPC_CLOSURE_INIT(&c->on_connected, on_subchannel_connected, c,
grpc_schedule_on_exec_ctx);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel");
@@ -397,7 +378,7 @@ static void continue_connect_locked(grpc_subchannel* c) {
grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "state_change");
grpc_connector_connect(c->connector, &args, &c->connecting_result,
- &c->connected);
+ &c->on_connected);
}
grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c,
@@ -458,7 +439,7 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
return;
}
- if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != nullptr) {
+ if (c->connected_subchannel != nullptr) {
/* Already connected: don't restart */
return;
}
@@ -481,9 +462,10 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
const grpc_millis time_til_next =
c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now();
if (time_til_next <= 0) {
- gpr_log(GPR_INFO, "Retry immediately");
+ gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", c);
} else {
- gpr_log(GPR_INFO, "Retry in %" PRIdPTR " milliseconds", time_til_next);
+ gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRIdPTR " milliseconds", c,
+ time_til_next);
}
GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm);
@@ -527,75 +509,56 @@ void grpc_subchannel_notify_on_state_change(
}
}
-void grpc_connected_subchannel_process_transport_op(
- grpc_connected_subchannel* con, grpc_transport_op* op) {
- grpc_channel_stack* channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
- grpc_channel_element* top_elem = grpc_channel_stack_element(channel_stack, 0);
- top_elem->filter->start_transport_op(top_elem, op);
-}
-
-static void subchannel_on_child_state_changed(void* p, grpc_error* error) {
- state_watcher* sw = (state_watcher*)p;
- grpc_subchannel* c = sw->subchannel;
+static void on_connected_subchannel_connectivity_changed(void* p,
+ grpc_error* error) {
+ state_watcher* connected_subchannel_watcher = (state_watcher*)p;
+ grpc_subchannel* c = connected_subchannel_watcher->subchannel;
gpr_mu* mu = &c->mu;
gpr_mu_lock(mu);
- /* if we failed just leave this closure */
- if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- /* any errors on a subchannel ==> we're done, create a new one */
- sw->connectivity_state = GRPC_CHANNEL_SHUTDOWN;
- }
- grpc_connectivity_state_set(&c->state_tracker, sw->connectivity_state,
- GRPC_ERROR_REF(error), "reflect_child");
- if (sw->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
- grpc_connected_subchannel_notify_on_state_change(
- GET_CONNECTED_SUBCHANNEL(c, no_barrier), nullptr,
- &sw->connectivity_state, &sw->closure);
- GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
- sw = nullptr;
+ switch (connected_subchannel_watcher->connectivity_state) {
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ case GRPC_CHANNEL_SHUTDOWN: {
+ if (!c->disconnected && c->connected_subchannel != nullptr) {
+ if (grpc_trace_stream_refcount.enabled()) {
+ gpr_log(GPR_INFO,
+ "Connected subchannel %p of subchannel %p has gone into %s. "
+ "Attempting to reconnect.",
+ c->connected_subchannel.get(), c,
+ grpc_connectivity_state_name(
+ connected_subchannel_watcher->connectivity_state));
+ }
+ c->connected_subchannel.reset();
+ grpc_connectivity_state_set(&c->state_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "reflect_child");
+ c->backoff_begun = false;
+ c->backoff->Reset();
+ maybe_start_connecting_locked(c);
+ } else {
+ connected_subchannel_watcher->connectivity_state =
+ GRPC_CHANNEL_SHUTDOWN;
+ }
+ break;
+ }
+ default: {
+ grpc_connectivity_state_set(
+ &c->state_tracker, connected_subchannel_watcher->connectivity_state,
+ GRPC_ERROR_REF(error), "reflect_child");
+ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
+ c->connected_subchannel->NotifyOnStateChange(
+ nullptr, &connected_subchannel_watcher->connectivity_state,
+ &connected_subchannel_watcher->closure);
+ connected_subchannel_watcher = nullptr;
+ }
}
-
gpr_mu_unlock(mu);
GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher");
- gpr_free(sw);
-}
-
-static void connected_subchannel_state_op(grpc_connected_subchannel* con,
- grpc_pollset_set* interested_parties,
- grpc_connectivity_state* state,
- grpc_closure* closure) {
- grpc_transport_op* op = grpc_make_transport_op(nullptr);
- grpc_channel_element* elem;
- op->connectivity_state = state;
- op->on_connectivity_state_change = closure;
- op->bind_pollset_set = interested_parties;
- elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(elem, op);
-}
-
-void grpc_connected_subchannel_notify_on_state_change(
- grpc_connected_subchannel* con, grpc_pollset_set* interested_parties,
- grpc_connectivity_state* state, grpc_closure* closure) {
- connected_subchannel_state_op(con, interested_parties, state, closure);
-}
-
-void grpc_connected_subchannel_ping(grpc_connected_subchannel* con,
- grpc_closure* on_initiate,
- grpc_closure* on_ack) {
- grpc_transport_op* op = grpc_make_transport_op(nullptr);
- grpc_channel_element* elem;
- op->send_ping.on_initiate = on_initiate;
- op->send_ping.on_ack = on_ack;
- elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(elem, op);
+ gpr_free(connected_subchannel_watcher);
}
static bool publish_transport_locked(grpc_subchannel* c) {
- grpc_connected_subchannel* con;
- grpc_channel_stack* stk;
- state_watcher* sw_subchannel;
-
/* construct channel stack */
grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
grpc_channel_stack_builder_set_channel_arguments(
@@ -607,8 +570,9 @@ static bool publish_transport_locked(grpc_subchannel* c) {
grpc_channel_stack_builder_destroy(builder);
return false;
}
+ grpc_channel_stack* stk;
grpc_error* error = grpc_channel_stack_builder_finish(
- builder, 0, 1, connection_destroy, nullptr, (void**)&con);
+ builder, 0, 1, connection_destroy, nullptr, (void**)&stk);
if (error != GRPC_ERROR_NONE) {
grpc_transport_destroy(c->connecting_result.transport);
gpr_log(GPR_ERROR, "error initializing subchannel stack: %s",
@@ -616,38 +580,37 @@ static bool publish_transport_locked(grpc_subchannel* c) {
GRPC_ERROR_UNREF(error);
return false;
}
- stk = CHANNEL_STACK_FROM_CONNECTION(con);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
/* initialize state watcher */
- sw_subchannel = (state_watcher*)gpr_malloc(sizeof(*sw_subchannel));
- sw_subchannel->subchannel = c;
- sw_subchannel->connectivity_state = GRPC_CHANNEL_READY;
- GRPC_CLOSURE_INIT(&sw_subchannel->closure, subchannel_on_child_state_changed,
- sw_subchannel, grpc_schedule_on_exec_ctx);
+ state_watcher* connected_subchannel_watcher =
+ (state_watcher*)gpr_zalloc(sizeof(*connected_subchannel_watcher));
+ connected_subchannel_watcher->subchannel = c;
+ connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_READY;
+ GRPC_CLOSURE_INIT(&connected_subchannel_watcher->closure,
+ on_connected_subchannel_connectivity_changed,
+ connected_subchannel_watcher, grpc_schedule_on_exec_ctx);
if (c->disconnected) {
- gpr_free(sw_subchannel);
+ gpr_free(connected_subchannel_watcher);
grpc_channel_stack_destroy(stk);
- gpr_free(con);
+ gpr_free(stk);
return false;
}
/* publish */
- /* TODO(ctiller): this full barrier seems to clear up a TSAN failure.
- I'd have expected the rel_cas below to be enough, but
- seemingly it's not.
- Re-evaluate if we really need this. */
- gpr_atm_full_barrier();
- GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
+ c->connected_subchannel.reset(
+ grpc_core::New<grpc_core::ConnectedSubchannel>(stk));
+ gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
+ c->connected_subchannel.get(), c);
/* setup subchannel watching connected subchannel for changes; subchannel
ref for connecting is donated to the state watcher */
GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
- grpc_connected_subchannel_notify_on_state_change(
- con, c->pollset_set, &sw_subchannel->connectivity_state,
- &sw_subchannel->closure);
+ c->connected_subchannel->NotifyOnStateChange(
+ c->pollset_set, &connected_subchannel_watcher->connectivity_state,
+ &connected_subchannel_watcher->closure);
/* signal completion */
grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_READY,
@@ -655,11 +618,11 @@ static bool publish_transport_locked(grpc_subchannel* c) {
return true;
}
-static void subchannel_connected(void* arg, grpc_error* error) {
+static void on_subchannel_connected(void* arg, grpc_error* error) {
grpc_subchannel* c = (grpc_subchannel*)arg;
grpc_channel_args* delete_channel_args = c->connecting_result.channel_args;
- GRPC_SUBCHANNEL_WEAK_REF(c, "connected");
+ GRPC_SUBCHANNEL_WEAK_REF(c, "on_subchannel_connected");
gpr_mu_lock(&c->mu);
c->connecting = false;
if (c->connecting_result.transport != nullptr &&
@@ -694,10 +657,10 @@ static void subchannel_call_destroy(void* call, grpc_error* error) {
grpc_subchannel_call* c = (grpc_subchannel_call*)call;
GPR_ASSERT(c->schedule_closure_after_destroy != nullptr);
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
- grpc_connected_subchannel* connection = c->connection;
+ grpc_core::ConnectedSubchannel* connection = c->connection;
grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr,
c->schedule_closure_after_destroy);
- GRPC_CONNECTED_SUBCHANNEL_UNREF(connection, "subchannel_call");
+ connection->Unref(DEBUG_LOCATION, "subchannel_call");
GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
}
@@ -728,9 +691,12 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call* call,
GPR_TIMER_END("grpc_subchannel_call_process_op", 0);
}
-grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel(
- grpc_subchannel* c) {
- return GET_CONNECTED_SUBCHANNEL(c, acq);
+grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel>
+grpc_subchannel_get_connected_subchannel(grpc_subchannel* c) {
+ gpr_mu_lock(&c->mu);
+ auto copy = c->connected_subchannel;
+ gpr_mu_unlock(&c->mu);
+ return copy;
}
const grpc_subchannel_key* grpc_subchannel_get_key(
@@ -738,36 +704,6 @@ const grpc_subchannel_key* grpc_subchannel_get_key(
return subchannel->key;
}
-grpc_error* grpc_connected_subchannel_create_call(
- grpc_connected_subchannel* con,
- const grpc_connected_subchannel_call_args* args,
- grpc_subchannel_call** call) {
- grpc_channel_stack* chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
- *call = (grpc_subchannel_call*)gpr_arena_alloc(
- args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
- grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
- (*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
- const grpc_call_element_args call_args = {
- callstk, /* call_stack */
- nullptr, /* server_transport_data */
- args->context, /* context */
- args->path, /* path */
- args->start_time, /* start_time */
- args->deadline, /* deadline */
- args->arena, /* arena */
- args->call_combiner /* call_combiner */
- };
- grpc_error* error = grpc_call_stack_init(chanstk, 1, subchannel_call_destroy,
- *call, &call_args);
- if (error != GRPC_ERROR_NONE) {
- const char* error_string = grpc_error_string(error);
- gpr_log(GPR_ERROR, "error: %s", error_string);
- return error;
- }
- grpc_call_stack_set_pollset_or_pollset_set(callstk, args->pollent);
- return GRPC_ERROR_NONE;
-}
-
grpc_call_stack* grpc_subchannel_call_get_call_stack(
grpc_subchannel_call* subchannel_call) {
return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
@@ -803,3 +739,64 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) {
(char*)GRPC_ARG_SUBCHANNEL_ADDRESS,
addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup(""));
}
+
+namespace grpc_core {
+ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack)
+ : grpc_core::RefCountedWithTracing(&grpc_trace_stream_refcount),
+ channel_stack_(channel_stack) {}
+
+ConnectedSubchannel::~ConnectedSubchannel() {
+ GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
+}
+
+void ConnectedSubchannel::NotifyOnStateChange(
+ grpc_pollset_set* interested_parties, grpc_connectivity_state* state,
+ grpc_closure* closure) {
+ grpc_transport_op* op = grpc_make_transport_op(nullptr);
+ grpc_channel_element* elem;
+ op->connectivity_state = state;
+ op->on_connectivity_state_change = closure;
+ op->bind_pollset_set = interested_parties;
+ elem = grpc_channel_stack_element(channel_stack_, 0);
+ elem->filter->start_transport_op(elem, op);
+}
+
+void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
+ grpc_closure* on_ack) {
+ grpc_transport_op* op = grpc_make_transport_op(nullptr);
+ grpc_channel_element* elem;
+ op->send_ping.on_initiate = on_initiate;
+ op->send_ping.on_ack = on_ack;
+ elem = grpc_channel_stack_element(channel_stack_, 0);
+ elem->filter->start_transport_op(elem, op);
+}
+
+grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
+ grpc_subchannel_call** call) {
+ *call = (grpc_subchannel_call*)gpr_arena_alloc(
+ args.arena,
+ sizeof(grpc_subchannel_call) + channel_stack_->call_stack_size);
+ grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
+ Ref(DEBUG_LOCATION, "subchannel_call");
+ (*call)->connection = this;
+ const grpc_call_element_args call_args = {
+ callstk, /* call_stack */
+ nullptr, /* server_transport_data */
+ args.context, /* context */
+ args.path, /* path */
+ args.start_time, /* start_time */
+ args.deadline, /* deadline */
+ args.arena, /* arena */
+ args.call_combiner /* call_combiner */
+ };
+ grpc_error* error = grpc_call_stack_init(
+ channel_stack_, 1, subchannel_call_destroy, *call, &call_args);
+ if (error != GRPC_ERROR_NONE) {
+ const char* error_string = grpc_error_string(error);
+ gpr_log(GPR_ERROR, "error: %s", error_string);
+ return error;
+ }
+ grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
+ return GRPC_ERROR_NONE;
+}
+} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index 9d34fff07a..3bcc5c2432 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -23,6 +23,8 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/support/arena.h"
+#include "src/core/lib/support/ref_counted.h"
+#include "src/core/lib/support/ref_counted_ptr.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
@@ -32,7 +34,6 @@
/** A (sub-)channel that knows how to connect to exactly one target
address. Provides a target for load balancing. */
typedef struct grpc_subchannel grpc_subchannel;
-typedef struct grpc_connected_subchannel grpc_connected_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call;
typedef struct grpc_subchannel_args grpc_subchannel_args;
typedef struct grpc_subchannel_key grpc_subchannel_key;
@@ -48,10 +49,6 @@ typedef struct grpc_subchannel_key grpc_subchannel_key;
grpc_subchannel_weak_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) \
grpc_subchannel_weak_unref((p), __FILE__, __LINE__, (r))
-#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) \
- grpc_connected_subchannel_ref((p), __FILE__, __LINE__, (r))
-#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \
- grpc_connected_subchannel_unref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) \
grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) \
@@ -65,14 +62,39 @@ typedef struct grpc_subchannel_key grpc_subchannel_key;
#define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p))
#define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p))
#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) grpc_subchannel_weak_unref((p))
-#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p))
-#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \
- grpc_connected_subchannel_unref((p))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p))
#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p))
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
#endif
+namespace grpc_core {
+class ConnectedSubchannel : public grpc_core::RefCountedWithTracing {
+ public:
+ struct CallArgs {
+ grpc_polling_entity* pollent;
+ grpc_slice path;
+ gpr_timespec start_time;
+ grpc_millis deadline;
+ gpr_arena* arena;
+ grpc_call_context_element* context;
+ grpc_call_combiner* call_combiner;
+ };
+
+ explicit ConnectedSubchannel(grpc_channel_stack* channel_stack);
+ ~ConnectedSubchannel();
+
+ grpc_channel_stack* channel_stack() { return channel_stack_; }
+ void NotifyOnStateChange(grpc_pollset_set* interested_parties,
+ grpc_connectivity_state* state,
+ grpc_closure* closure);
+ void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
+ grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call);
+
+ private:
+ grpc_channel_stack* channel_stack_;
+};
+} // namespace grpc_core
+
grpc_subchannel* grpc_subchannel_ref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
grpc_subchannel* grpc_subchannel_ref_from_weak_ref(
@@ -83,35 +105,11 @@ grpc_subchannel* grpc_subchannel_weak_ref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_weak_unref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-grpc_connected_subchannel* grpc_connected_subchannel_ref(
- grpc_connected_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-void grpc_connected_subchannel_unref(
- grpc_connected_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_ref(
grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_unref(
grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-/** construct a subchannel call */
-typedef struct {
- grpc_polling_entity* pollent;
- grpc_slice path;
- gpr_timespec start_time;
- grpc_millis deadline;
- gpr_arena* arena;
- grpc_call_context_element* context;
- grpc_call_combiner* call_combiner;
-} grpc_connected_subchannel_call_args;
-
-grpc_error* grpc_connected_subchannel_create_call(
- grpc_connected_subchannel* connected_subchannel,
- const grpc_connected_subchannel_call_args* args,
- grpc_subchannel_call** subchannel_call);
-
-/** process a transport level op */
-void grpc_connected_subchannel_process_transport_op(
- grpc_connected_subchannel* subchannel, grpc_transport_op* op);
-
/** poll the current connectivity state of a channel */
grpc_connectivity_state grpc_subchannel_check_connectivity(
grpc_subchannel* channel, grpc_error** error);
@@ -121,17 +119,12 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(
void grpc_subchannel_notify_on_state_change(
grpc_subchannel* channel, grpc_pollset_set* interested_parties,
grpc_connectivity_state* state, grpc_closure* notify);
-void grpc_connected_subchannel_notify_on_state_change(
- grpc_connected_subchannel* channel, grpc_pollset_set* interested_parties,
- grpc_connectivity_state* state, grpc_closure* notify);
-void grpc_connected_subchannel_ping(grpc_connected_subchannel* channel,
- grpc_closure* on_initiate,
- grpc_closure* on_ack);
-
-/** retrieve the grpc_connected_subchannel - or NULL if called before
- the subchannel becomes connected */
-grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel(
- grpc_subchannel* subchannel);
+
+/** retrieve the grpc_core::ConnectedSubchannel - or nullptr if not connected
+ * (which may happen before it initially connects or during transient failures)
+ * */
+grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel>
+grpc_subchannel_get_connected_subchannel(grpc_subchannel* c);
/** return the subchannel index key for \a subchannel */
const grpc_subchannel_key* grpc_subchannel_get_key(