aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc154
1 files changed, 54 insertions, 100 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 0861261359..725b78d478 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -31,13 +31,6 @@
grpc_core::TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
-typedef struct pending_pick {
- struct pending_pick* next;
- uint32_t initial_metadata_flags;
- grpc_connected_subchannel** target;
- grpc_closure* on_complete;
-} pending_pick;
-
typedef struct {
/** base policy: must be first */
grpc_lb_policy base;
@@ -52,7 +45,7 @@ typedef struct {
/** are we shut down? */
bool shutdown;
/** list of picks that are waiting on connectivity */
- pending_pick* pending_picks;
+ grpc_lb_policy_pick_state* pending_picks;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
@@ -70,19 +63,27 @@ static void pf_destroy(grpc_lb_policy* pol) {
}
}
-static void pf_shutdown_locked(grpc_lb_policy* pol) {
+static void pf_shutdown_locked(grpc_lb_policy* pol,
+ grpc_lb_policy* new_policy) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p);
}
p->shutdown = true;
- pending_pick* pp;
- while ((pp = p->pending_picks) != nullptr) {
- p->pending_picks = pp->next;
- *pp->target = nullptr;
- GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error));
- gpr_free(pp);
+ grpc_lb_policy_pick_state* pick;
+ while ((pick = p->pending_picks) != nullptr) {
+ p->pending_picks = pick->next;
+ if (new_policy != nullptr) {
+ // Hand off to new LB policy.
+ if (grpc_lb_policy_pick_locked(new_policy, pick)) {
+ // Synchronous return, schedule closure.
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+ }
+ } else {
+ pick->connected_subchannel.reset();
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
+ }
}
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "shutdown");
@@ -102,19 +103,18 @@ static void pf_shutdown_locked(grpc_lb_policy* pol) {
}
static void pf_cancel_pick_locked(grpc_lb_policy* pol,
- grpc_connected_subchannel** target,
+ grpc_lb_policy_pick_state* pick,
grpc_error* error) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
- pending_pick* pp = p->pending_picks;
+ grpc_lb_policy_pick_state* pp = p->pending_picks;
p->pending_picks = nullptr;
while (pp != nullptr) {
- pending_pick* next = pp->next;
- if (pp->target == target) {
- *target = nullptr;
- GRPC_CLOSURE_SCHED(pp->on_complete,
+ grpc_lb_policy_pick_state* next = pp->next;
+ if (pp == pick) {
+ pick->connected_subchannel.reset();
+ GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
- gpr_free(pp);
} else {
pp->next = p->pending_picks;
p->pending_picks = pp;
@@ -129,21 +129,20 @@ static void pf_cancel_picks_locked(grpc_lb_policy* pol,
uint32_t initial_metadata_flags_eq,
grpc_error* error) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
- pending_pick* pp = p->pending_picks;
+ grpc_lb_policy_pick_state* pick = p->pending_picks;
p->pending_picks = nullptr;
- while (pp != nullptr) {
- pending_pick* next = pp->next;
- if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
+ while (pick != nullptr) {
+ grpc_lb_policy_pick_state* next = pick->next;
+ if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- GRPC_CLOSURE_SCHED(pp->on_complete,
+ GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
- gpr_free(pp);
} else {
- pp->next = p->pending_picks;
- p->pending_picks = pp;
+ pick->next = p->pending_picks;
+ p->pending_picks = pick;
}
- pp = next;
+ pick = next;
}
GRPC_ERROR_UNREF(error);
}
@@ -173,27 +172,19 @@ static void pf_exit_idle_locked(grpc_lb_policy* pol) {
}
static int pf_pick_locked(grpc_lb_policy* pol,
- const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
- grpc_call_context_element* context, void** user_data,
- grpc_closure* on_complete) {
+ grpc_lb_policy_pick_state* pick) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
// If we have a selected subchannel already, return synchronously.
if (p->selected != nullptr) {
- *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected->connected_subchannel,
- "picked");
+ pick->connected_subchannel = p->selected->connected_subchannel;
return 1;
}
// No subchannel selected yet, so handle asynchronously.
if (!p->started_picking) {
start_picking_locked(p);
}
- pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp));
- pp->next = p->pending_picks;
- pp->target = target;
- pp->initial_metadata_flags = pick_args->initial_metadata_flags;
- pp->on_complete = on_complete;
- p->pending_picks = pp;
+ pick->next = p->pending_picks;
+ p->pending_picks = pick;
return 0;
}
@@ -225,8 +216,7 @@ static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
grpc_closure* on_ack) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
if (p->selected) {
- grpc_connected_subchannel_ping(p->selected->connected_subchannel,
- on_initiate, on_ack);
+ p->selected->connected_subchannel->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
@@ -305,8 +295,7 @@ static void pf_update_locked(grpc_lb_policy* policy,
subchannel_list->num_subchannels);
}
if (p->selected->connected_subchannel != nullptr) {
- sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- p->selected->connected_subchannel, "pf_update_includes_selected");
+ sd->connected_subchannel = p->selected->connected_subchannel;
}
p->selected = sd;
if (p->subchannel_list != nullptr) {
@@ -418,8 +407,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
// re-resolution is introduced. But we need to investigate whether we
// really want to take any action instead of waiting for the selected
// subchannel reconnecting.
- if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN ||
- sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected channel goes bad, request a re-resolution.
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
@@ -427,20 +416,19 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
p->started_picking = false;
grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
GRPC_ERROR_NONE);
+ // in transient failure. Rely on re-resolution to recover.
+ p->selected = nullptr;
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ sd->subchannel_list, "pf_selected_shutdown");
+ grpc_lb_subchannel_data_unref_subchannel(
+ sd, "pf_selected_shutdown"); // Unrefs connected subchannel
} else {
grpc_connectivity_state_set(&p->state_tracker,
sd->curr_connectivity_state,
GRPC_ERROR_REF(error), "selected_changed");
- }
- if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(sd);
- } else {
- p->selected = nullptr;
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "pf_selected_shutdown");
- grpc_lb_subchannel_data_unref_subchannel(sd, "pf_selected_shutdown");
}
}
return;
@@ -458,6 +446,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list to
// p->subchannel_list.
+ sd->connected_subchannel =
+ grpc_subchannel_get_connected_subchannel(sd->subchannel);
if (sd->subchannel_list == p->latest_pending_subchannel_list) {
GPR_ASSERT(p->subchannel_list != nullptr);
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
@@ -468,9 +458,6 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
// Cases 1 and 2.
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "connecting_ready");
- sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(sd->subchannel),
- "connected");
p->selected = sd;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p,
@@ -479,18 +466,16 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
// Drop all other subchannels, since we are now connected.
destroy_unselected_subchannels_locked(p);
// Update any calls that were waiting for a pick.
- pending_pick* pp;
- while ((pp = p->pending_picks)) {
- p->pending_picks = pp->next;
- *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
- p->selected->connected_subchannel, "picked");
+ grpc_lb_policy_pick_state* pick;
+ while ((pick = p->pending_picks)) {
+ p->pending_picks = pick->next;
+ pick->connected_subchannel = p->selected->connected_subchannel;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Servicing pending pick with selected subchannel %p",
(void*)p->selected);
}
- GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE);
- gpr_free(pp);
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(sd);
@@ -529,39 +514,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_subchannel_data_start_connectivity_watch(sd);
break;
}
- case GRPC_CHANNEL_SHUTDOWN: {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "pf_candidate_shutdown");
- // Advance to next subchannel and check its state.
- grpc_lb_subchannel_data* original_sd = sd;
- do {
- sd->subchannel_list->checking_subchannel =
- (sd->subchannel_list->checking_subchannel + 1) %
- sd->subchannel_list->num_subchannels;
- sd = &sd->subchannel_list
- ->subchannels[sd->subchannel_list->checking_subchannel];
- } while (sd->subchannel == nullptr && sd != original_sd);
- if (sd == original_sd) {
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "pf_exhausted_subchannels");
- if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
- GRPC_ERROR_NONE,
- "exhausted_subchannels+reresolve");
- p->started_picking = false;
- grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
- GRPC_ERROR_NONE);
- }
- } else {
- if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(
- &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "subchannel_failed");
- }
- // Reuses the connectivity refs from the previous watch.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
- }
- }
+ case GRPC_CHANNEL_SHUTDOWN:
+ GPR_UNREACHABLE_CODE(break);
}
}