diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc | 154 |
1 files changed, 54 insertions, 100 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 0861261359..725b78d478 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -31,13 +31,6 @@ grpc_core::TraceFlag grpc_lb_pick_first_trace(false, "pick_first"); -typedef struct pending_pick { - struct pending_pick* next; - uint32_t initial_metadata_flags; - grpc_connected_subchannel** target; - grpc_closure* on_complete; -} pending_pick; - typedef struct { /** base policy: must be first */ grpc_lb_policy base; @@ -52,7 +45,7 @@ typedef struct { /** are we shut down? */ bool shutdown; /** list of picks that are waiting on connectivity */ - pending_pick* pending_picks; + grpc_lb_policy_pick_state* pending_picks; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; @@ -70,19 +63,27 @@ static void pf_destroy(grpc_lb_policy* pol) { } } -static void pf_shutdown_locked(grpc_lb_policy* pol) { +static void pf_shutdown_locked(grpc_lb_policy* pol, + grpc_lb_policy* new_policy) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p); } p->shutdown = true; - pending_pick* pp; - while ((pp = p->pending_picks) != nullptr) { - p->pending_picks = pp->next; - *pp->target = nullptr; - GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error)); - gpr_free(pp); + grpc_lb_policy_pick_state* pick; + while ((pick = p->pending_picks) != nullptr) { + p->pending_picks = pick->next; + if (new_policy != nullptr) { + // Hand off to new LB policy. + if (grpc_lb_policy_pick_locked(new_policy, pick)) { + // Synchronous return, schedule closure. + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + } + } else { + pick->connected_subchannel.reset(); + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); + } } grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "shutdown"); @@ -102,19 +103,18 @@ static void pf_shutdown_locked(grpc_lb_policy* pol) { } static void pf_cancel_pick_locked(grpc_lb_policy* pol, - grpc_connected_subchannel** target, + grpc_lb_policy_pick_state* pick, grpc_error* error) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; - pending_pick* pp = p->pending_picks; + grpc_lb_policy_pick_state* pp = p->pending_picks; p->pending_picks = nullptr; while (pp != nullptr) { - pending_pick* next = pp->next; - if (pp->target == target) { - *target = nullptr; - GRPC_CLOSURE_SCHED(pp->on_complete, + grpc_lb_policy_pick_state* next = pp->next; + if (pp == pick) { + pick->connected_subchannel.reset(); + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); - gpr_free(pp); } else { pp->next = p->pending_picks; p->pending_picks = pp; @@ -129,21 +129,20 @@ static void pf_cancel_picks_locked(grpc_lb_policy* pol, uint32_t initial_metadata_flags_eq, grpc_error* error) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; - pending_pick* pp = p->pending_picks; + grpc_lb_policy_pick_state* pick = p->pending_picks; p->pending_picks = nullptr; - while (pp != nullptr) { - pending_pick* next = pp->next; - if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == + while (pick != nullptr) { + grpc_lb_policy_pick_state* next = pick->next; + if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - GRPC_CLOSURE_SCHED(pp->on_complete, + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); - gpr_free(pp); } else { - pp->next = p->pending_picks; - p->pending_picks = pp; + pick->next = p->pending_picks; + p->pending_picks = pick; } - pp = next; + pick = next; } GRPC_ERROR_UNREF(error); } @@ -173,27 +172,19 @@ static void pf_exit_idle_locked(grpc_lb_policy* pol) { } static int pf_pick_locked(grpc_lb_policy* pol, - const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, - grpc_call_context_element* context, void** user_data, - grpc_closure* on_complete) { + grpc_lb_policy_pick_state* pick) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; // If we have a selected subchannel already, return synchronously. if (p->selected != nullptr) { - *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected->connected_subchannel, - "picked"); + pick->connected_subchannel = p->selected->connected_subchannel; return 1; } // No subchannel selected yet, so handle asynchronously. if (!p->started_picking) { start_picking_locked(p); } - pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp)); - pp->next = p->pending_picks; - pp->target = target; - pp->initial_metadata_flags = pick_args->initial_metadata_flags; - pp->on_complete = on_complete; - p->pending_picks = pp; + pick->next = p->pending_picks; + p->pending_picks = pick; return 0; } @@ -225,8 +216,7 @@ static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, grpc_closure* on_ack) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; if (p->selected) { - grpc_connected_subchannel_ping(p->selected->connected_subchannel, - on_initiate, on_ack); + p->selected->connected_subchannel->Ping(on_initiate, on_ack); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); @@ -305,8 +295,7 @@ static void pf_update_locked(grpc_lb_policy* policy, subchannel_list->num_subchannels); } if (p->selected->connected_subchannel != nullptr) { - sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - p->selected->connected_subchannel, "pf_update_includes_selected"); + sd->connected_subchannel = p->selected->connected_subchannel; } p->selected = sd; if (p->subchannel_list != nullptr) { @@ -418,8 +407,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { // re-resolution is introduced. But we need to investigate whether we // really want to take any action instead of waiting for the selected // subchannel reconnecting. - if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN || - sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // If the selected channel goes bad, request a re-resolution. grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, @@ -427,20 +416,19 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { p->started_picking = false; grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE); + // in transient failure. Rely on re-resolution to recover. + p->selected = nullptr; + grpc_lb_subchannel_data_stop_connectivity_watch(sd); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + sd->subchannel_list, "pf_selected_shutdown"); + grpc_lb_subchannel_data_unref_subchannel( + sd, "pf_selected_shutdown"); // Unrefs connected subchannel } else { grpc_connectivity_state_set(&p->state_tracker, sd->curr_connectivity_state, GRPC_ERROR_REF(error), "selected_changed"); - } - if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) { // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(sd); - } else { - p->selected = nullptr; - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_list_unref_for_connectivity_watch( - sd->subchannel_list, "pf_selected_shutdown"); - grpc_lb_subchannel_data_unref_subchannel(sd, "pf_selected_shutdown"); } } return; @@ -458,6 +446,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { case GRPC_CHANNEL_READY: { // Case 2. Promote p->latest_pending_subchannel_list to // p->subchannel_list. + sd->connected_subchannel = + grpc_subchannel_get_connected_subchannel(sd->subchannel); if (sd->subchannel_list == p->latest_pending_subchannel_list) { GPR_ASSERT(p->subchannel_list != nullptr); grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, @@ -468,9 +458,6 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { // Cases 1 and 2. grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connecting_ready"); - sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(sd->subchannel), - "connected"); p->selected = sd; if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p, @@ -479,18 +466,16 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { // Drop all other subchannels, since we are now connected. destroy_unselected_subchannels_locked(p); // Update any calls that were waiting for a pick. - pending_pick* pp; - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( - p->selected->connected_subchannel, "picked"); + grpc_lb_policy_pick_state* pick; + while ((pick = p->pending_picks)) { + p->pending_picks = pick->next; + pick->connected_subchannel = p->selected->connected_subchannel; if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p", (void*)p->selected); } - GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(sd); @@ -529,39 +514,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { grpc_lb_subchannel_data_start_connectivity_watch(sd); break; } - case GRPC_CHANNEL_SHUTDOWN: { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "pf_candidate_shutdown"); - // Advance to next subchannel and check its state. - grpc_lb_subchannel_data* original_sd = sd; - do { - sd->subchannel_list->checking_subchannel = - (sd->subchannel_list->checking_subchannel + 1) % - sd->subchannel_list->num_subchannels; - sd = &sd->subchannel_list - ->subchannels[sd->subchannel_list->checking_subchannel]; - } while (sd->subchannel == nullptr && sd != original_sd); - if (sd == original_sd) { - grpc_lb_subchannel_list_unref_for_connectivity_watch( - sd->subchannel_list, "pf_exhausted_subchannels"); - if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, - GRPC_ERROR_NONE, - "exhausted_subchannels+reresolve"); - p->started_picking = false; - grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, - GRPC_ERROR_NONE); - } - } else { - if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set( - &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "subchannel_failed"); - } - // Reuses the connectivity refs from the previous watch. - grpc_lb_subchannel_data_start_connectivity_watch(sd); - } - } + case GRPC_CHANNEL_SHUTDOWN: + GPR_UNREACHABLE_CODE(break); } } |