diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc | 140 |
1 files changed, 86 insertions, 54 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index caa6aee9a6..228a77d9db 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -29,8 +29,7 @@ #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" -grpc_tracer_flag grpc_lb_pick_first_trace = - GRPC_TRACER_INITIALIZER(false, "pick_first"); +grpc_core::TraceFlag grpc_lb_pick_first_trace(false, "pick_first"); typedef struct pending_pick { struct pending_pick* next; @@ -66,14 +65,15 @@ static void pf_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); gpr_free(p); grpc_subchannel_index_unref(); - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_DEBUG, "Pick First %p destroyed.", (void*)p); } } -static void shutdown_locked(grpc_exec_ctx* exec_ctx, pick_first_lb_policy* p, - grpc_error* error) { - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { +static void pf_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { + pick_first_lb_policy* p = (pick_first_lb_policy*)pol; + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); + if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p); } p->shutdown = true; @@ -97,14 +97,11 @@ static void shutdown_locked(grpc_exec_ctx* exec_ctx, pick_first_lb_policy* p, exec_ctx, p->latest_pending_subchannel_list, "pf_shutdown"); p->latest_pending_subchannel_list = nullptr; } + grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_pick_first_trace, + GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); } -static void pf_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { - shutdown_locked(exec_ctx, (pick_first_lb_policy*)pol, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown")); -} - static void pf_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, grpc_connected_subchannel** target, grpc_error* error) { @@ -158,10 +155,15 @@ static void start_picking_locked(grpc_exec_ctx* exec_ctx, if (p->subchannel_list != nullptr && p->subchannel_list->num_subchannels > 0) { p->subchannel_list->checking_subchannel = 0; - grpc_lb_subchannel_list_ref_for_connectivity_watch( - p->subchannel_list, "connectivity_watch+start_picking"); - grpc_lb_subchannel_data_start_connectivity_watch( - exec_ctx, &p->subchannel_list->subchannels[0]); + for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) { + if (p->subchannel_list->subchannels[i].subchannel != nullptr) { + grpc_lb_subchannel_list_ref_for_connectivity_watch( + p->subchannel_list, "connectivity_watch+start_picking"); + grpc_lb_subchannel_data_start_connectivity_watch( + exec_ctx, &p->subchannel_list->subchannels[i]); + break; + } + } } } @@ -261,7 +263,7 @@ static void pf_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, } const grpc_lb_addresses* addresses = (const grpc_lb_addresses*)arg->value.pointer.p; - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", (void*)p, (unsigned long)addresses->num_addresses); } @@ -298,27 +300,27 @@ static void pf_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i]; if (sd->subchannel == p->selected->subchannel) { // The currently selected subchannel is in the update: we are done. - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p found already selected subchannel %p " "at update index %" PRIuPTR " of %" PRIuPTR "; update done", p, p->selected->subchannel, i, subchannel_list->num_subchannels); } - grpc_lb_subchannel_list_ref_for_connectivity_watch( - subchannel_list, "connectivity_watch+replace_selected"); - grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); - if (p->subchannel_list != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref( - exec_ctx, p->subchannel_list, "pf_update_includes_selected"); - } - p->subchannel_list = subchannel_list; if (p->selected->connected_subchannel != nullptr) { sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( p->selected->connected_subchannel, "pf_update_includes_selected"); } p->selected = sd; + if (p->subchannel_list != nullptr) { + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->subchannel_list, "pf_update_includes_selected"); + } + p->subchannel_list = subchannel_list; destroy_unselected_subchannels_locked(exec_ctx, p); + grpc_lb_subchannel_list_ref_for_connectivity_watch( + subchannel_list, "connectivity_watch+replace_selected"); + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); // If there was a previously pending update (which may or may // not have contained the currently selected subchannel), drop // it, so that it doesn't override what we've done here. @@ -336,7 +338,7 @@ static void pf_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, // for it to report READY before swapping it into the current // subchannel list. if (p->latest_pending_subchannel_list != nullptr) { - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_DEBUG, "Pick First %p Shutting down latest pending subchannel list " "%p, about to be replaced by newer latest %p", @@ -363,7 +365,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_lb_subchannel_data* sd = (grpc_lb_subchannel_data*)arg; pick_first_lb_policy* p = (pick_first_lb_policy*)sd->subchannel_list->policy; - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_DEBUG, "Pick First %p connectivity changed for subchannel %p (%" PRIuPTR " of %" PRIuPTR @@ -405,6 +407,9 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, if (sd->curr_connectivity_state != GRPC_CHANNEL_READY && p->latest_pending_subchannel_list != nullptr) { p->selected = nullptr; + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "selected_not_ready+switch_to_update"); grpc_lb_subchannel_list_shutdown_and_unref( exec_ctx, p->subchannel_list, "selected_not_ready+switch_to_update"); p->subchannel_list = p->latest_pending_subchannel_list; @@ -413,21 +418,35 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update"); } else { - if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - /* if the selected channel goes bad, we're done */ - sd->curr_connectivity_state = GRPC_CHANNEL_SHUTDOWN; + // TODO(juanlishen): we re-resolve when the selected subchannel goes to + // TRANSIENT_FAILURE because we used to shut down in this case before + // re-resolution is introduced. But we need to investigate whether we + // really want to take any action instead of waiting for the selected + // subchannel reconnecting. + if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN || + sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + // If the selected channel goes bad, request a re-resolution. + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, + "selected_changed+reresolve"); + p->started_picking = false; + grpc_lb_policy_try_reresolve( + exec_ctx, &p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE); + } else { + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + sd->curr_connectivity_state, + GRPC_ERROR_REF(error), "selected_changed"); } - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - sd->curr_connectivity_state, - GRPC_ERROR_REF(error), "selected_changed"); if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) { // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); } else { + p->selected = nullptr; grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); grpc_lb_subchannel_list_unref_for_connectivity_watch( exec_ctx, sd->subchannel_list, "pf_selected_shutdown"); - shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error)); + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, + "pf_selected_shutdown"); } } return; @@ -460,7 +479,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, grpc_subchannel_get_connected_subchannel(sd->subchannel), "connected"); p->selected = sd; - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p, (void*)sd->subchannel); } @@ -472,7 +491,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, p->pending_picks = pp->next; *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( p->selected->connected_subchannel, "picked"); - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p", (void*)p->selected); @@ -532,24 +551,37 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } while (sd->subchannel == nullptr && sd != original_sd); if (sd == original_sd) { grpc_lb_subchannel_list_unref_for_connectivity_watch( - exec_ctx, sd->subchannel_list, "pf_candidate_shutdown"); - shutdown_locked(exec_ctx, p, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick first exhausted channels", &error, 1)); - break; - } - if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "subchannel_failed"); + exec_ctx, sd->subchannel_list, "pf_exhausted_subchannels"); + if (sd->subchannel_list == p->subchannel_list) { + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, + "exhausted_subchannels+reresolve"); + p->started_picking = false; + grpc_lb_policy_try_reresolve( + exec_ctx, &p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE); + } + } else { + if (sd->subchannel_list == p->subchannel_list) { + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "subchannel_failed"); + } + // Reuses the connectivity refs from the previous watch. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); } - // Reuses the connectivity refs from the previous watch. - grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); - break; } } } +static void pf_set_reresolve_closure_locked( + grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, + grpc_closure* request_reresolution) { + pick_first_lb_policy* p = (pick_first_lb_policy*)policy; + GPR_ASSERT(!p->shutdown); + GPR_ASSERT(policy->request_reresolution == nullptr); + policy->request_reresolution = request_reresolution; +} + static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_destroy, pf_shutdown_locked, @@ -560,7 +592,8 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_exit_idle_locked, pf_check_connectivity_locked, pf_notify_on_state_change_locked, - pf_update_locked}; + pf_update_locked, + pf_set_reresolve_closure_locked}; static void pick_first_factory_ref(grpc_lb_policy_factory* factory) {} @@ -571,7 +604,7 @@ static grpc_lb_policy* create_pick_first(grpc_exec_ctx* exec_ctx, grpc_lb_policy_args* args) { GPR_ASSERT(args->client_channel_factory != nullptr); pick_first_lb_policy* p = (pick_first_lb_policy*)gpr_zalloc(sizeof(*p)); - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_DEBUG, "Pick First %p created.", (void*)p); } pf_update_locked(exec_ctx, &p->base, args); @@ -593,9 +626,8 @@ static grpc_lb_policy_factory* pick_first_lb_factory_create() { /* Plugin registration */ -extern "C" void grpc_lb_policy_pick_first_init() { +void grpc_lb_policy_pick_first_init() { grpc_register_lb_policy(pick_first_lb_factory_create()); - grpc_register_tracer(&grpc_lb_pick_first_trace); } -extern "C" void grpc_lb_policy_pick_first_shutdown() {} +void grpc_lb_policy_pick_first_shutdown() {} |