aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2018-01-09 14:24:32 -0800
committerGravatar David Garcia Quintas <dgq@google.com>2018-01-09 15:59:12 -0800
commitbaf1ac7af91eab2da6024b05ddb83720d9644b94 (patch)
tree74268437337eaa2fcc9fd777a317750b3d961043 /src
parent53bfe69f707e3729cd5845091a1282771b7e45ee (diff)
PR comments
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc6
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.cc4
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h11
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc12
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc84
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc50
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h2
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc115
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h35
-rw-r--r--src/core/lib/support/ref_counted.h3
10 files changed, 149 insertions, 173 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 4f3b774212..fce5f3582b 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -856,7 +856,7 @@ typedef struct client_channel_call_data {
grpc_closure lb_pick_closure;
grpc_closure lb_pick_cancel_closure;
- grpc_connected_subchannel* connected_subchannel;
+ grpc_core::ConnectedSubchannel* connected_subchannel;
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
grpc_polling_entity* pollent;
@@ -1004,7 +1004,7 @@ static void create_subchannel_call_locked(grpc_call_element* elem,
grpc_error* error) {
channel_data* chand = (channel_data*)elem->channel_data;
call_data* calld = (call_data*)elem->call_data;
- const grpc_connected_subchannel::CallArgs call_args = {
+ const grpc_core::ConnectedSubchannel::CallArgs call_args = {
calld->pollent, // pollent
calld->path, // path
calld->call_start_time, // start_time
@@ -1014,7 +1014,7 @@ static void create_subchannel_call_locked(grpc_call_element* elem,
calld->call_combiner // call_combiner
};
grpc_error* new_error = calld->connected_subchannel->CreateCall(
- &call_args, &calld->subchannel_call);
+ call_args, &calld->subchannel_call);
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
chand, calld, calld->subchannel_call, grpc_error_string(new_error));
diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc
index 7a5a8dec34..ebaeaadfc5 100644
--- a/src/core/ext/filters/client_channel/lb_policy.cc
+++ b/src/core/ext/filters/client_channel/lb_policy.cc
@@ -102,7 +102,7 @@ void grpc_lb_policy_weak_unref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) {
int grpc_lb_policy_pick_locked(grpc_lb_policy* policy,
const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_call_context_element* context,
void** user_data, grpc_closure* on_complete) {
return policy->vtable->pick_locked(policy, pick_args, target, context,
@@ -110,7 +110,7 @@ int grpc_lb_policy_pick_locked(grpc_lb_policy* policy,
}
void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_error* error) {
policy->vtable->cancel_pick_locked(policy, target, error);
}
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 628127106b..967253418e 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -61,13 +61,13 @@ struct grpc_lb_policy_vtable {
/** \see grpc_lb_policy_pick */
int (*pick_locked)(grpc_lb_policy* policy,
const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_call_context_element* context, void** user_data,
grpc_closure* on_complete);
/** \see grpc_lb_policy_cancel_pick */
void (*cancel_pick_locked)(grpc_lb_policy* policy,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_error* error);
/** \see grpc_lb_policy_cancel_picks */
@@ -160,11 +160,12 @@ void grpc_lb_policy_init(grpc_lb_policy* policy,
in the \a grpc_lb_policy struct. */
int grpc_lb_policy_pick_locked(grpc_lb_policy* policy,
const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_call_context_element* context,
void** user_data, grpc_closure* on_complete);
-/** Perform a connected subchannel ping (see \a grpc_connected_subchannel::Ping)
+/** Perform a connected subchannel ping (see \a
+ grpc_core::ConnectedSubchannel::Ping)
against one of the connected subchannels managed by \a policy. */
void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy,
grpc_closure* on_initiate,
@@ -174,7 +175,7 @@ void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy,
The \a on_complete callback of the pending picks will be invoked with \a
*target set to NULL. */
void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_error* error);
/** Cancel all pending picks for which their \a initial_metadata_flags (as given
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index ba4e90d4c2..ebc7fdac4c 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -157,7 +157,7 @@ typedef struct wrapped_rr_closure_arg {
/* the picked target, used to determine which LB token to add to the pick's
* initial metadata */
- grpc_connected_subchannel** target;
+ grpc_core::ConnectedSubchannel** target;
/* the context to be populated for the subchannel call */
grpc_call_context_element* context;
@@ -242,7 +242,7 @@ typedef struct pending_pick {
/* output argument where to store the pick()ed connected subchannel, or
* nullptr upon error. */
- grpc_connected_subchannel** target;
+ grpc_core::ConnectedSubchannel** target;
/* args for wrapped_on_complete */
wrapped_rr_closure_arg wrapped_on_complete_arg;
@@ -250,7 +250,7 @@ typedef struct pending_pick {
static void add_pending_pick(pending_pick** root,
const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_call_context_element* context,
grpc_closure* on_complete) {
pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp));
@@ -657,7 +657,7 @@ static void update_lb_connectivity_status_locked(
* completion callback even if the pick is available immediately. */
static bool pick_from_internal_rr_locked(
glb_lb_policy* glb_policy, const grpc_lb_policy_pick_args* pick_args,
- bool force_async, grpc_connected_subchannel** target,
+ bool force_async, grpc_core::ConnectedSubchannel** target,
wrapped_rr_closure_arg* wc_arg) {
// Check for drops if we are not using fallback backend addresses.
if (glb_policy->serverlist != nullptr) {
@@ -1090,7 +1090,7 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) {
// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
// we invoke the completion closure and set *target to nullptr right here.
static void glb_cancel_pick_locked(grpc_lb_policy* pol,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
pending_pick* pp = glb_policy->pending_picks;
@@ -1184,7 +1184,7 @@ static void glb_exit_idle_locked(grpc_lb_policy* pol) {
static int glb_pick_locked(grpc_lb_policy* pol,
const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_call_context_element* context, void** user_data,
grpc_closure* on_complete) {
if (pick_args->lb_token_mdelem_storage == nullptr) {
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index a3b05aacaf..e70f2a8c52 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -34,7 +34,7 @@ grpc_core::TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
typedef struct pending_pick {
struct pending_pick* next;
uint32_t initial_metadata_flags;
- grpc_connected_subchannel** target;
+ grpc_core::ConnectedSubchannel** target;
grpc_closure* on_complete;
} pending_pick;
@@ -102,7 +102,7 @@ static void pf_shutdown_locked(grpc_lb_policy* pol) {
}
static void pf_cancel_pick_locked(grpc_lb_policy* pol,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_error* error) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
pending_pick* pp = p->pending_picks;
@@ -174,7 +174,7 @@ static void pf_exit_idle_locked(grpc_lb_policy* pol) {
static int pf_pick_locked(grpc_lb_policy* pol,
const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_call_context_element* context, void** user_data,
grpc_closure* on_complete) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
@@ -396,6 +396,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
// Handle updates for the currently selected subchannel.
if (p->selected == sd) {
+ gpr_log(GPR_INFO, "BAR selected. subchannel %p, conn subchannel %p",
+ sd->subchannel, p->selected->connected_subchannel);
// If the new state is anything other than READY and there is a
// pending update, switch to the pending update.
if (sd->curr_connectivity_state != GRPC_CHANNEL_READY &&
@@ -412,25 +414,13 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update");
} else {
- if (sd->curr_connectivity_state < GRPC_CHANNEL_TRANSIENT_FAILURE) {
- // Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
- } else { // in transient failure or shutdown. Rely on re-resolution to
- // recover.
- p->selected = nullptr;
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "pf_selected_shutdown");
- grpc_lb_subchannel_data_unref_subchannel(
- sd, "pf_selected_shutdown"); // Unrefs connected subchannel
- }
// TODO(juanlishen): we re-resolve when the selected subchannel goes to
// TRANSIENT_FAILURE because we used to shut down in this case before
// re-resolution is introduced. But we need to investigate whether we
// really want to take any action instead of waiting for the selected
// subchannel reconnecting.
- if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN ||
- sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected channel goes bad, request a re-resolution.
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
@@ -438,10 +428,20 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
p->started_picking = false;
grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
GRPC_ERROR_NONE);
+ // in transient failure. Rely on re-resolution to recover.
+ p->selected = nullptr;
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ sd->subchannel_list, "pf_selected_shutdown");
+ grpc_lb_subchannel_data_unref_subchannel(
+ sd, "pf_selected_shutdown"); // Unrefs connected subchannel
+
} else {
grpc_connectivity_state_set(&p->state_tracker,
sd->curr_connectivity_state,
GRPC_ERROR_REF(error), "selected_changed");
+ // Renew notification.
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
}
return;
@@ -459,6 +459,16 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list to
// p->subchannel_list.
+ grpc_core::ConnectedSubchannel* con =
+ grpc_subchannel_get_connected_subchannel(sd->subchannel);
+ if (con == nullptr) {
+ // The subchannel may have become disconnected by the time this callback
+ // is invoked. Simply ignore and resubscribe: ulterior connectivity
+ // states
+ // must be in the pipeline and will eventually be invoked.
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ break;
+ }
if (sd->subchannel_list == p->latest_pending_subchannel_list) {
GPR_ASSERT(p->subchannel_list != nullptr);
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
@@ -469,9 +479,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
// Cases 1 and 2.
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "connecting_ready");
- sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(sd->subchannel),
- "connected");
+ sd->connected_subchannel =
+ GRPC_CONNECTED_SUBCHANNEL_REF(con, "connected");
p->selected = sd;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p,
@@ -530,39 +539,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_subchannel_data_start_connectivity_watch(sd);
break;
}
- case GRPC_CHANNEL_SHUTDOWN: {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "pf_candidate_shutdown");
- // Advance to next subchannel and check its state.
- grpc_lb_subchannel_data* original_sd = sd;
- do {
- sd->subchannel_list->checking_subchannel =
- (sd->subchannel_list->checking_subchannel + 1) %
- sd->subchannel_list->num_subchannels;
- sd = &sd->subchannel_list
- ->subchannels[sd->subchannel_list->checking_subchannel];
- } while (sd->subchannel == nullptr && sd != original_sd);
- if (sd == original_sd) {
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "pf_exhausted_subchannels");
- if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
- GRPC_ERROR_NONE,
- "exhausted_subchannels+reresolve");
- p->started_picking = false;
- grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
- GRPC_ERROR_NONE);
- }
- } else {
- if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(
- &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "subchannel_failed");
- }
- // Reuses the connectivity refs from the previous watch.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
- }
- }
+ case GRPC_CHANNEL_SHUTDOWN:
+ GPR_UNREACHABLE_CODE(break);
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 0836dad2f6..a6a8fbb3cf 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -58,7 +58,7 @@ typedef struct pending_pick {
/* output argument where to store the pick()ed connected subchannel, or NULL
* upon error. */
- grpc_connected_subchannel** target;
+ grpc_core::ConnectedSubchannel** target;
/* to be invoked once the pick() has completed (regardless of success) */
grpc_closure* on_complete;
@@ -199,7 +199,7 @@ static void rr_shutdown_locked(grpc_lb_policy* pol) {
}
static void rr_cancel_pick_locked(grpc_lb_policy* pol,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_error* error) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
pending_pick* pp = p->pending_picks;
@@ -267,7 +267,7 @@ static void rr_exit_idle_locked(grpc_lb_policy* pol) {
static int rr_pick_locked(grpc_lb_policy* pol,
const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_call_context_element* context, void** user_data,
grpc_closure* on_complete) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
@@ -316,15 +316,14 @@ static int rr_pick_locked(grpc_lb_policy* pol,
static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
+ GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
+ GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
GPR_ASSERT(subchannel_list->num_ready > 0);
--subchannel_list->num_ready;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(subchannel_list->num_transient_failures > 0);
--subchannel_list->num_transient_failures;
- } else if (sd->prev_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- GPR_ASSERT(subchannel_list->num_shutdown > 0);
- --subchannel_list->num_shutdown;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
GPR_ASSERT(subchannel_list->num_idle > 0);
--subchannel_list->num_idle;
@@ -334,8 +333,6 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
++subchannel_list->num_ready;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
++subchannel_list->num_transient_failures;
- } else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- ++subchannel_list->num_shutdown;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
++subchannel_list->num_idle;
}
@@ -401,6 +398,7 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd,
static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_subchannel_data* sd = (grpc_lb_subchannel_data*)arg;
+ GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
round_robin_lb_policy* p =
(round_robin_lb_policy*)sd->subchannel_list->policy;
if (grpc_lb_round_robin_trace.enabled()) {
@@ -444,23 +442,16 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
// If the sd's new state is TRANSIENT_FAILURE, unref the *connected*
// subchannel, if any.
- if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- if (sd->connected_subchannel != nullptr) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(sd->connected_subchannel,
- "connected_subchannel_transient_failure");
- sd->connected_subchannel = nullptr;
+ switch (sd->curr_connectivity_state) {
+ case GRPC_CHANNEL_TRANSIENT_FAILURE: {
+ if (sd->connected_subchannel != nullptr) {
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(
+ sd->connected_subchannel, "connected_subchannel_transient_failure");
+ sd->connected_subchannel = nullptr;
+ }
+ break;
}
- // Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
- }
- // If the sd's new state is SHUTDOWN, unref the subchannel.
- else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "rr_connectivity_shutdown");
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "rr_connectivity_shutdown");
- } else { // sd not in SHUTDOWN
- if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
+ case GRPC_CHANNEL_READY: {
if (sd->connected_subchannel == nullptr) {
sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(sd->subchannel),
@@ -522,10 +513,15 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
+ break;
}
- // Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ case GRPC_CHANNEL_SHUTDOWN:
+ GPR_UNREACHABLE_CODE();
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_IDLE:; // fallthrough
}
+ // Renew notification.
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
static grpc_connectivity_state rr_check_connectivity_locked(
@@ -549,7 +545,7 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
if (next_ready_index < p->subchannel_list->num_subchannels) {
grpc_lb_subchannel_data* selected =
&p->subchannel_list->subchannels[next_ready_index];
- grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF(
+ grpc_core::ConnectedSubchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF(
selected->connected_subchannel, "rr_ping");
target->Ping(on_initiate, on_ack);
GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping");
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 0f8cea9347..e4db3ef464 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -43,7 +43,7 @@ typedef struct {
grpc_lb_subchannel_list* subchannel_list;
/** subchannel itself */
grpc_subchannel* subchannel;
- grpc_connected_subchannel* connected_subchannel;
+ grpc_core::ConnectedSubchannel* connected_subchannel;
/** Is a connectivity notification pending? */
bool connectivity_notification_pending;
/** notification that connectivity has changed on subchannel */
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 25615b6326..2f1662e63b 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -56,8 +56,8 @@
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
-#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
- ((grpc_connected_subchannel*)(gpr_atm_##barrier##_load( \
+#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
+ ((grpc_core::ConnectedSubchannel*)(gpr_atm_##barrier##_load( \
&(subchannel)->connected_subchannel)))
typedef struct {
@@ -106,7 +106,8 @@ struct grpc_subchannel {
being setup */
grpc_pollset_set* pollset_set;
- /** active connection, or null; of type grpc_connected_subchannel */
+ /** active connection, or null; of type grpc_core::ConnectedSubchannel
+ */
gpr_atm connected_subchannel;
/** mutex protecting remaining elements */
@@ -135,7 +136,7 @@ struct grpc_subchannel {
};
struct grpc_subchannel_call {
- grpc_connected_subchannel* connection;
+ grpc_core::ConnectedSubchannel* connection;
grpc_closure* schedule_closure_after_destroy;
};
@@ -166,14 +167,14 @@ static void connection_destroy(void* arg, grpc_error* error) {
gpr_free(stk);
}
-grpc_connected_subchannel* grpc_connected_subchannel_ref(
- grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+grpc_core::ConnectedSubchannel* ConnectedSubchannel_ref(
+ grpc_core::ConnectedSubchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
c->Ref(DEBUG_LOCATION, REF_REASON);
return c;
}
-void grpc_connected_subchannel_unref(
- grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+void ConnectedSubchannel_unref(
+ grpc_core::ConnectedSubchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
c->Unref(DEBUG_LOCATION, REF_REASON);
}
@@ -247,7 +248,7 @@ static void disconnect(grpc_subchannel* c) {
c->disconnected = true;
grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Subchannel disconnected"));
- grpc_connected_subchannel* con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
+ grpc_core::ConnectedSubchannel* con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
if (con != nullptr) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "disconnect");
gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef);
@@ -535,11 +536,15 @@ static void on_connected_subchannel_connectivity_changed(void* p,
auto* con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
/* if we failed just leave this closure */
- if (connected_subchannel_watcher->connectivity_state ==
+ if (connected_subchannel_watcher->connectivity_state >=
GRPC_CHANNEL_TRANSIENT_FAILURE) {
if (!c->disconnected && con != nullptr) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "transient_failure");
gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm) nullptr);
+ gpr_log(
+ GPR_INFO,
+ "LOL FORMER Connected subchannel %p of subchannel %p is now NULL.",
+ con, c);
grpc_connectivity_state_set(&c->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "reflect_child");
@@ -547,28 +552,28 @@ static void on_connected_subchannel_connectivity_changed(void* p,
c->backoff->Reset();
if (grpc_trace_stream_refcount.enabled()) {
gpr_log(GPR_INFO,
- "Connected subchannel %p of subchannel %p has gone into "
- "TRANSIENT_FAILURE. Attempting to reconnect.",
- con, c);
+ "Connected subchannel %p of subchannel %p has gone into %s. "
+ "Attempting to reconnect.",
+ con, c, grpc_connectivity_state_name(
+ connected_subchannel_watcher->connectivity_state));
}
maybe_start_connecting_locked(c);
- goto done;
} else {
connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_SHUTDOWN;
}
+ } else {
+ grpc_connectivity_state_set(
+ &c->state_tracker, connected_subchannel_watcher->connectivity_state,
+ GRPC_ERROR_REF(error), "reflect_child");
+ if (connected_subchannel_watcher->connectivity_state <
+ GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
+ con->NotifyOnStateChange(
+ nullptr, &connected_subchannel_watcher->connectivity_state,
+ &connected_subchannel_watcher->closure);
+ connected_subchannel_watcher = nullptr;
+ }
}
- grpc_connectivity_state_set(&c->state_tracker,
- connected_subchannel_watcher->connectivity_state,
- GRPC_ERROR_REF(error), "reflect_child");
- if (connected_subchannel_watcher->connectivity_state <
- GRPC_CHANNEL_TRANSIENT_FAILURE) {
- GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
- con->NotifyOnStateChange(nullptr,
- &connected_subchannel_watcher->connectivity_state,
- &connected_subchannel_watcher->closure);
- connected_subchannel_watcher = nullptr;
- }
-done:
gpr_mu_unlock(mu);
GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher");
gpr_free(connected_subchannel_watcher);
@@ -619,8 +624,8 @@ static bool publish_transport_locked(grpc_subchannel* c) {
I'd have expected the rel_cas below to be enough, but
seemingly it's not.
Re-evaluate if we really need this. */
- grpc_connected_subchannel* con =
- grpc_core::New<grpc_connected_subchannel>(stk);
+ grpc_core::ConnectedSubchannel* con =
+ grpc_core::New<grpc_core::ConnectedSubchannel>(stk);
gpr_atm_full_barrier();
GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
@@ -677,7 +682,7 @@ static void subchannel_call_destroy(void* call, grpc_error* error) {
grpc_subchannel_call* c = (grpc_subchannel_call*)call;
GPR_ASSERT(c->schedule_closure_after_destroy != nullptr);
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
- grpc_connected_subchannel* connection = c->connection;
+ grpc_core::ConnectedSubchannel* connection = c->connection;
grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr,
c->schedule_closure_after_destroy);
GRPC_CONNECTED_SUBCHANNEL_UNREF(connection, "subchannel_call");
@@ -711,7 +716,7 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call* call,
GPR_TIMER_END("grpc_subchannel_call_process_op", 0);
}
-grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel(
+grpc_core::ConnectedSubchannel* grpc_subchannel_get_connected_subchannel(
grpc_subchannel* c) {
return GET_CONNECTED_SUBCHANNEL(c, acq);
}
@@ -757,24 +762,16 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) {
addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup(""));
}
-grpc_connected_subchannel::grpc_connected_subchannel(
- grpc_channel_stack* channel_stack)
+namespace grpc_core {
+ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack)
: grpc_core::RefCountedWithTracing(&grpc_trace_stream_refcount),
channel_stack_(channel_stack) {}
-grpc_connected_subchannel* grpc_connected_subchannel::Ref(
- const grpc_core::DebugLocation& location, const char* reason) {
- GRPC_CHANNEL_STACK_REF(channel_stack_, REF_REASON);
- grpc_core::RefCountedWithTracing::Ref(location, reason);
- return this;
-}
-void grpc_connected_subchannel::Unref(const grpc_core::DebugLocation& location,
- const char* reason) {
- GRPC_CHANNEL_STACK_UNREF(channel_stack_, REF_REASON);
- grpc_core::RefCountedWithTracing::Unref(location, reason);
+ConnectedSubchannel::~ConnectedSubchannel() {
+ GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
}
-void grpc_connected_subchannel::NotifyOnStateChange(
+void ConnectedSubchannel::NotifyOnStateChange(
grpc_pollset_set* interested_parties, grpc_connectivity_state* state,
grpc_closure* closure) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
@@ -786,8 +783,8 @@ void grpc_connected_subchannel::NotifyOnStateChange(
elem->filter->start_transport_op(elem, op);
}
-void grpc_connected_subchannel::Ping(grpc_closure* on_initiate,
- grpc_closure* on_ack) {
+void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
+ grpc_closure* on_ack) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
grpc_channel_element* elem;
op->send_ping.on_initiate = on_initiate;
@@ -796,22 +793,23 @@ void grpc_connected_subchannel::Ping(grpc_closure* on_initiate,
elem->filter->start_transport_op(elem, op);
}
-grpc_error* grpc_connected_subchannel::CreateCall(const CallArgs* args,
- grpc_subchannel_call** call) {
+grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
+ grpc_subchannel_call** call) {
*call = (grpc_subchannel_call*)gpr_arena_alloc(
- args->arena,
+ args.arena,
sizeof(grpc_subchannel_call) + channel_stack_->call_stack_size);
grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
- (*call)->connection = Ref(DEBUG_LOCATION, "subchannel_call");
+ Ref(DEBUG_LOCATION, "subchannel_call");
+ (*call)->connection = this;
const grpc_call_element_args call_args = {
- callstk, /* call_stack */
- nullptr, /* server_transport_data */
- args->context, /* context */
- args->path, /* path */
- args->start_time, /* start_time */
- args->deadline, /* deadline */
- args->arena, /* arena */
- args->call_combiner /* call_combiner */
+ callstk, /* call_stack */
+ nullptr, /* server_transport_data */
+ args.context, /* context */
+ args.path, /* path */
+ args.start_time, /* start_time */
+ args.deadline, /* deadline */
+ args.arena, /* arena */
+ args.call_combiner /* call_combiner */
};
grpc_error* error = grpc_call_stack_init(
channel_stack_, 1, subchannel_call_destroy, *call, &call_args);
@@ -820,6 +818,7 @@ grpc_error* grpc_connected_subchannel::CreateCall(const CallArgs* args,
gpr_log(GPR_ERROR, "error: %s", error_string);
return error;
}
- grpc_call_stack_set_pollset_or_pollset_set(callstk, args->pollent);
+ grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
return GRPC_ERROR_NONE;
}
+} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index d9a850daae..dbc5742787 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -49,9 +49,9 @@ typedef struct grpc_subchannel_key grpc_subchannel_key;
#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) \
grpc_subchannel_weak_unref((p), __FILE__, __LINE__, (r))
#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) \
- grpc_connected_subchannel_ref((p), __FILE__, __LINE__, (r))
+ ConnectedSubchannel_ref((p), __FILE__, __LINE__, (r))
#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \
- grpc_connected_subchannel_unref((p), __FILE__, __LINE__, (r))
+ ConnectedSubchannel_unref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) \
grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) \
@@ -65,15 +65,15 @@ typedef struct grpc_subchannel_key grpc_subchannel_key;
#define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p))
#define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p))
#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) grpc_subchannel_weak_unref((p))
-#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p))
-#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \
- grpc_connected_subchannel_unref((p))
+#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) ConnectedSubchannel_ref((p))
+#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) ConnectedSubchannel_unref((p))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p))
#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p))
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
#endif
-class grpc_connected_subchannel : public grpc_core::RefCountedWithTracing {
+namespace grpc_core {
+class ConnectedSubchannel : public grpc_core::RefCountedWithTracing {
public:
struct CallArgs {
grpc_polling_entity* pollent;
@@ -85,21 +85,20 @@ class grpc_connected_subchannel : public grpc_core::RefCountedWithTracing {
grpc_call_combiner* call_combiner;
};
- grpc_connected_subchannel(grpc_channel_stack* channel_stack);
- grpc_connected_subchannel* Ref(const grpc_core::DebugLocation& location,
- const char* reason);
- void Unref(const grpc_core::DebugLocation& location, const char* reason);
+ explicit ConnectedSubchannel(grpc_channel_stack* channel_stack);
+ ~ConnectedSubchannel();
+
grpc_channel_stack* channel_stack() { return channel_stack_; }
void NotifyOnStateChange(grpc_pollset_set* interested_parties,
grpc_connectivity_state* state,
grpc_closure* closure);
void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
-
- grpc_error* CreateCall(const CallArgs* args, grpc_subchannel_call** call);
+ grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call);
private:
grpc_channel_stack* channel_stack_;
};
+} // namespace grpc_core
grpc_subchannel* grpc_subchannel_ref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
@@ -111,10 +110,10 @@ grpc_subchannel* grpc_subchannel_weak_ref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_weak_unref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-grpc_connected_subchannel* grpc_connected_subchannel_ref(
- grpc_connected_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-void grpc_connected_subchannel_unref(
- grpc_connected_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+grpc_core::ConnectedSubchannel* ConnectedSubchannel_ref(
+ grpc_core::ConnectedSubchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+void ConnectedSubchannel_unref(
+ grpc_core::ConnectedSubchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_ref(
grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_unref(
@@ -130,9 +129,9 @@ void grpc_subchannel_notify_on_state_change(
grpc_subchannel* channel, grpc_pollset_set* interested_parties,
grpc_connectivity_state* state, grpc_closure* notify);
-/** retrieve the grpc_connected_subchannel - or NULL if called before
+/** retrieve the grpc_core::ConnectedSubchannel - or NULL if called before
the subchannel becomes connected */
-grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel(
+grpc_core::ConnectedSubchannel* grpc_subchannel_get_connected_subchannel(
grpc_subchannel* subchannel);
/** return the subchannel index key for \a subchannel */
diff --git a/src/core/lib/support/ref_counted.h b/src/core/lib/support/ref_counted.h
index f2182baea1..8fdc3458d1 100644
--- a/src/core/lib/support/ref_counted.h
+++ b/src/core/lib/support/ref_counted.h
@@ -45,6 +45,7 @@ class RefCounted {
// Not copyable nor movable.
RefCounted(const RefCounted&) = delete;
RefCounted& operator=(const RefCounted&) = delete;
+ GRPC_ABSTRACT_BASE_CLASS
protected:
// Allow Delete() to access destructor.
@@ -112,6 +113,8 @@ class RefCountedWithTracing {
gpr_ref_init(&refs_, 1);
}
+ virtual ~RefCountedWithTracing() {}
+
private:
TraceFlag* trace_flag_ = nullptr;
gpr_refcount refs_;