diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc | 715 |
1 files changed, 309 insertions, 406 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index b07fc3b720..f0c66c68e1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -20,6 +20,7 @@ #include <grpc/support/alloc.h> +#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" @@ -42,99 +43,73 @@ typedef struct { /** base policy: must be first */ grpc_lb_policy base; /** all our subchannels */ - grpc_subchannel **subchannels; - grpc_subchannel **new_subchannels; - size_t num_subchannels; - size_t num_new_subchannels; - - grpc_closure connectivity_changed; - - /** remaining members are protected by the combiner */ - - /** the selected channel */ - grpc_connected_subchannel *selected; - - /** the subchannel key for \a selected, or NULL if \a selected not set */ - const grpc_subchannel_key *selected_key; - + grpc_lb_subchannel_list *subchannel_list; + /** latest pending subchannel list */ + grpc_lb_subchannel_list *latest_pending_subchannel_list; + /** selected subchannel in \a subchannel_list */ + grpc_lb_subchannel_data *selected; /** have we started picking? */ bool started_picking; /** are we shut down? */ bool shutdown; - /** are we updating the selected subchannel? */ - bool updating_selected; - /** are we updating the subchannel candidates? */ - bool updating_subchannels; - /** args from the latest update received while already updating, or NULL */ - grpc_lb_policy_args *pending_update_args; - /** which subchannel are we watching? */ - size_t checking_subchannel; - /** what is the connectivity of that channel? */ - grpc_connectivity_state checking_connectivity; /** list of picks that are waiting on connectivity */ pending_pick *pending_picks; - /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + GPR_ASSERT(p->subchannel_list == NULL); + GPR_ASSERT(p->latest_pending_subchannel_list == NULL); GPR_ASSERT(p->pending_picks == NULL); - for (size_t i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first_destroy"); - } - if (p->selected != NULL) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, - "picked_first_destroy"); - } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); - grpc_subchannel_index_unref(); - if (p->pending_update_args != NULL) { - grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args); - gpr_free(p->pending_update_args); - } - gpr_free(p->subchannels); - gpr_free(p->new_subchannels); gpr_free(p); + grpc_subchannel_index_unref(); if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log(GPR_DEBUG, "Pick First %p destroyed.", (void *)p); } } -static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - pending_pick *pp; - p->shutdown = true; - pp = p->pending_picks; - p->pending_picks = NULL; - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"), "shutdown"); - /* cancel subscription */ - if (p->selected != NULL) { - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); - } else if (p->num_subchannels > 0 && p->started_picking) { - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, - &p->connectivity_changed); +static void shutdown_locked(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p, + grpc_error *error) { + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p); } - while (pp != NULL) { - pending_pick *next = pp->next; + p->shutdown = true; + pending_pick *pp; + while ((pp = p->pending_picks) != NULL) { + p->pending_picks = pp->next; *pp->target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_REF(error)); gpr_free(pp); - pp = next; } + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), + "shutdown"); + if (p->subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "pf_shutdown"); + p->subchannel_list = NULL; + } + if (p->latest_pending_subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->latest_pending_subchannel_list, "pf_shutdown"); + p->latest_pending_subchannel_list = NULL; + } + GRPC_ERROR_UNREF(error); +} + +static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { + shutdown_locked(exec_ctx, (pick_first_lb_policy *)pol, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown")); } static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel **target, grpc_error *error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - pending_pick *pp; - pp = p->pending_picks; + pending_pick *pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; @@ -158,8 +133,7 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_eq, grpc_error *error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - pending_pick *pp; - pp = p->pending_picks; + pending_pick *pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; @@ -181,15 +155,12 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void start_picking_locked(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { p->started_picking = true; - if (p->subchannels != NULL) { - GPR_ASSERT(p->num_subchannels > 0); - p->checking_subchannel = 0; - p->checking_connectivity = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); + if (p->subchannel_list != NULL && p->subchannel_list->num_subchannels > 0) { + p->subchannel_list->checking_subchannel = 0; + grpc_lb_subchannel_list_ref_for_connectivity_watch( + p->subchannel_list, "connectivity_watch+start_picking"); + grpc_lb_subchannel_data_start_connectivity_watch( + exec_ctx, &p->subchannel_list->subchannels[0]); } } @@ -206,19 +177,17 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - pending_pick *pp; - - /* Check atomically for a selected channel */ + // If we have a selected subchannel already, return synchronously. if (p->selected != NULL) { - *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked"); + *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected->connected_subchannel, + "picked"); return 1; } - - /* No subchannel selected yet, so try again */ + // No subchannel selected yet, so handle asynchronously. if (!p->started_picking) { start_picking_locked(exec_ctx, p); } - pp = (pending_pick *)gpr_malloc(sizeof(*pp)); + pending_pick *pp = (pending_pick *)gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->target = target; pp->initial_metadata_flags = pick_args->initial_metadata_flags; @@ -227,19 +196,15 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, return 0; } -static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx, - pick_first_lb_policy *p) { - size_t num_subchannels = p->num_subchannels; - grpc_subchannel **subchannels = p->subchannels; - - p->num_subchannels = 0; - p->subchannels = NULL; - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels"); - - for (size_t i = 0; i < num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first"); +static void destroy_unselected_subchannels_locked(grpc_exec_ctx *exec_ctx, + pick_first_lb_policy *p) { + for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) { + grpc_lb_subchannel_data *sd = &p->subchannel_list->subchannels[i]; + if (p->selected != sd) { + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, + "selected_different_subchannel"); + } } - gpr_free(subchannels); } static grpc_connectivity_state pf_check_connectivity_locked( @@ -261,46 +226,24 @@ static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *closure) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; if (p->selected) { - grpc_connected_subchannel_ping(exec_ctx, p->selected, closure); + grpc_connected_subchannel_ping(exec_ctx, p->selected->connected_subchannel, + closure); } else { GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); } } -/* unsubscribe all subchannels */ -static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx, - pick_first_lb_policy *p) { - if (p->num_subchannels > 0) { - GPR_ASSERT(p->selected == NULL); - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_DEBUG, "Pick First %p unsubscribing from subchannel %p", - (void *)p, (void *)p->subchannels[p->checking_subchannel]); - } - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, - &p->connectivity_changed); - p->updating_subchannels = true; - } else if (p->selected != NULL) { - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_DEBUG, - "Pick First %p unsubscribing from selected subchannel %p", - (void *)p, (void *)p->selected); - } - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); - p->updating_selected = true; - } -} +static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); -/* true upon success */ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_args *args) { pick_first_lb_policy *p = (pick_first_lb_policy *)policy; const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); if (arg == NULL || arg->type != GRPC_ARG_POINTER) { - if (p->subchannels == NULL) { + if (p->subchannel_list == NULL) { // If we don't have a current subchannel list, go into TRANSIENT FAILURE. grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, @@ -317,270 +260,222 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } const grpc_lb_addresses *addresses = (const grpc_lb_addresses *)arg->value.pointer.p; - if (addresses->num_addresses == 0) { - // Empty update. Unsubscribe from all current subchannels and put the - // channel in TRANSIENT_FAILURE. + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", + (void *)p, (unsigned long)addresses->num_addresses); + } + grpc_lb_subchannel_list *subchannel_list = grpc_lb_subchannel_list_create( + exec_ctx, &p->base, &grpc_lb_pick_first_trace, addresses, args, + pf_connectivity_changed_locked); + if (subchannel_list->num_subchannels == 0) { + // Empty update or no valid subchannels. Unsubscribe from all current + // subchannels and put the channel in TRANSIENT_FAILURE. grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "pf_update_empty"); - stop_connectivity_watchers(exec_ctx, p); + if (p->subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "sl_shutdown_empty_update"); + } + p->subchannel_list = subchannel_list; // Empty list. + p->selected = NULL; return; } - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", - (void *)p, (unsigned long)addresses->num_addresses); - } - grpc_subchannel_args *sc_args = (grpc_subchannel_args *)gpr_zalloc( - sizeof(*sc_args) * addresses->num_addresses); - /* We remove the following keys in order for subchannel keys belonging to - * subchannels point to the same address to match. */ - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, - GRPC_ARG_LB_ADDRESSES}; - size_t sc_args_count = 0; - - /* Create list of subchannel args for new addresses in \a args. */ - for (size_t i = 0; i < addresses->num_addresses; i++) { - // If there were any balancer, we would have chosen grpclb policy instead. - GPR_ASSERT(!addresses->addresses[i].is_balancer); - if (addresses->addresses[i].user_data != NULL) { - gpr_log(GPR_ERROR, - "This LB policy doesn't support user data. It will be ignored"); + if (p->selected == NULL) { + // We don't yet have a selected subchannel, so replace the current + // subchannel list immediately. + if (p->subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "pf_update_before_selected"); } - grpc_arg addr_arg = - grpc_create_subchannel_address_arg(&addresses->addresses[i].address); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove( - args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, - 1); - gpr_free(addr_arg.value.string); - sc_args[sc_args_count++].args = new_args; - } - - /* Check if p->selected is amongst them. If so, we are done. */ - if (p->selected != NULL) { - GPR_ASSERT(p->selected_key != NULL); - for (size_t i = 0; i < sc_args_count; i++) { - grpc_subchannel_key *ith_sc_key = grpc_subchannel_key_create(&sc_args[i]); - const bool found_selected = - grpc_subchannel_key_compare(p->selected_key, ith_sc_key) == 0; - grpc_subchannel_key_destroy(exec_ctx, ith_sc_key); - if (found_selected) { + p->subchannel_list = subchannel_list; + } else { + // We do have a selected subchannel. + // Check if it's present in the new list. If so, we're done. + for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { + grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i]; + if (sd->subchannel == p->selected->subchannel) { // The currently selected subchannel is in the update: we are done. if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, - "Pick First %p found already selected subchannel %p amongst " - "updates. Update done.", - (void *)p, (void *)p->selected); + "Pick First %p found already selected subchannel %p " + "at update index %" PRIuPTR " of %" PRIuPTR "; update done", + p, p->selected->subchannel, i, + subchannel_list->num_subchannels); } - for (size_t j = 0; j < sc_args_count; j++) { - grpc_channel_args_destroy(exec_ctx, - (grpc_channel_args *)sc_args[j].args); + grpc_lb_subchannel_list_ref_for_connectivity_watch( + subchannel_list, "connectivity_watch+replace_selected"); + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + if (p->subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->subchannel_list, "pf_update_includes_selected"); + } + p->subchannel_list = subchannel_list; + if (p->selected->connected_subchannel != NULL) { + sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + p->selected->connected_subchannel, "pf_update_includes_selected"); + } + p->selected = sd; + destroy_unselected_subchannels_locked(exec_ctx, p); + // If there was a previously pending update (which may or may + // not have contained the currently selected subchannel), drop + // it, so that it doesn't override what we've done here. + if (p->latest_pending_subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->latest_pending_subchannel_list, + "pf_update_includes_selected+outdated"); + p->latest_pending_subchannel_list = NULL; } - gpr_free(sc_args); return; } } - } - // We only check for already running updates here because if the previous - // steps were successful, the update can be considered done without any - // interference (ie, no callbacks were scheduled). - if (p->updating_selected || p->updating_subchannels) { - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, - "Update already in progress for pick first %p. Deferring update.", - (void *)p); - } - if (p->pending_update_args != NULL) { - grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args); - gpr_free(p->pending_update_args); - } - p->pending_update_args = - (grpc_lb_policy_args *)gpr_zalloc(sizeof(*p->pending_update_args)); - p->pending_update_args->client_channel_factory = - args->client_channel_factory; - p->pending_update_args->args = grpc_channel_args_copy(args->args); - p->pending_update_args->combiner = args->combiner; - return; - } - /* Create the subchannels for the new subchannel args/addresses. */ - grpc_subchannel **new_subchannels = - (grpc_subchannel **)gpr_zalloc(sizeof(*new_subchannels) * sc_args_count); - size_t num_new_subchannels = 0; - for (size_t i = 0; i < sc_args_count; i++) { - grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( - exec_ctx, args->client_channel_factory, &sc_args[i]); - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - char *address_uri = - grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_INFO, - "Pick First %p created subchannel %p for address uri %s", - (void *)p, (void *)subchannel, address_uri); - gpr_free(address_uri); + // Not keeping the previous selected subchannel, so set the latest + // pending subchannel list to the new subchannel list. We will wait + // for it to report READY before swapping it into the current + // subchannel list. + if (p->latest_pending_subchannel_list != NULL) { + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_DEBUG, + "Pick First %p Shutting down latest pending subchannel list " + "%p, about to be replaced by newer latest %p", + (void *)p, (void *)p->latest_pending_subchannel_list, + (void *)subchannel_list); + } + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->latest_pending_subchannel_list, + "sl_outdated_dont_smash"); } - grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)sc_args[i].args); - if (subchannel != NULL) new_subchannels[num_new_subchannels++] = subchannel; + p->latest_pending_subchannel_list = subchannel_list; } - gpr_free(sc_args); - if (num_new_subchannels == 0) { - gpr_free(new_subchannels); - // Empty update. Unsubscribe from all current subchannels and put the - // channel in TRANSIENT_FAILURE. - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("No valid addresses in update"), - "pf_update_no_valid_addresses"); - stop_connectivity_watchers(exec_ctx, p); - return; - } - - /* Destroy the current subchannels. Repurpose pf_shutdown/destroy. */ - stop_connectivity_watchers(exec_ctx, p); - - /* Save new subchannels. The switch over will happen in - * pf_connectivity_changed_locked */ - if (p->updating_selected || p->updating_subchannels) { - p->num_new_subchannels = num_new_subchannels; - p->new_subchannels = new_subchannels; - } else { /* nothing is updating. Get things moving from here */ - p->num_subchannels = num_new_subchannels; - p->subchannels = new_subchannels; - p->new_subchannels = NULL; - p->num_new_subchannels = 0; - if (p->started_picking) { - p->checking_subchannel = 0; - p->checking_connectivity = GRPC_CHANNEL_IDLE; - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); - } + // If we've started picking, start trying to connect to the first + // subchannel in the new list. + if (p->started_picking) { + grpc_lb_subchannel_list_ref_for_connectivity_watch( + subchannel_list, "connectivity_watch+update"); + grpc_lb_subchannel_data_start_connectivity_watch( + exec_ctx, &subchannel_list->subchannels[0]); } } static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - pick_first_lb_policy *p = (pick_first_lb_policy *)arg; - grpc_subchannel *selected_subchannel; - pending_pick *pp; - + grpc_lb_subchannel_data *sd = (grpc_lb_subchannel_data *)arg; + pick_first_lb_policy *p = (pick_first_lb_policy *)sd->subchannel_list->policy; if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log( - GPR_DEBUG, - "Pick First %p connectivity changed. Updating selected: %d; Updating " - "subchannels: %d; Checking %lu index (%lu total); State: %d; ", - (void *)p, p->updating_selected, p->updating_subchannels, - (unsigned long)p->checking_subchannel, - (unsigned long)p->num_subchannels, p->checking_connectivity); - } - bool restart = false; - if (p->updating_selected && error != GRPC_ERROR_NONE) { - /* Captured the unsubscription for p->selected */ - GPR_ASSERT(p->selected != NULL); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, - "pf_update_connectivity"); - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_DEBUG, "Pick First %p unreffing selected subchannel %p", - (void *)p, (void *)p->selected); - } - p->updating_selected = false; - if (p->num_new_subchannels == 0) { - p->selected = NULL; - return; - } - restart = true; + gpr_log(GPR_DEBUG, + "Pick First %p connectivity changed for subchannel %p (%" PRIuPTR + " of %" PRIuPTR + "), subchannel_list %p: state=%s p->shutdown=%d " + "sd->subchannel_list->shutting_down=%d error=%s", + (void *)p, (void *)sd->subchannel, + sd->subchannel_list->checking_subchannel, + sd->subchannel_list->num_subchannels, (void *)sd->subchannel_list, + grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), + p->shutdown, sd->subchannel_list->shutting_down, + grpc_error_string(error)); } - if (p->updating_subchannels && error != GRPC_ERROR_NONE) { - /* Captured the unsubscription for the checking subchannel */ - GPR_ASSERT(p->selected == NULL); - for (size_t i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], - "pf_update_connectivity"); - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_DEBUG, "Pick First %p unreffing subchannel %p", (void *)p, - (void *)p->subchannels[i]); - } - } - gpr_free(p->subchannels); - p->subchannels = NULL; - p->num_subchannels = 0; - p->updating_subchannels = false; - if (p->num_new_subchannels == 0) return; - restart = true; - } - if (restart) { - p->selected = NULL; - p->selected_key = NULL; - GPR_ASSERT(p->new_subchannels != NULL); - GPR_ASSERT(p->num_new_subchannels > 0); - p->num_subchannels = p->num_new_subchannels; - p->subchannels = p->new_subchannels; - p->num_new_subchannels = 0; - p->new_subchannels = NULL; - if (p->started_picking) { - /* If we were picking, continue to do so over the new subchannels, - * starting from the 0th index. */ - p->checking_subchannel = 0; - p->checking_connectivity = GRPC_CHANNEL_IDLE; - /* reuses the weak ref from start_picking_locked */ - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); - } - if (p->pending_update_args != NULL) { - const grpc_lb_policy_args *args = p->pending_update_args; - p->pending_update_args = NULL; - pf_update_locked(exec_ctx, &p->base, args); - } + // If the policy is shutting down, unref and return. + if (p->shutdown) { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "pf_shutdown"); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "pf_shutdown"); return; } - GRPC_ERROR_REF(error); - if (p->shutdown) { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); - GRPC_ERROR_UNREF(error); + // If the subchannel list is shutting down, stop watching. + if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "pf_sl_shutdown"); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "pf_sl_shutdown"); return; - } else if (p->selected != NULL) { - if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { - /* if the selected channel goes bad, we're done */ - p->checking_connectivity = GRPC_CHANNEL_SHUTDOWN; - } - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - p->checking_connectivity, GRPC_ERROR_REF(error), - "selected_changed"); - if (p->checking_connectivity != GRPC_CHANNEL_SHUTDOWN) { - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); + } + // If we're still here, the notification must be for a subchannel in + // either the current or latest pending subchannel lists. + GPR_ASSERT(sd->subchannel_list == p->subchannel_list || + sd->subchannel_list == p->latest_pending_subchannel_list); + // Update state. + sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; + // Handle updates for the currently selected subchannel. + if (p->selected == sd) { + // If the new state is anything other than READY and there is a + // pending update, switch to the pending update. + if (sd->curr_connectivity_state != GRPC_CHANNEL_READY && + p->latest_pending_subchannel_list != NULL) { + p->selected = NULL; + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->subchannel_list, "selected_not_ready+switch_to_update"); + p->subchannel_list = p->latest_pending_subchannel_list; + p->latest_pending_subchannel_list = NULL; + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update"); } else { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + /* if the selected channel goes bad, we're done */ + sd->curr_connectivity_state = GRPC_CHANNEL_SHUTDOWN; + } + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + sd->curr_connectivity_state, + GRPC_ERROR_REF(error), "selected_changed"); + if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) { + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + } else { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "pf_selected_shutdown"); + shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error)); + } } - } else { - loop: - switch (p->checking_connectivity) { - case GRPC_CHANNEL_INIT: - GPR_UNREACHABLE_CODE(return ); - case GRPC_CHANNEL_READY: + return; + } + // If we get here, there are two possible cases: + // 1. We do not currently have a selected subchannel, and the update is + // for a subchannel in p->subchannel_list that we're trying to + // connect to. The goal here is to find a subchannel that we can + // select. + // 2. We do currently have a selected subchannel, and the update is + // for a subchannel in p->latest_pending_subchannel_list. The + // goal here is to find a subchannel from the update that we can + // select in place of the current one. + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE || + sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + } + while (true) { + switch (sd->curr_connectivity_state) { + case GRPC_CHANNEL_READY: { + // Case 2. Promote p->latest_pending_subchannel_list to + // p->subchannel_list. + if (sd->subchannel_list == p->latest_pending_subchannel_list) { + GPR_ASSERT(p->subchannel_list != NULL); + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->subchannel_list, "finish_update"); + p->subchannel_list = p->latest_pending_subchannel_list; + p->latest_pending_subchannel_list = NULL; + } + // Cases 1 and 2. grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connecting_ready"); - selected_subchannel = p->subchannels[p->checking_subchannel]; - p->selected = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(selected_subchannel), - "picked_first"); - + sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(sd->subchannel), + "connected"); + p->selected = sd; if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, - "Pick First %p selected subchannel %p (connected %p)", - (void *)p, (void *)selected_subchannel, (void *)p->selected); + gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void *)p, + (void *)sd->subchannel); } - p->selected_key = grpc_subchannel_get_key(selected_subchannel); - /* drop the pick list: we are connected now */ - GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); - destroy_subchannels_locked(exec_ctx, p); - /* update any calls that were waiting for a pick */ + // Drop all other subchannels, since we are now connected. + destroy_unselected_subchannels_locked(exec_ctx, p); + // Update any calls that were waiting for a pick. + pending_pick *pp; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked"); + *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( + p->selected->connected_subchannel, "picked"); if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p", @@ -589,76 +484,86 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - p->checking_subchannel = - (p->checking_subchannel + 1) % p->num_subchannels; - if (p->checking_subchannel == 0) { - /* only trigger transient failure when we've tried all alternatives - */ + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + return; + } + case GRPC_CHANNEL_TRANSIENT_FAILURE: { + do { + sd->subchannel_list->checking_subchannel = + (sd->subchannel_list->checking_subchannel + 1) % + sd->subchannel_list->num_subchannels; + sd = &sd->subchannel_list + ->subchannels[sd->subchannel_list->checking_subchannel]; + } while (sd->subchannel == NULL); + // Case 1: Only set state to TRANSIENT_FAILURE if we've tried + // all subchannels. + if (sd->subchannel_list->checking_subchannel == 0 && + sd->subchannel_list == p->subchannel_list) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "connecting_transient_failure"); } + sd->curr_connectivity_state = + grpc_subchannel_check_connectivity(sd->subchannel, &error); GRPC_ERROR_UNREF(error); - p->checking_connectivity = grpc_subchannel_check_connectivity( - p->subchannels[p->checking_subchannel], &error); - if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); - } else { - goto loop; + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + // Reuses the connectivity refs from the previous watch. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + return; } - break; + break; // Go back to top of loop. + } case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE: - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING, - GRPC_ERROR_REF(error), "connecting_changed"); - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); - break; - case GRPC_CHANNEL_SHUTDOWN: - p->num_subchannels--; - GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], - p->subchannels[p->num_subchannels]); - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels], - "pick_first"); - if (p->num_subchannels == 0) { + case GRPC_CHANNEL_IDLE: { + // Only update connectivity state in case 1. + if (sd->subchannel_list == p->subchannel_list) { grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick first exhausted channels", &error, 1), - "no_more_channels"); - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); - } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, - "pick_first_connectivity"); - } else { + exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING, + GRPC_ERROR_REF(error), "connecting_changed"); + } + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + return; + } + case GRPC_CHANNEL_SHUTDOWN: { + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, + "pf_candidate_shutdown"); + // Advance to next subchannel and check its state. + grpc_lb_subchannel_data *original_sd = sd; + do { + sd->subchannel_list->checking_subchannel = + (sd->subchannel_list->checking_subchannel + 1) % + sd->subchannel_list->num_subchannels; + sd = &sd->subchannel_list + ->subchannels[sd->subchannel_list->checking_subchannel]; + } while (sd->subchannel == NULL && sd != original_sd); + if (sd == original_sd) { + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "pf_candidate_shutdown"); + shutdown_locked(exec_ctx, p, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick first exhausted channels", &error, 1)); + return; + } + if (sd->subchannel_list == p->subchannel_list) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "subchannel_failed"); - p->checking_subchannel %= p->num_subchannels; - GRPC_ERROR_UNREF(error); - p->checking_connectivity = grpc_subchannel_check_connectivity( - p->subchannels[p->checking_subchannel], &error); - goto loop; } + sd->curr_connectivity_state = + grpc_subchannel_check_connectivity(sd->subchannel, &error); + GRPC_ERROR_UNREF(error); + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + // Reuses the connectivity refs from the previous watch. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + return; + } + // For any other state, go back to top of loop. + // We will reuse the connectivity refs from the previous watch. + } } } - - GRPC_ERROR_UNREF(error); } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { @@ -688,8 +593,6 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, pf_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); grpc_subchannel_index_ref(); - GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p, - grpc_combiner_scheduler(args->combiner)); return &p->base; } |