aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc12
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc84
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc50
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h2
4 files changed, 61 insertions, 87 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index ba4e90d4c2..ebc7fdac4c 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -157,7 +157,7 @@ typedef struct wrapped_rr_closure_arg {
/* the picked target, used to determine which LB token to add to the pick's
* initial metadata */
- grpc_connected_subchannel** target;
+ grpc_core::ConnectedSubchannel** target;
/* the context to be populated for the subchannel call */
grpc_call_context_element* context;
@@ -242,7 +242,7 @@ typedef struct pending_pick {
/* output argument where to store the pick()ed connected subchannel, or
* nullptr upon error. */
- grpc_connected_subchannel** target;
+ grpc_core::ConnectedSubchannel** target;
/* args for wrapped_on_complete */
wrapped_rr_closure_arg wrapped_on_complete_arg;
@@ -250,7 +250,7 @@ typedef struct pending_pick {
static void add_pending_pick(pending_pick** root,
const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_call_context_element* context,
grpc_closure* on_complete) {
pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp));
@@ -657,7 +657,7 @@ static void update_lb_connectivity_status_locked(
* completion callback even if the pick is available immediately. */
static bool pick_from_internal_rr_locked(
glb_lb_policy* glb_policy, const grpc_lb_policy_pick_args* pick_args,
- bool force_async, grpc_connected_subchannel** target,
+ bool force_async, grpc_core::ConnectedSubchannel** target,
wrapped_rr_closure_arg* wc_arg) {
// Check for drops if we are not using fallback backend addresses.
if (glb_policy->serverlist != nullptr) {
@@ -1090,7 +1090,7 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) {
// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
// we invoke the completion closure and set *target to nullptr right here.
static void glb_cancel_pick_locked(grpc_lb_policy* pol,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
pending_pick* pp = glb_policy->pending_picks;
@@ -1184,7 +1184,7 @@ static void glb_exit_idle_locked(grpc_lb_policy* pol) {
static int glb_pick_locked(grpc_lb_policy* pol,
const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_call_context_element* context, void** user_data,
grpc_closure* on_complete) {
if (pick_args->lb_token_mdelem_storage == nullptr) {
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index a3b05aacaf..e70f2a8c52 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -34,7 +34,7 @@ grpc_core::TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
typedef struct pending_pick {
struct pending_pick* next;
uint32_t initial_metadata_flags;
- grpc_connected_subchannel** target;
+ grpc_core::ConnectedSubchannel** target;
grpc_closure* on_complete;
} pending_pick;
@@ -102,7 +102,7 @@ static void pf_shutdown_locked(grpc_lb_policy* pol) {
}
static void pf_cancel_pick_locked(grpc_lb_policy* pol,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_error* error) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
pending_pick* pp = p->pending_picks;
@@ -174,7 +174,7 @@ static void pf_exit_idle_locked(grpc_lb_policy* pol) {
static int pf_pick_locked(grpc_lb_policy* pol,
const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_call_context_element* context, void** user_data,
grpc_closure* on_complete) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
@@ -396,6 +396,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
// Handle updates for the currently selected subchannel.
if (p->selected == sd) {
+ gpr_log(GPR_INFO, "BAR selected. subchannel %p, conn subchannel %p",
+ sd->subchannel, p->selected->connected_subchannel);
// If the new state is anything other than READY and there is a
// pending update, switch to the pending update.
if (sd->curr_connectivity_state != GRPC_CHANNEL_READY &&
@@ -412,25 +414,13 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update");
} else {
- if (sd->curr_connectivity_state < GRPC_CHANNEL_TRANSIENT_FAILURE) {
- // Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
- } else { // in transient failure or shutdown. Rely on re-resolution to
- // recover.
- p->selected = nullptr;
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "pf_selected_shutdown");
- grpc_lb_subchannel_data_unref_subchannel(
- sd, "pf_selected_shutdown"); // Unrefs connected subchannel
- }
// TODO(juanlishen): we re-resolve when the selected subchannel goes to
// TRANSIENT_FAILURE because we used to shut down in this case before
// re-resolution is introduced. But we need to investigate whether we
// really want to take any action instead of waiting for the selected
// subchannel reconnecting.
- if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN ||
- sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected channel goes bad, request a re-resolution.
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
@@ -438,10 +428,20 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
p->started_picking = false;
grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
GRPC_ERROR_NONE);
+ // in transient failure. Rely on re-resolution to recover.
+ p->selected = nullptr;
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ sd->subchannel_list, "pf_selected_shutdown");
+ grpc_lb_subchannel_data_unref_subchannel(
+ sd, "pf_selected_shutdown"); // Unrefs connected subchannel
+
} else {
grpc_connectivity_state_set(&p->state_tracker,
sd->curr_connectivity_state,
GRPC_ERROR_REF(error), "selected_changed");
+ // Renew notification.
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
}
return;
@@ -459,6 +459,16 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list to
// p->subchannel_list.
+ grpc_core::ConnectedSubchannel* con =
+ grpc_subchannel_get_connected_subchannel(sd->subchannel);
+ if (con == nullptr) {
+ // The subchannel may have become disconnected by the time this callback
+ // is invoked. Simply ignore and resubscribe: ulterior connectivity
+ // states
+ // must be in the pipeline and will eventually be invoked.
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ break;
+ }
if (sd->subchannel_list == p->latest_pending_subchannel_list) {
GPR_ASSERT(p->subchannel_list != nullptr);
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
@@ -469,9 +479,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
// Cases 1 and 2.
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "connecting_ready");
- sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(sd->subchannel),
- "connected");
+ sd->connected_subchannel =
+ GRPC_CONNECTED_SUBCHANNEL_REF(con, "connected");
p->selected = sd;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p,
@@ -530,39 +539,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_subchannel_data_start_connectivity_watch(sd);
break;
}
- case GRPC_CHANNEL_SHUTDOWN: {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "pf_candidate_shutdown");
- // Advance to next subchannel and check its state.
- grpc_lb_subchannel_data* original_sd = sd;
- do {
- sd->subchannel_list->checking_subchannel =
- (sd->subchannel_list->checking_subchannel + 1) %
- sd->subchannel_list->num_subchannels;
- sd = &sd->subchannel_list
- ->subchannels[sd->subchannel_list->checking_subchannel];
- } while (sd->subchannel == nullptr && sd != original_sd);
- if (sd == original_sd) {
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "pf_exhausted_subchannels");
- if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
- GRPC_ERROR_NONE,
- "exhausted_subchannels+reresolve");
- p->started_picking = false;
- grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
- GRPC_ERROR_NONE);
- }
- } else {
- if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(
- &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "subchannel_failed");
- }
- // Reuses the connectivity refs from the previous watch.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
- }
- }
+ case GRPC_CHANNEL_SHUTDOWN:
+ GPR_UNREACHABLE_CODE(break);
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 0836dad2f6..a6a8fbb3cf 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -58,7 +58,7 @@ typedef struct pending_pick {
/* output argument where to store the pick()ed connected subchannel, or NULL
* upon error. */
- grpc_connected_subchannel** target;
+ grpc_core::ConnectedSubchannel** target;
/* to be invoked once the pick() has completed (regardless of success) */
grpc_closure* on_complete;
@@ -199,7 +199,7 @@ static void rr_shutdown_locked(grpc_lb_policy* pol) {
}
static void rr_cancel_pick_locked(grpc_lb_policy* pol,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_error* error) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
pending_pick* pp = p->pending_picks;
@@ -267,7 +267,7 @@ static void rr_exit_idle_locked(grpc_lb_policy* pol) {
static int rr_pick_locked(grpc_lb_policy* pol,
const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
+ grpc_core::ConnectedSubchannel** target,
grpc_call_context_element* context, void** user_data,
grpc_closure* on_complete) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
@@ -316,15 +316,14 @@ static int rr_pick_locked(grpc_lb_policy* pol,
static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
+ GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
+ GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
GPR_ASSERT(subchannel_list->num_ready > 0);
--subchannel_list->num_ready;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(subchannel_list->num_transient_failures > 0);
--subchannel_list->num_transient_failures;
- } else if (sd->prev_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- GPR_ASSERT(subchannel_list->num_shutdown > 0);
- --subchannel_list->num_shutdown;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
GPR_ASSERT(subchannel_list->num_idle > 0);
--subchannel_list->num_idle;
@@ -334,8 +333,6 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
++subchannel_list->num_ready;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
++subchannel_list->num_transient_failures;
- } else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- ++subchannel_list->num_shutdown;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
++subchannel_list->num_idle;
}
@@ -401,6 +398,7 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd,
static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_subchannel_data* sd = (grpc_lb_subchannel_data*)arg;
+ GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
round_robin_lb_policy* p =
(round_robin_lb_policy*)sd->subchannel_list->policy;
if (grpc_lb_round_robin_trace.enabled()) {
@@ -444,23 +442,16 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
// If the sd's new state is TRANSIENT_FAILURE, unref the *connected*
// subchannel, if any.
- if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- if (sd->connected_subchannel != nullptr) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(sd->connected_subchannel,
- "connected_subchannel_transient_failure");
- sd->connected_subchannel = nullptr;
+ switch (sd->curr_connectivity_state) {
+ case GRPC_CHANNEL_TRANSIENT_FAILURE: {
+ if (sd->connected_subchannel != nullptr) {
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(
+ sd->connected_subchannel, "connected_subchannel_transient_failure");
+ sd->connected_subchannel = nullptr;
+ }
+ break;
}
- // Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
- }
- // If the sd's new state is SHUTDOWN, unref the subchannel.
- else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "rr_connectivity_shutdown");
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "rr_connectivity_shutdown");
- } else { // sd not in SHUTDOWN
- if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
+ case GRPC_CHANNEL_READY: {
if (sd->connected_subchannel == nullptr) {
sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(sd->subchannel),
@@ -522,10 +513,15 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
+ break;
}
- // Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ case GRPC_CHANNEL_SHUTDOWN:
+ GPR_UNREACHABLE_CODE();
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_IDLE:; // fallthrough
}
+ // Renew notification.
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
static grpc_connectivity_state rr_check_connectivity_locked(
@@ -549,7 +545,7 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
if (next_ready_index < p->subchannel_list->num_subchannels) {
grpc_lb_subchannel_data* selected =
&p->subchannel_list->subchannels[next_ready_index];
- grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF(
+ grpc_core::ConnectedSubchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF(
selected->connected_subchannel, "rr_ping");
target->Ping(on_initiate, on_ack);
GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping");
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 0f8cea9347..e4db3ef464 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -43,7 +43,7 @@ typedef struct {
grpc_lb_subchannel_list* subchannel_list;
/** subchannel itself */
grpc_subchannel* subchannel;
- grpc_connected_subchannel* connected_subchannel;
+ grpc_core::ConnectedSubchannel* connected_subchannel;
/** Is a connectivity notification pending? */
bool connectivity_notification_pending;
/** notification that connectivity has changed on subchannel */