From 5e9848e7b078ac35c0b6899d27c780e3836b3905 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 6 Oct 2017 13:59:32 -0700 Subject: Refactor subchannel_list code out of RR and use it in PF. --- .../lb_policy/pick_first/pick_first.cc | 709 +++++++++------------ .../lb_policy/round_robin/round_robin.cc | 426 +++---------- .../client_channel/lb_policy/subchannel_list.cc | 282 ++++++++ .../client_channel/lb_policy/subchannel_list.h | 149 +++++ src/python/grpcio/grpc_core_dependencies.py | 1 + 5 files changed, 834 insertions(+), 733 deletions(-) create mode 100644 src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc create mode 100644 src/core/ext/filters/client_channel/lb_policy/subchannel_list.h (limited to 'src') diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index b07fc3b720..bbf0548405 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -20,6 +20,7 @@ #include +#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" @@ -42,99 +43,73 @@ typedef struct { /** base policy: must be first */ grpc_lb_policy base; /** all our subchannels */ - grpc_subchannel **subchannels; - grpc_subchannel **new_subchannels; - size_t num_subchannels; - size_t num_new_subchannels; - - grpc_closure connectivity_changed; - - /** remaining members are protected by the combiner */ - - /** the selected channel */ - grpc_connected_subchannel *selected; - - /** the subchannel key for \a selected, or NULL if \a selected not set */ - const grpc_subchannel_key *selected_key; - + grpc_lb_subchannel_list *subchannel_list; + /** Latest pending subchannel list. */ + grpc_lb_subchannel_list *latest_pending_subchannel_list; + /** Selected subchannel in \a subchannel_list. */ + grpc_lb_subchannel_data *selected; /** have we started picking? */ bool started_picking; /** are we shut down? */ bool shutdown; - /** are we updating the selected subchannel? */ - bool updating_selected; - /** are we updating the subchannel candidates? */ - bool updating_subchannels; - /** args from the latest update received while already updating, or NULL */ - grpc_lb_policy_args *pending_update_args; - /** which subchannel are we watching? */ - size_t checking_subchannel; - /** what is the connectivity of that channel? */ - grpc_connectivity_state checking_connectivity; /** list of picks that are waiting on connectivity */ pending_pick *pending_picks; - /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + GPR_ASSERT(p->subchannel_list == NULL); + GPR_ASSERT(p->latest_pending_subchannel_list == NULL); GPR_ASSERT(p->pending_picks == NULL); - for (size_t i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first_destroy"); - } - if (p->selected != NULL) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, - "picked_first_destroy"); - } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); - grpc_subchannel_index_unref(); - if (p->pending_update_args != NULL) { - grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args); - gpr_free(p->pending_update_args); - } - gpr_free(p->subchannels); - gpr_free(p->new_subchannels); gpr_free(p); + grpc_subchannel_index_unref(); if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log(GPR_DEBUG, "Pick First %p destroyed.", (void *)p); } } -static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - pending_pick *pp; +static void shutdown_locked(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p, + grpc_error *error) { + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p); + } p->shutdown = true; - pp = p->pending_picks; - p->pending_picks = NULL; + pending_pick *pp; + while ((pp = p->pending_picks) != NULL) { + p->pending_picks = pp->next; + *pp->target = NULL; + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_REF(error)); + gpr_free(pp); + } grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"), "shutdown"); - /* cancel subscription */ - if (p->selected != NULL) { - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); - } else if (p->num_subchannels > 0 && p->started_picking) { - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, - &p->connectivity_changed); + GRPC_ERROR_REF(error), "shutdown"); + if (p->subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "pf_shutdown"); + p->subchannel_list = NULL; } - while (pp != NULL) { - pending_pick *next = pp->next; - *pp->target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); - pp = next; + if (p->latest_pending_subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->latest_pending_subchannel_list, "pf_shutdown"); + p->latest_pending_subchannel_list = NULL; } + GRPC_ERROR_UNREF(error); +} + +static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { + shutdown_locked(exec_ctx, (pick_first_lb_policy *)pol, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown")); } static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel **target, grpc_error *error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - pending_pick *pp; - pp = p->pending_picks; + pending_pick *pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; @@ -158,8 +133,7 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_eq, grpc_error *error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - pending_pick *pp; - pp = p->pending_picks; + pending_pick *pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; @@ -181,15 +155,12 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void start_picking_locked(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { p->started_picking = true; - if (p->subchannels != NULL) { - GPR_ASSERT(p->num_subchannels > 0); - p->checking_subchannel = 0; - p->checking_connectivity = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); + if (p->subchannel_list != NULL && p->subchannel_list->num_subchannels > 0) { + p->subchannel_list->checking_subchannel = 0; + grpc_lb_subchannel_list_ref_for_connectivity_watch( + p->subchannel_list, "connectivity_watch+start_picking"); + grpc_lb_subchannel_data_start_connectivity_watch( + exec_ctx, &p->subchannel_list->subchannels[0]); } } @@ -206,19 +177,17 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - pending_pick *pp; - - /* Check atomically for a selected channel */ + // If we have a selected subchannel already, return synchronously. if (p->selected != NULL) { - *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked"); + *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected->connected_subchannel, + "picked"); return 1; } - - /* No subchannel selected yet, so try again */ + // No subchannel selected yet, so handle asynchronously. if (!p->started_picking) { start_picking_locked(exec_ctx, p); } - pp = (pending_pick *)gpr_malloc(sizeof(*pp)); + pending_pick *pp = (pending_pick *)gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->target = target; pp->initial_metadata_flags = pick_args->initial_metadata_flags; @@ -227,19 +196,15 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, return 0; } -static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx, - pick_first_lb_policy *p) { - size_t num_subchannels = p->num_subchannels; - grpc_subchannel **subchannels = p->subchannels; - - p->num_subchannels = 0; - p->subchannels = NULL; - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels"); - - for (size_t i = 0; i < num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first"); +static void destroy_unselected_subchannels_locked(grpc_exec_ctx *exec_ctx, + pick_first_lb_policy *p) { + for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) { + grpc_lb_subchannel_data *sd = &p->subchannel_list->subchannels[i]; + if (p->selected != sd) { + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, + "selected_different_subchannel"); + } } - gpr_free(subchannels); } static grpc_connectivity_state pf_check_connectivity_locked( @@ -261,46 +226,24 @@ static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *closure) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; if (p->selected) { - grpc_connected_subchannel_ping(exec_ctx, p->selected, closure); + grpc_connected_subchannel_ping(exec_ctx, p->selected->connected_subchannel, + closure); } else { GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); } } -/* unsubscribe all subchannels */ -static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx, - pick_first_lb_policy *p) { - if (p->num_subchannels > 0) { - GPR_ASSERT(p->selected == NULL); - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_DEBUG, "Pick First %p unsubscribing from subchannel %p", - (void *)p, (void *)p->subchannels[p->checking_subchannel]); - } - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, - &p->connectivity_changed); - p->updating_subchannels = true; - } else if (p->selected != NULL) { - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_DEBUG, - "Pick First %p unsubscribing from selected subchannel %p", - (void *)p, (void *)p->selected); - } - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); - p->updating_selected = true; - } -} +static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); -/* true upon success */ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_args *args) { pick_first_lb_policy *p = (pick_first_lb_policy *)policy; const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); if (arg == NULL || arg->type != GRPC_ARG_POINTER) { - if (p->subchannels == NULL) { + if (p->subchannel_list == NULL) { // If we don't have a current subchannel list, go into TRANSIENT FAILURE. grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, @@ -317,270 +260,228 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } const grpc_lb_addresses *addresses = (const grpc_lb_addresses *)arg->value.pointer.p; - if (addresses->num_addresses == 0) { - // Empty update. Unsubscribe from all current subchannels and put the - // channel in TRANSIENT_FAILURE. + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", + (void *)p, (unsigned long)addresses->num_addresses); + } + grpc_lb_subchannel_list *subchannel_list = grpc_lb_subchannel_list_create( + exec_ctx, &p->base, addresses, args, pf_connectivity_changed_locked); + if (subchannel_list->num_subchannels == 0) { + // Empty update or no valid subchannels. Unsubscribe from all current + // subchannels and put the channel in TRANSIENT_FAILURE. grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "pf_update_empty"); - stop_connectivity_watchers(exec_ctx, p); + if (p->subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "sl_shutdown_empty_update"); + } + p->subchannel_list = subchannel_list; // Empty list. + p->selected = NULL; return; } - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", - (void *)p, (unsigned long)addresses->num_addresses); - } - grpc_subchannel_args *sc_args = (grpc_subchannel_args *)gpr_zalloc( - sizeof(*sc_args) * addresses->num_addresses); - /* We remove the following keys in order for subchannel keys belonging to - * subchannels point to the same address to match. */ - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, - GRPC_ARG_LB_ADDRESSES}; - size_t sc_args_count = 0; - - /* Create list of subchannel args for new addresses in \a args. */ - for (size_t i = 0; i < addresses->num_addresses; i++) { - // If there were any balancer, we would have chosen grpclb policy instead. - GPR_ASSERT(!addresses->addresses[i].is_balancer); - if (addresses->addresses[i].user_data != NULL) { - gpr_log(GPR_ERROR, - "This LB policy doesn't support user data. It will be ignored"); + if (p->selected == NULL) { + // We don't yet have a selected subchannel, so replace the current + // subchannel list immediately. + if (p->subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->subchannel_list, "pf_update_before_selected"); } - grpc_arg addr_arg = - grpc_create_subchannel_address_arg(&addresses->addresses[i].address); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove( - args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, - 1); - gpr_free(addr_arg.value.string); - sc_args[sc_args_count++].args = new_args; - } - - /* Check if p->selected is amongst them. If so, we are done. */ - if (p->selected != NULL) { - GPR_ASSERT(p->selected_key != NULL); - for (size_t i = 0; i < sc_args_count; i++) { - grpc_subchannel_key *ith_sc_key = grpc_subchannel_key_create(&sc_args[i]); - const bool found_selected = - grpc_subchannel_key_compare(p->selected_key, ith_sc_key) == 0; - grpc_subchannel_key_destroy(exec_ctx, ith_sc_key); - if (found_selected) { + p->subchannel_list = subchannel_list; + } else { + // We do have a selected subchannel. + // Check if it's present in the new list. If so, we're done. + for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { + grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i]; + if (sd->subchannel == p->selected->subchannel) { // The currently selected subchannel is in the update: we are done. if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, - "Pick First %p found already selected subchannel %p amongst " - "updates. Update done.", - (void *)p, (void *)p->selected); + "Pick First %p found already selected subchannel %p " + "at update index %" PRIdPTR " of %" PRIdPTR "; update done", + p, p->selected->subchannel, i, + subchannel_list->num_subchannels); + } + grpc_lb_subchannel_list_ref_for_connectivity_watch( + subchannel_list, "connectivity_watch+replace_selected"); + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + if (p->subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->subchannel_list, "pf_update_includes_selected"); + } + p->subchannel_list = subchannel_list; + if (p->selected->connected_subchannel != NULL) { + sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(sd->subchannel), + "pf_update_includes_selected"); } - for (size_t j = 0; j < sc_args_count; j++) { - grpc_channel_args_destroy(exec_ctx, - (grpc_channel_args *)sc_args[j].args); + p->selected = sd; + destroy_unselected_subchannels_locked(exec_ctx, p); + // If there was a previously pending update (which may or may + // not have contained the currently selected subchannel), drop + // it, so that it doesn't override what we've done here. + if (p->latest_pending_subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->latest_pending_subchannel_list, + "pf_update_includes_selected+outdated"); + p->latest_pending_subchannel_list = NULL; } - gpr_free(sc_args); return; } } - } - // We only check for already running updates here because if the previous - // steps were successful, the update can be considered done without any - // interference (ie, no callbacks were scheduled). - if (p->updating_selected || p->updating_subchannels) { - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, - "Update already in progress for pick first %p. Deferring update.", - (void *)p); - } - if (p->pending_update_args != NULL) { - grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args); - gpr_free(p->pending_update_args); - } - p->pending_update_args = - (grpc_lb_policy_args *)gpr_zalloc(sizeof(*p->pending_update_args)); - p->pending_update_args->client_channel_factory = - args->client_channel_factory; - p->pending_update_args->args = grpc_channel_args_copy(args->args); - p->pending_update_args->combiner = args->combiner; - return; - } - /* Create the subchannels for the new subchannel args/addresses. */ - grpc_subchannel **new_subchannels = - (grpc_subchannel **)gpr_zalloc(sizeof(*new_subchannels) * sc_args_count); - size_t num_new_subchannels = 0; - for (size_t i = 0; i < sc_args_count; i++) { - grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( - exec_ctx, args->client_channel_factory, &sc_args[i]); - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - char *address_uri = - grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_INFO, - "Pick First %p created subchannel %p for address uri %s", - (void *)p, (void *)subchannel, address_uri); - gpr_free(address_uri); + // Not keeping the previous selected subchannel, so set the latest + // pending subchannel list to the new subchannel list. We will wait + // for it to report READY before swapping it into the current + // subchannel list. + if (p->latest_pending_subchannel_list != NULL) { + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_DEBUG, + "Pick First %p Shutting down latest pending subchannel list " + "%p, about to be replaced by newer latest %p", + (void *)p, (void *)p->latest_pending_subchannel_list, + (void *)subchannel_list); + } + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->latest_pending_subchannel_list, + "sl_outdated_dont_smash"); } - grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)sc_args[i].args); - if (subchannel != NULL) new_subchannels[num_new_subchannels++] = subchannel; + p->latest_pending_subchannel_list = subchannel_list; } - gpr_free(sc_args); - if (num_new_subchannels == 0) { - gpr_free(new_subchannels); - // Empty update. Unsubscribe from all current subchannels and put the - // channel in TRANSIENT_FAILURE. - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("No valid addresses in update"), - "pf_update_no_valid_addresses"); - stop_connectivity_watchers(exec_ctx, p); - return; - } - - /* Destroy the current subchannels. Repurpose pf_shutdown/destroy. */ - stop_connectivity_watchers(exec_ctx, p); - - /* Save new subchannels. The switch over will happen in - * pf_connectivity_changed_locked */ - if (p->updating_selected || p->updating_subchannels) { - p->num_new_subchannels = num_new_subchannels; - p->new_subchannels = new_subchannels; - } else { /* nothing is updating. Get things moving from here */ - p->num_subchannels = num_new_subchannels; - p->subchannels = new_subchannels; - p->new_subchannels = NULL; - p->num_new_subchannels = 0; - if (p->started_picking) { - p->checking_subchannel = 0; - p->checking_connectivity = GRPC_CHANNEL_IDLE; - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); - } + // If we've started picking, start trying to connect to the first + // subchannel in the new list. + if (p->started_picking && subchannel_list->num_subchannels > 0) { + grpc_lb_subchannel_list_ref_for_connectivity_watch( + subchannel_list, "connectivity_watch+update"); + grpc_lb_subchannel_data_start_connectivity_watch( + exec_ctx, &subchannel_list->subchannels[0]); } } static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - pick_first_lb_policy *p = (pick_first_lb_policy *)arg; - grpc_subchannel *selected_subchannel; - pending_pick *pp; - + grpc_lb_subchannel_data *sd = (grpc_lb_subchannel_data *)arg; + pick_first_lb_policy *p = (pick_first_lb_policy *)sd->subchannel_list->policy; if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log( GPR_DEBUG, - "Pick First %p connectivity changed. Updating selected: %d; Updating " - "subchannels: %d; Checking %lu index (%lu total); State: %d; ", - (void *)p, p->updating_selected, p->updating_subchannels, - (unsigned long)p->checking_subchannel, - (unsigned long)p->num_subchannels, p->checking_connectivity); - } - bool restart = false; - if (p->updating_selected && error != GRPC_ERROR_NONE) { - /* Captured the unsubscription for p->selected */ - GPR_ASSERT(p->selected != NULL); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, - "pf_update_connectivity"); - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_DEBUG, "Pick First %p unreffing selected subchannel %p", - (void *)p, (void *)p->selected); - } - p->updating_selected = false; - if (p->num_new_subchannels == 0) { - p->selected = NULL; - return; - } - restart = true; - } - if (p->updating_subchannels && error != GRPC_ERROR_NONE) { - /* Captured the unsubscription for the checking subchannel */ - GPR_ASSERT(p->selected == NULL); - for (size_t i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], - "pf_update_connectivity"); - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_DEBUG, "Pick First %p unreffing subchannel %p", (void *)p, - (void *)p->subchannels[i]); - } - } - gpr_free(p->subchannels); - p->subchannels = NULL; - p->num_subchannels = 0; - p->updating_subchannels = false; - if (p->num_new_subchannels == 0) return; - restart = true; + "Pick First %p connectivity changed for subchannel %p (%" PRIdPTR + " of %" PRIdPTR "), subchannel_list %p: state=%s p->shutdown=%d " + "sd->subchannel_list->shutting_down=%d error=%s", + (void *)p, (void *)sd->subchannel, + p->subchannel_list->checking_subchannel, + sd->subchannel_list->num_subchannels, (void *)sd->subchannel_list, + grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), + p->shutdown, sd->subchannel_list->shutting_down, + grpc_error_string(error)); } - if (restart) { - p->selected = NULL; - p->selected_key = NULL; - GPR_ASSERT(p->new_subchannels != NULL); - GPR_ASSERT(p->num_new_subchannels > 0); - p->num_subchannels = p->num_new_subchannels; - p->subchannels = p->new_subchannels; - p->num_new_subchannels = 0; - p->new_subchannels = NULL; - if (p->started_picking) { - /* If we were picking, continue to do so over the new subchannels, - * starting from the 0th index. */ - p->checking_subchannel = 0; - p->checking_connectivity = GRPC_CHANNEL_IDLE; - /* reuses the weak ref from start_picking_locked */ - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); - } - if (p->pending_update_args != NULL) { - const grpc_lb_policy_args *args = p->pending_update_args; - p->pending_update_args = NULL; - pf_update_locked(exec_ctx, &p->base, args); - } + // If the policy is shutting down, unref and return. + if (p->shutdown) { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "pf_shutdown"); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "pf_shutdown"); return; } - GRPC_ERROR_REF(error); - if (p->shutdown) { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); - GRPC_ERROR_UNREF(error); + // If the subchannel list is shutting down, stop watching. + if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "pf_sl_shutdown"); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "pf_sl_shutdown"); return; - } else if (p->selected != NULL) { - if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { - /* if the selected channel goes bad, we're done */ - p->checking_connectivity = GRPC_CHANNEL_SHUTDOWN; - } - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - p->checking_connectivity, GRPC_ERROR_REF(error), - "selected_changed"); - if (p->checking_connectivity != GRPC_CHANNEL_SHUTDOWN) { - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); + } + // If we're still here, the notification must be for a subchannel in + // either the current or latest pending subchannel lists. + GPR_ASSERT(sd->subchannel_list == p->subchannel_list || + sd->subchannel_list == p->latest_pending_subchannel_list); + // Update state counters. + sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; + if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + ++sd->subchannel_list->num_shutdown; + } + sd->prev_connectivity_state = sd->curr_connectivity_state; + // Handle updates for the currently selected subchannel. + if (p->selected == sd) { + // If the new state is anything other than READY and there is a + // pending update, switch to the pending update. + if (sd->curr_connectivity_state != GRPC_CHANNEL_READY && + p->latest_pending_subchannel_list != NULL) { + p->selected = NULL; + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->subchannel_list, "selected_not_ready+switch_to_update"); + p->subchannel_list = p->latest_pending_subchannel_list; + p->latest_pending_subchannel_list = NULL; + grpc_lb_subchannel_data *new_sd = &p->subchannel_list->subchannels[ + p->subchannel_list->checking_subchannel]; + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + new_sd->curr_connectivity_state, + GRPC_ERROR_REF(error), + "selected_not_ready+switch_to_update"); } else { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + if (sd->curr_connectivity_state == + GRPC_CHANNEL_TRANSIENT_FAILURE) { + /* if the selected channel goes bad, we're done */ + sd->curr_connectivity_state = GRPC_CHANNEL_SHUTDOWN; + } + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + sd->curr_connectivity_state, + GRPC_ERROR_REF(error), "selected_changed"); + if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) { + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + } else { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "pf_selected_shutdown"); + pf_shutdown_locked(exec_ctx, &p->base); + } } - } else { - loop: - switch (p->checking_connectivity) { + return; + } + // If we get here, there are two possible cases: + // 1. We do not currently have a selected subchannel, and the update is + // for a subchannel in p->subchannel_list that we're trying to + // connect to. The goal here is to find a subchannel that we can + // select. + // 2. We do currently have a selected subchannel, and the update is + // for a subchannel in p->latest_pending_subchannel_list. The + // goal here is to find a subchannel from the update that we can + // select in place of the current one. + while (true) { + switch (sd->curr_connectivity_state) { case GRPC_CHANNEL_INIT: GPR_UNREACHABLE_CODE(return ); - case GRPC_CHANNEL_READY: + case GRPC_CHANNEL_READY: { + // Case 2. Promote p->latest_pending_subchannel_list to + // p->subchannel_list. + if (sd->subchannel_list == p->latest_pending_subchannel_list) { + GPR_ASSERT(p->subchannel_list != NULL); + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->subchannel_list, "finish_update"); + p->subchannel_list = p->latest_pending_subchannel_list; + p->latest_pending_subchannel_list = NULL; + } + // Cases 1 and 2. grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connecting_ready"); - selected_subchannel = p->subchannels[p->checking_subchannel]; - p->selected = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(selected_subchannel), - "picked_first"); - + sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(sd->subchannel), + "connected"); + p->selected = sd; if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, - "Pick First %p selected subchannel %p (connected %p)", - (void *)p, (void *)selected_subchannel, (void *)p->selected); + gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", + (void *)p, (void *)sd->subchannel); } - p->selected_key = grpc_subchannel_get_key(selected_subchannel); - /* drop the pick list: we are connected now */ - GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); - destroy_subchannels_locked(exec_ctx, p); - /* update any calls that were waiting for a pick */ + // Drop all other subchannels, since we are now connected. + destroy_unselected_subchannels_locked(exec_ctx, p); + // Update any calls that were waiting for a pick. + pending_pick *pp; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked"); + *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( + p->selected->connected_subchannel, "picked"); if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p", @@ -589,76 +490,82 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - p->checking_subchannel = - (p->checking_subchannel + 1) % p->num_subchannels; - if (p->checking_subchannel == 0) { - /* only trigger transient failure when we've tried all alternatives - */ + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + return; + } + case GRPC_CHANNEL_TRANSIENT_FAILURE: { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + sd->subchannel_list->checking_subchannel = + (sd->subchannel_list->checking_subchannel + 1) + % sd->subchannel_list->num_subchannels; + // Case 1: Only set state to TRANSIENT_FAILURE if we've tried + // all subchannels. + if (sd->subchannel_list->checking_subchannel == 0 && + sd->subchannel_list == p->subchannel_list) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "connecting_transient_failure"); } - GRPC_ERROR_UNREF(error); - p->checking_connectivity = grpc_subchannel_check_connectivity( - p->subchannels[p->checking_subchannel], &error); - if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); - } else { - goto loop; + sd = &sd->subchannel_list->subchannels[ + sd->subchannel_list->checking_subchannel]; + sd->curr_connectivity_state = + grpc_subchannel_check_connectivity(sd->subchannel, &error); + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + // Reuses the connectivity refs from the previous watch. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + return; } - break; + break; // Go back to top of loop. + } case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE: - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING, - GRPC_ERROR_REF(error), "connecting_changed"); - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); - break; - case GRPC_CHANNEL_SHUTDOWN: - p->num_subchannels--; - GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], - p->subchannels[p->num_subchannels]); - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels], - "pick_first"); - if (p->num_subchannels == 0) { + case GRPC_CHANNEL_IDLE: { + // Only update connectivity state in case 1. + if (sd->subchannel_list == p->subchannel_list) { grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick first exhausted channels", &error, 1), - "no_more_channels"); - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); - } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, - "pick_first_connectivity"); - } else { + exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING, + GRPC_ERROR_REF(error), "connecting_changed"); + } + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + return; + } + case GRPC_CHANNEL_SHUTDOWN: { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, + "pf_candidate_shutdown"); + if (sd->subchannel_list->num_shutdown == + sd->subchannel_list->num_subchannels) { + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "pf_candidate_shutdown"); + shutdown_locked(exec_ctx, p, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick first exhausted channels", &error, 1)); + return; + } + if (sd->subchannel_list == p->subchannel_list) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "subchannel_failed"); - p->checking_subchannel %= p->num_subchannels; - GRPC_ERROR_UNREF(error); - p->checking_connectivity = grpc_subchannel_check_connectivity( - p->subchannels[p->checking_subchannel], &error); - goto loop; } + // Advance to next subchannel and check its state. + sd->subchannel_list->checking_subchannel = + (sd->subchannel_list->checking_subchannel + 1) + % sd->subchannel_list->num_subchannels; + sd = &sd->subchannel_list->subchannels[ + sd->subchannel_list->checking_subchannel]; + sd->curr_connectivity_state = + grpc_subchannel_check_connectivity(sd->subchannel, &error); + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + // Reuses the connectivity refs from the previous watch. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + return; + } + // For any other state, go back to top of loop. + // We will reuse the connectivity refs from the previous watch. + } } } - - GRPC_ERROR_UNREF(error); } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { @@ -688,8 +595,6 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, pf_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); grpc_subchannel_index_ref(); - GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p, - grpc_combiner_scheduler(args->combiner)); return &p->base; } diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 6812bb50cd..b25b9b86d8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -28,6 +28,7 @@ #include +#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" @@ -64,12 +65,11 @@ typedef struct pending_pick { grpc_closure *on_complete; } pending_pick; -typedef struct rr_subchannel_list rr_subchannel_list; typedef struct round_robin_lb_policy { /** base policy: must be first */ grpc_lb_policy base; - rr_subchannel_list *subchannel_list; + grpc_lb_subchannel_list *subchannel_list; /** have we started picking? */ bool started_picking; @@ -89,157 +89,9 @@ typedef struct round_robin_lb_policy { * lists if they equal \a latest_pending_subchannel_list. In other words, * racing callbacks that reference outdated subchannel lists won't perform any * update. */ - rr_subchannel_list *latest_pending_subchannel_list; + grpc_lb_subchannel_list *latest_pending_subchannel_list; } round_robin_lb_policy; -typedef struct { - /** backpointer to owning subchannel list */ - rr_subchannel_list *subchannel_list; - /** subchannel itself */ - grpc_subchannel *subchannel; - /** notification that connectivity has changed on subchannel */ - grpc_closure connectivity_changed_closure; - /** last observed connectivity. Not updated by - * \a grpc_subchannel_notify_on_state_change. Used to determine the previous - * state while processing the new state in \a rr_connectivity_changed */ - grpc_connectivity_state prev_connectivity_state; - /** current connectivity state. Updated by \a - * grpc_subchannel_notify_on_state_change */ - grpc_connectivity_state curr_connectivity_state; - /** connectivity state to be updated by the watcher, not guarded by - * the combiner. Will be moved to curr_connectivity_state inside of - * the combiner by rr_connectivity_changed_locked(). */ - grpc_connectivity_state pending_connectivity_state_unsafe; - /** the subchannel's target user data */ - void *user_data; - /** vtable to operate over \a user_data */ - const grpc_lb_user_data_vtable *user_data_vtable; -} subchannel_data; - -struct rr_subchannel_list { - /** backpointer to owning policy */ - round_robin_lb_policy *policy; - - /** all our subchannels */ - size_t num_subchannels; - subchannel_data *subchannels; - - /** how many subchannels are in state READY */ - size_t num_ready; - /** how many subchannels are in state TRANSIENT_FAILURE */ - size_t num_transient_failures; - /** how many subchannels are in state SHUTDOWN */ - size_t num_shutdown; - /** how many subchannels are in state IDLE */ - size_t num_idle; - - /** There will be one ref for each entry in subchannels for which there is a - * pending connectivity state watcher callback. */ - gpr_refcount refcount; - - /** Is this list shutting down? This may be true due to the shutdown of the - * policy itself or because a newer update has arrived while this one hadn't - * finished processing. */ - bool shutting_down; -}; - -static rr_subchannel_list *rr_subchannel_list_create(round_robin_lb_policy *p, - size_t num_subchannels) { - rr_subchannel_list *subchannel_list = - (rr_subchannel_list *)gpr_zalloc(sizeof(*subchannel_list)); - subchannel_list->policy = p; - subchannel_list->subchannels = - (subchannel_data *)gpr_zalloc(sizeof(subchannel_data) * num_subchannels); - subchannel_list->num_subchannels = num_subchannels; - gpr_ref_init(&subchannel_list->refcount, 1); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_INFO, "[RR %p] Created subchannel list %p for %lu subchannels", - (void *)p, (void *)subchannel_list, (unsigned long)num_subchannels); - } - return subchannel_list; -} - -static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx, - rr_subchannel_list *subchannel_list) { - GPR_ASSERT(subchannel_list->shutting_down); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_INFO, "[RR %p] Destroying subchannel_list %p", - (void *)subchannel_list->policy, (void *)subchannel_list); - } - for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { - subchannel_data *sd = &subchannel_list->subchannels[i]; - if (sd->subchannel != NULL) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, - "rr_subchannel_list_destroy"); - } - sd->subchannel = NULL; - if (sd->user_data != NULL) { - GPR_ASSERT(sd->user_data_vtable != NULL); - sd->user_data_vtable->destroy(exec_ctx, sd->user_data); - sd->user_data = NULL; - } - } - gpr_free(subchannel_list->subchannels); - gpr_free(subchannel_list); -} - -static void rr_subchannel_list_ref(rr_subchannel_list *subchannel_list, - const char *reason) { - gpr_ref_non_zero(&subchannel_list->refcount); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); - gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu (%s)", - (void *)subchannel_list->policy, (void *)subchannel_list, - (unsigned long)(count - 1), (unsigned long)count, reason); - } -} - -static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx, - rr_subchannel_list *subchannel_list, - const char *reason) { - const bool done = gpr_unref(&subchannel_list->refcount); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); - gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu (%s)", - (void *)subchannel_list->policy, (void *)subchannel_list, - (unsigned long)(count + 1), (unsigned long)count, reason); - } - if (done) { - rr_subchannel_list_destroy(exec_ctx, subchannel_list); - } -} - -/** Mark \a subchannel_list as discarded. Unsubscribes all its subchannels. The - * watcher's callback will ultimately unref \a subchannel_list. */ -static void rr_subchannel_list_shutdown_and_unref( - grpc_exec_ctx *exec_ctx, rr_subchannel_list *subchannel_list, - const char *reason) { - GPR_ASSERT(!subchannel_list->shutting_down); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] Shutting down subchannel_list %p (%s)", - (void *)subchannel_list->policy, (void *)subchannel_list, reason); - } - GPR_ASSERT(!subchannel_list->shutting_down); - subchannel_list->shutting_down = true; - for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { - subchannel_data *sd = &subchannel_list->subchannels[i]; - if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe. - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log( - GPR_DEBUG, - "[RR %p] Unsubscribing from subchannel %p as part of shutting down " - "subchannel_list %p", - (void *)subchannel_list->policy, (void *)sd->subchannel, - (void *)subchannel_list); - } - grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, - NULL, - &sd->connectivity_changed_closure); - } - } - rr_subchannel_list_unref(exec_ctx, subchannel_list, reason); -} - /** Returns the index into p->subchannel_list->subchannels of the next * subchannel in READY state, or p->subchannel_list->num_subchannels if no * subchannel is READY. @@ -299,8 +151,8 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p, "[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", (void *)p, (unsigned long)last_ready_index, (void *)p->subchannel_list->subchannels[last_ready_index].subchannel, - (void *)grpc_subchannel_get_connected_subchannel( - p->subchannel_list->subchannels[last_ready_index].subchannel)); + (void *)p->subchannel_list->subchannels[last_ready_index] + .connected_subchannel); } } @@ -310,17 +162,18 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p", (void *)pol, (void *)pol); } + GPR_ASSERT(p->subchannel_list == NULL); + GPR_ASSERT(p->latest_pending_subchannel_list == NULL); grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); grpc_subchannel_index_unref(); gpr_free(p); } static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] Shutting down Round Robin policy at %p", - (void *)pol, (void *)pol); + gpr_log(GPR_DEBUG, "[RR %p] Shutting down", (void *)pol); } + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; p->shutdown = true; pending_pick *pp; while ((pp = p->pending_picks)) { @@ -336,14 +189,16 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown"); const bool latest_is_current = p->subchannel_list == p->latest_pending_subchannel_list; - rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, - "sl_shutdown_rr_shutdown"); - p->subchannel_list = NULL; + if (p->subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "sl_shutdown_rr_shutdown"); + p->subchannel_list = NULL; + } if (!latest_is_current && p->latest_pending_subchannel_list != NULL && !p->latest_pending_subchannel_list->shutting_down) { - rr_subchannel_list_shutdown_and_unref(exec_ctx, - p->latest_pending_subchannel_list, - "sl_shutdown_pending_rr_shutdown"); + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->latest_pending_subchannel_list, + "sl_shutdown_pending_rr_shutdown"); p->latest_pending_subchannel_list = NULL; } } @@ -400,13 +255,10 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { p->started_picking = true; for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { - subchannel_data *sd = &p->subchannel_list->subchannels[i]; - GRPC_LB_POLICY_WEAK_REF(&p->base, "start_picking_locked"); - rr_subchannel_list_ref(sd->subchannel_list, "started_picking"); - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->pending_connectivity_state_unsafe, - &sd->connectivity_changed_closure); + grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list, + "connectivity_watch"); + grpc_lb_subchannel_data_start_connectivity_watch( + exec_ctx, &p->subchannel_list->subchannels[i]); } } @@ -431,10 +283,10 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); if (next_ready_index < p->subchannel_list->num_subchannels) { /* readily available, report right away */ - subchannel_data *sd = &p->subchannel_list->subchannels[next_ready_index]; - *target = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(sd->subchannel), - "rr_picked"); + grpc_lb_subchannel_data *sd = + &p->subchannel_list->subchannels[next_ready_index]; + *target = GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, + "rr_picked"); if (user_data != NULL) { *user_data = sd->user_data; } @@ -465,8 +317,8 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, return 0; } -static void update_state_counters_locked(subchannel_data *sd) { - rr_subchannel_list *subchannel_list = sd->subchannel_list; +static void update_state_counters_locked(grpc_lb_subchannel_data *sd) { + grpc_lb_subchannel_list *subchannel_list = sd->subchannel_list; if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { GPR_ASSERT(subchannel_list->num_ready > 0); --subchannel_list->num_ready; @@ -492,12 +344,12 @@ static void update_state_counters_locked(subchannel_data *sd) { } /** Sets the policy's connectivity status based on that of the passed-in \a sd - * (the subchannel_data associted with the updated subchannel) and the + * (the grpc_lb_subchannel_data associted with the updated subchannel) and the * subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be * used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the * connectivity status set. */ static grpc_connectivity_state update_lb_connectivity_status_locked( - grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) { + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, grpc_error *error) { /* In priority order. The first rule to match terminates the search (ie, if we * are on rule n, all previous rules were unfulfilled). * @@ -519,8 +371,8 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( * CHECK: p->num_idle == p->subchannel_list->num_subchannels. */ grpc_connectivity_state new_state = sd->curr_connectivity_state; - rr_subchannel_list *subchannel_list = sd->subchannel_list; - round_robin_lb_policy *p = subchannel_list->policy; + grpc_lb_subchannel_list *subchannel_list = sd->subchannel_list; + round_robin_lb_policy *p = (round_robin_lb_policy *)subchannel_list->policy; if (subchannel_list->num_ready > 0) { /* 1) READY */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); @@ -556,8 +408,9 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - subchannel_data *sd = (subchannel_data *)arg; - round_robin_lb_policy *p = sd->subchannel_list->policy; + grpc_lb_subchannel_data *sd = (grpc_lb_subchannel_data *)arg; + round_robin_lb_policy *p = + (round_robin_lb_policy *)sd->subchannel_list->policy; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log( GPR_DEBUG, @@ -572,35 +425,24 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, } // If the policy is shutting down, unref and return. if (p->shutdown) { - rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, - "pol_shutdown+started_picking"); - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown"); + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "rr_shutdown"); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "rr_shutdown"); return; } - if (sd->subchannel_list->shutting_down && error == GRPC_ERROR_CANCELLED) { - // the subchannel list associated with sd has been discarded. This callback - // corresponds to the unsubscription. The unrefs correspond to the picking - // ref (start_picking_locked or update_started_picking). - rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, - "sl_shutdown+started_picking"); - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown+picking"); - return; - } - // Dispose of outdated subchannel lists. - if (sd->subchannel_list != p->subchannel_list && - sd->subchannel_list != p->latest_pending_subchannel_list) { - const char *reason = NULL; - if (sd->subchannel_list->shutting_down) { - reason = "sl_outdated_straggler"; - rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, reason); - } else { - reason = "sl_outdated"; - rr_subchannel_list_shutdown_and_unref(exec_ctx, sd->subchannel_list, - reason); - } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, reason); + // If the subchannel list is shutting down, stop watching. + if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "rr_sl_shutdown"); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "rr_shutdown"); return; } + // If we're still here, the notification must be for a subchannel in + // either the current or latest pending subchannel lists. + GPR_ASSERT(sd->subchannel_list == p->subchannel_list || + sd->subchannel_list == p->latest_pending_subchannel_list); // Now that we're inside the combiner, copy the pending connectivity // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. @@ -613,30 +455,21 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, // If the sd's new state is SHUTDOWN, unref the subchannel, and if the new // policy's state is SHUTDOWN, clean up. if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown"); - sd->subchannel = NULL; - if (sd->user_data != NULL) { - GPR_ASSERT(sd->user_data_vtable != NULL); - sd->user_data_vtable->destroy(exec_ctx, sd->user_data); - sd->user_data = NULL; - } + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, + "rr_connectivity_shutdown"); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "rr_connectivity_shutdown"); if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - // the policy is shutting down. Flush all the pending picks... - pending_pick *pp; - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); - } + rr_shutdown_locked(exec_ctx, &p->base); } - rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, - "sd_shutdown+started_picking"); - // unref the "rr_connectivity_update" weak ref from start_picking. - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, - "rr_connectivity_sd_shutdown"); } else { // sd not in SHUTDOWN if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { + if (sd->connected_subchannel == NULL) { + sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(sd->subchannel), + "connected"); + } if (sd->subchannel_list != p->subchannel_list) { // promote sd->subchannel_list to p->subchannel_list. // sd->subchannel_list must be equal to @@ -657,8 +490,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, } if (p->subchannel_list != NULL) { // dispose of the current subchannel_list - rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, - "sl_phase_out_shutdown"); + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->subchannel_list, "sl_phase_out_shutdown"); } p->subchannel_list = p->latest_pending_subchannel_list; p->latest_pending_subchannel_list = NULL; @@ -668,7 +501,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels); - subchannel_data *selected = + grpc_lb_subchannel_data *selected = &p->subchannel_list->subchannels[next_ready_index]; if (p->pending_picks != NULL) { // if the selected subchannel is going to be used for the pending @@ -679,8 +512,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(selected->subchannel), - "rr_picked"); + selected->connected_subchannel, "rr_picked"); if (pp->user_data != NULL) { *pp->user_data = selected->user_data; } @@ -695,12 +527,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(pp); } } - /* renew notification: reuses the "rr_connectivity_update" weak ref on the - * policy as well as the sd->subchannel_list ref. */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->pending_connectivity_state_unsafe, - &sd->connectivity_changed_closure); + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); } } @@ -724,13 +552,12 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, round_robin_lb_policy *p = (round_robin_lb_policy *)pol; const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); if (next_ready_index < p->subchannel_list->num_subchannels) { - subchannel_data *selected = + grpc_lb_subchannel_data *selected = &p->subchannel_list->subchannels[next_ready_index]; grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(selected->subchannel), - "rr_picked"); + selected->connected_subchannel, "rr_ping"); grpc_connected_subchannel_ping(exec_ctx, target, closure); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_ping"); } else { GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Round Robin not connected")); @@ -758,115 +585,52 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, return; } grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p; - rr_subchannel_list *subchannel_list = - rr_subchannel_list_create(p, addresses->num_addresses); - if (addresses->num_addresses == 0) { + grpc_lb_subchannel_list *subchannel_list = grpc_lb_subchannel_list_create( + exec_ctx, &p->base, addresses, args, rr_connectivity_changed_locked); + if (subchannel_list->num_subchannels == 0) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "rr_update_empty"); if (p->subchannel_list != NULL) { - rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, - "sl_shutdown_empty_update"); + grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "sl_shutdown_empty_update"); } p->subchannel_list = subchannel_list; // empty list return; } - size_t subchannel_index = 0; - if (p->latest_pending_subchannel_list != NULL && p->started_picking) { - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[RR %p] Shutting down latest pending subchannel list %p, about " - "to be replaced by newer latest %p", - (void *)p, (void *)p->latest_pending_subchannel_list, - (void *)subchannel_list); - } - rr_subchannel_list_shutdown_and_unref( - exec_ctx, p->latest_pending_subchannel_list, "sl_outdated_dont_smash"); - } - p->latest_pending_subchannel_list = subchannel_list; - grpc_subchannel_args sc_args; - /* We need to remove the LB addresses in order to be able to compare the - * subchannel keys of subchannels from a different batch of addresses. */ - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, - GRPC_ARG_LB_ADDRESSES}; - /* Create subchannels for addresses in the update. */ - for (size_t i = 0; i < addresses->num_addresses; i++) { - // If there were any balancer, we would have chosen grpclb policy instead. - GPR_ASSERT(!addresses->addresses[i].is_balancer); - memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - grpc_arg addr_arg = - grpc_create_subchannel_address_arg(&addresses->addresses[i].address); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove( - args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, - 1); - gpr_free(addr_arg.value.string); - sc_args.args = new_args; - grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( - exec_ctx, args->client_channel_factory, &sc_args); - grpc_channel_args_destroy(exec_ctx, new_args); - grpc_error *error; - // Get the connectivity state of the subchannel. Already existing ones may - // be in a state other than INIT. - const grpc_connectivity_state subchannel_connectivity_state = - grpc_subchannel_check_connectivity(subchannel, &error); - if (error != GRPC_ERROR_NONE) { - // The subchannel is in error (e.g. shutting down). Ignore it. - GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "new_sc_connectivity_error"); - GRPC_ERROR_UNREF(error); - continue; - } - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - char *address_uri = - grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log( - GPR_DEBUG, - "[RR %p] index %lu: Created subchannel %p for address uri %s into " - "subchannel_list %p. Connectivity state %s", - (void *)p, (unsigned long)subchannel_index, (void *)subchannel, - address_uri, (void *)subchannel_list, - grpc_connectivity_state_name(subchannel_connectivity_state)); - gpr_free(address_uri); - } - subchannel_data *sd = &subchannel_list->subchannels[subchannel_index++]; - sd->subchannel_list = subchannel_list; - sd->subchannel = subchannel; - GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure, - rr_connectivity_changed_locked, sd, - grpc_combiner_scheduler(args->combiner)); - /* use some sentinel value outside of the range of - * grpc_connectivity_state to signal an undefined previous state. We - * won't be referring to this value again and it'll be overwritten after - * the first call to rr_connectivity_changed_locked */ - sd->prev_connectivity_state = GRPC_CHANNEL_INIT; - sd->curr_connectivity_state = subchannel_connectivity_state; - sd->user_data_vtable = addresses->user_data_vtable; - if (sd->user_data_vtable != NULL) { - sd->user_data = - sd->user_data_vtable->copy(addresses->addresses[i].user_data); + if (p->started_picking) { + if (p->latest_pending_subchannel_list != NULL && p->started_picking) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, + "[RR %p] Shutting down latest pending subchannel list %p, " + "about to be replaced by newer latest %p", + (void *)p, (void *)p->latest_pending_subchannel_list, + (void *)subchannel_list); + } + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->latest_pending_subchannel_list, + "sl_outdated_dont_smash"); } - if (p->started_picking) { - rr_subchannel_list_ref(sd->subchannel_list, "update_started_picking"); - GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity_update"); - /* 2. Watch every new subchannel. A subchannel list becomes active the + p->latest_pending_subchannel_list = subchannel_list; + for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { + /* Watch every new subchannel. A subchannel list becomes active the * moment one of its subchannels is READY. At that moment, we swap * p->subchannel_list for sd->subchannel_list, provided the subchannel * list is still valid (ie, isn't shutting down) */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->pending_connectivity_state_unsafe, - &sd->connectivity_changed_closure); + grpc_lb_subchannel_list_ref_for_connectivity_watch(subchannel_list, + "connectivity_watch"); + grpc_lb_subchannel_data_start_connectivity_watch( + exec_ctx, &subchannel_list->subchannels[i]); } - } - if (!p->started_picking) { + } else { // The policy isn't picking yet. Save the update for later, disposing of // previous version if any. if (p->subchannel_list != NULL) { - rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, - "rr_update_before_started_picking"); + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->subchannel_list, "rr_update_before_started_picking"); } p->subchannel_list = subchannel_list; - p->latest_pending_subchannel_list = NULL; } } diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc new file mode 100644 index 0000000000..ad26316aac --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -0,0 +1,282 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include + +#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/combiner.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/transport/connectivity_state.h" + +extern grpc_tracer_flag grpc_lb_round_robin_trace; +extern grpc_tracer_flag grpc_lb_pick_first_trace; + +void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx, + grpc_lb_subchannel_data *sd, + const char *reason) { + if (sd->subchannel != NULL) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || + GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_DEBUG, + "[LB %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR + " (subchannel %p): unreffing subchannel", + sd->subchannel_list->policy, sd->subchannel_list, + sd - sd->subchannel_list->subchannels, + sd->subchannel_list->num_subchannels, sd->subchannel); + } + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, reason); + sd->subchannel = NULL; + if (sd->connected_subchannel != NULL) { + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, sd->connected_subchannel, + reason); + sd->connected_subchannel = NULL; + } + if (sd->user_data != NULL) { + GPR_ASSERT(sd->user_data_vtable != NULL); + sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + sd->user_data = NULL; + } + } +} + +void grpc_lb_subchannel_data_start_connectivity_watch( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || + GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_DEBUG, + "[LB %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR + " (subchannel %p): requesting connectivity change notification", + sd->subchannel_list->policy, sd->subchannel_list, + sd - sd->subchannel_list->subchannels, + sd->subchannel_list->num_subchannels, sd->subchannel); + } + sd->connectivity_notification_pending = true; + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, sd->subchannel_list->policy->interested_parties, + &sd->pending_connectivity_state_unsafe, + &sd->connectivity_changed_closure); +} + +void grpc_lb_subchannel_data_stop_connectivity_watch( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) { + GPR_ASSERT(sd->connectivity_notification_pending); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || + GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_DEBUG, + "[LB %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR + " (subchannel %p): stopping connectivity watch", + sd->subchannel_list->policy, sd->subchannel_list, + sd - sd->subchannel_list->subchannels, + sd->subchannel_list->num_subchannels, sd->subchannel); + } + sd->connectivity_notification_pending = false; +} + +grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *p, + const grpc_lb_addresses *addresses, const grpc_lb_policy_args *args, + grpc_iomgr_cb_func connectivity_changed_cb) { + grpc_lb_subchannel_list *subchannel_list = + (grpc_lb_subchannel_list *)gpr_zalloc(sizeof(*subchannel_list)); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || + GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_DEBUG, + "[LB %p] Creating subchannel list %p for %" PRIdPTR " subchannels", + p, subchannel_list, addresses->num_addresses); + } + subchannel_list->policy = p; + gpr_ref_init(&subchannel_list->refcount, 1); + subchannel_list->subchannels = (grpc_lb_subchannel_data *)gpr_zalloc( + sizeof(grpc_lb_subchannel_data) * addresses->num_addresses); + /* We need to remove the LB addresses in order to be able to compare the + * subchannel keys of subchannels from a different batch of addresses. */ + static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, + GRPC_ARG_LB_ADDRESSES}; + /* Create subchannels for addresses in the update. */ + grpc_subchannel_args sc_args; + size_t subchannel_index = 0; + for (size_t i = 0; i < addresses->num_addresses; i++) { + // If there were any balancer, we would have chosen grpclb policy instead. + GPR_ASSERT(!addresses->addresses[i].is_balancer); + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); + grpc_arg addr_arg = + grpc_create_subchannel_address_arg(&addresses->addresses[i].address); + grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove( + args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, + 1); + gpr_free(addr_arg.value.string); + sc_args.args = new_args; + grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( + exec_ctx, args->client_channel_factory, &sc_args); + grpc_channel_args_destroy(exec_ctx, new_args); + grpc_error *error; + // Get the connectivity state of the subchannel. Already existing ones may + // be in a state other than INIT. + const grpc_connectivity_state subchannel_connectivity_state = + grpc_subchannel_check_connectivity(subchannel, &error); + if (error != GRPC_ERROR_NONE) { + // The subchannel is in error (e.g. shutting down). Ignore it. + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || + GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + char *address_uri = + grpc_sockaddr_to_uri(&addresses->addresses[i].address); + gpr_log(GPR_DEBUG, + "[LB %p] subchannel for address uri %s shutting down, ignoring", + subchannel_list->policy, address_uri); + gpr_free(address_uri); + } + GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "new_sc_connectivity_error"); + GRPC_ERROR_UNREF(error); + continue; + } + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || + GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + char *address_uri = + grpc_sockaddr_to_uri(&addresses->addresses[i].address); + gpr_log( + GPR_DEBUG, + "[LB %p] subchannel list %p index %" PRIdPTR + ": Created subchannel %p for address uri %s; " + "initial connectivity state: %s", + p, subchannel_list, subchannel_index, subchannel, address_uri, + grpc_connectivity_state_name(subchannel_connectivity_state)); + gpr_free(address_uri); + } + grpc_lb_subchannel_data *sd = + &subchannel_list->subchannels[subchannel_index++]; + sd->subchannel_list = subchannel_list; + sd->subchannel = subchannel; + GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure, + connectivity_changed_cb, sd, + grpc_combiner_scheduler(args->combiner)); + /* use some sentinel value outside of the range of + * grpc_connectivity_state to signal an undefined previous state. We + * won't be referring to this value again and it'll be overwritten after + * the first call to rr_connectivity_changed_locked */ + sd->prev_connectivity_state = GRPC_CHANNEL_INIT; + sd->curr_connectivity_state = subchannel_connectivity_state; + sd->user_data_vtable = addresses->user_data_vtable; + if (sd->user_data_vtable != NULL) { + sd->user_data = + sd->user_data_vtable->copy(addresses->addresses[i].user_data); + } + } + subchannel_list->num_subchannels = subchannel_index; + return subchannel_list; +} + +static void subchannel_list_destroy(grpc_exec_ctx *exec_ctx, + grpc_lb_subchannel_list *subchannel_list) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || + GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_DEBUG, "[LB %p] Destroying subchannel_list %p", + subchannel_list->policy, subchannel_list); + } + for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { + grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i]; + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, + "subchannel_list_destroy"); + } + gpr_free(subchannel_list->subchannels); + gpr_free(subchannel_list); +} + +void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list *subchannel_list, + const char *reason) { + gpr_ref_non_zero(&subchannel_list->refcount); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || + GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); + gpr_log(GPR_DEBUG, "[LB %p] subchannel_list %p REF %lu->%lu (%s)", + subchannel_list->policy, subchannel_list, + (unsigned long)(count - 1), (unsigned long)count, reason); + } +} + +void grpc_lb_subchannel_list_unref(grpc_exec_ctx *exec_ctx, + grpc_lb_subchannel_list *subchannel_list, + const char *reason) { + const bool done = gpr_unref(&subchannel_list->refcount); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || + GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); + gpr_log(GPR_DEBUG, "[LB %p] subchannel_list %p UNREF %lu->%lu (%s)", + subchannel_list->policy, subchannel_list, + (unsigned long)(count + 1), (unsigned long)count, reason); + } + if (done) { + subchannel_list_destroy(exec_ctx, subchannel_list); + } +} + +void grpc_lb_subchannel_list_ref_for_connectivity_watch( + grpc_lb_subchannel_list *subchannel_list, const char *reason) { + GRPC_LB_POLICY_WEAK_REF(subchannel_list->policy, reason); + grpc_lb_subchannel_list_ref(subchannel_list, reason); +} + +void grpc_lb_subchannel_list_unref_for_connectivity_watch( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, + const char *reason) { + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, subchannel_list->policy, reason); + grpc_lb_subchannel_list_unref(exec_ctx, subchannel_list, reason); +} + +static void grpc_lb_subchannel_data_cancel_connectivity_watch( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, const char *reason) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || + GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_DEBUG, + "[LB %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR + " (subchannel %p): canceling connectivity watch (%s)", + sd->subchannel_list->policy, sd->subchannel_list, + sd - sd->subchannel_list->subchannels, + sd->subchannel_list->num_subchannels, sd->subchannel, reason); + } + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, NULL, NULL, &sd->connectivity_changed_closure); +} + +void grpc_lb_subchannel_list_shutdown_and_unref( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, + const char *reason) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || + GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_DEBUG, "[LB %p] Shutting down subchannel_list %p (%s)", + subchannel_list->policy, subchannel_list, reason); + } + GPR_ASSERT(!subchannel_list->shutting_down); + subchannel_list->shutting_down = true; + for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { + grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i]; + // If there's a pending notification for this subchannel, cancel it; + // the callback is responsible for unreffing the subchannel. + // Otherwise, unref the subchannel directly. + if (sd->connectivity_notification_pending) { + grpc_lb_subchannel_data_cancel_connectivity_watch(exec_ctx, sd, reason); + } else if (sd->subchannel != NULL) { + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, reason); + } + } + grpc_lb_subchannel_list_unref(exec_ctx, subchannel_list, reason); +} diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h new file mode 100644 index 0000000000..1e3a921c65 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -0,0 +1,149 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H + +#include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/lib/transport/connectivity_state.h" + +// TODO(roth): This code is intended to be shared between pick_first and +// round_robin. However, the interface needs more work to provide clean +// encapsulation. For example, the structs here have some fields that are +// only used in one of the two (e.g., the state counters in +// grpc_lb_subchannel_list and the prev_connectivity_state field in +// grpc_lb_subchannel_data are only used in round_robin, and the +// checking_subchannel field in grpc_lb_subchannel_list is only used by +// pick_first). Also, there is probably some code duplication between the +// connectivity state notification callback code in both pick_first and +// round_robin that could be refactored and moved here. In a future PR, +// need to clean this up. + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct grpc_lb_subchannel_list grpc_lb_subchannel_list; + +typedef struct { + /** backpointer to owning subchannel list */ + grpc_lb_subchannel_list *subchannel_list; + /** subchannel itself */ + grpc_subchannel *subchannel; + grpc_connected_subchannel *connected_subchannel; + /** Is a connectivity notification pending? */ + bool connectivity_notification_pending; + /** notification that connectivity has changed on subchannel */ + grpc_closure connectivity_changed_closure; + /** previous and current connectivity states. Updated by \a + * \a connectivity_changed_closure based on + * \a pending_connectivity_state_unsafe. */ + grpc_connectivity_state prev_connectivity_state; + grpc_connectivity_state curr_connectivity_state; + /** connectivity state to be updated by + * grpc_subchannel_notify_on_state_change(), not guarded by + * the combiner. To be copied to \a curr_connectivity_state by + * \a connectivity_changed_closure. */ + grpc_connectivity_state pending_connectivity_state_unsafe; + /** the subchannel's target user data */ + void *user_data; + /** vtable to operate over \a user_data */ + const grpc_lb_user_data_vtable *user_data_vtable; +} grpc_lb_subchannel_data; + +// Unrefs the subchannel contained in sd. +void grpc_lb_subchannel_data_unref_subchannel( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, const char *reason); + +// Starts watching the connectivity state of the subchannel. +// The connectivity_changed_cb callback must invoke either +// grpc_lb_subchannel_data_stop_connectivity_watch() or again call +// grpc_lb_subchannel_data_start_connectivity_watch(). +void grpc_lb_subchannel_data_start_connectivity_watch( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd); + +// Stops watching the connectivity state of the subchannel. +void grpc_lb_subchannel_data_stop_connectivity_watch( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd); + +struct grpc_lb_subchannel_list { + /** backpointer to owning policy */ + grpc_lb_policy *policy; + + /** all our subchannels */ + size_t num_subchannels; + grpc_lb_subchannel_data *subchannels; + + /** Index into subchannels of the one we're currently checking. + * Used when connecting to subchannels serially instead of in parallel. */ + // TODO(roth): When we have time, we can probably make this go away + // and the index dynamically by subtracting + // subchannel_list->subchannels from the subchannel_data pointer. + size_t checking_subchannel; + + /** how many subchannels are in state READY */ + size_t num_ready; + /** how many subchannels are in state TRANSIENT_FAILURE */ + size_t num_transient_failures; + /** how many subchannels are in state SHUTDOWN */ + size_t num_shutdown; + /** how many subchannels are in state IDLE */ + size_t num_idle; + + /** There will be one ref for each entry in subchannels for which there is a + * pending connectivity state watcher callback. */ + gpr_refcount refcount; + + /** Is this list shutting down? This may be true due to the shutdown of the + * policy itself or because a newer update has arrived while this one hadn't + * finished processing. */ + bool shutting_down; +}; + +grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *p, + const grpc_lb_addresses *addresses, const grpc_lb_policy_args *args, + grpc_iomgr_cb_func connectivity_changed_cb); + +void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list *subchannel_list, + const char *reason); + +void grpc_lb_subchannel_list_unref(grpc_exec_ctx *exec_ctx, + grpc_lb_subchannel_list *subchannel_list, + const char *reason); + +// Takes and releases refs needed for a connectivity notification. +// This includes a ref to subchannel_list and a weak ref to the LB policy. +void grpc_lb_subchannel_list_ref_for_connectivity_watch( + grpc_lb_subchannel_list *subchannel_list, const char *reason); +void grpc_lb_subchannel_list_unref_for_connectivity_watch( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, + const char *reason); + +// Mark subchannel_list as discarded. Unsubscribes all its subchannels. The +// connectivity state notification callback will ultimately unref it. +void grpc_lb_subchannel_list_shutdown_and_unref( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, + const char *reason); + +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */ diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 7b9fd6424d..68e2753026 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -293,6 +293,7 @@ CORE_SOURCE_FILES = [ 'third_party/nanopb/pb_encode.c', 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', + 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc', -- cgit v1.2.3 From efea67438a18bcdfdcc4a96e7dbe98bfced9dab7 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 9 Oct 2017 12:17:52 -0700 Subject: Fix crash in trace logging. --- src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index bbf0548405..22b6aea45d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -369,7 +369,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, " of %" PRIdPTR "), subchannel_list %p: state=%s p->shutdown=%d " "sd->subchannel_list->shutting_down=%d error=%s", (void *)p, (void *)sd->subchannel, - p->subchannel_list->checking_subchannel, + sd->subchannel_list->checking_subchannel, sd->subchannel_list->num_subchannels, (void *)sd->subchannel_list, grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), p->shutdown, sd->subchannel_list->shutting_down, -- cgit v1.2.3 From bf6a86a293325b528dc10ee10ed9d1242b87a642 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 9 Oct 2017 12:23:37 -0700 Subject: Fix build portability problem. --- src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index ad26316aac..d99fc48087 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -41,7 +41,7 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx, "[LB %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR " (subchannel %p): unreffing subchannel", sd->subchannel_list->policy, sd->subchannel_list, - sd - sd->subchannel_list->subchannels, + (size_t)(sd - sd->subchannel_list->subchannels), sd->subchannel_list->num_subchannels, sd->subchannel); } GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, reason); @@ -67,7 +67,7 @@ void grpc_lb_subchannel_data_start_connectivity_watch( "[LB %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR " (subchannel %p): requesting connectivity change notification", sd->subchannel_list->policy, sd->subchannel_list, - sd - sd->subchannel_list->subchannels, + (size_t)(sd - sd->subchannel_list->subchannels), sd->subchannel_list->num_subchannels, sd->subchannel); } sd->connectivity_notification_pending = true; @@ -86,7 +86,7 @@ void grpc_lb_subchannel_data_stop_connectivity_watch( "[LB %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR " (subchannel %p): stopping connectivity watch", sd->subchannel_list->policy, sd->subchannel_list, - sd - sd->subchannel_list->subchannels, + (size_t)(sd - sd->subchannel_list->subchannels), sd->subchannel_list->num_subchannels, sd->subchannel); } sd->connectivity_notification_pending = false; @@ -250,7 +250,7 @@ static void grpc_lb_subchannel_data_cancel_connectivity_watch( "[LB %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR " (subchannel %p): canceling connectivity watch (%s)", sd->subchannel_list->policy, sd->subchannel_list, - sd - sd->subchannel_list->subchannels, + (size_t)(sd - sd->subchannel_list->subchannels), sd->subchannel_list->num_subchannels, sd->subchannel, reason); } grpc_subchannel_notify_on_state_change( -- cgit v1.2.3 From 62ca6ced7c1a8b832980ac2637bfff1ee6505e60 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 10 Oct 2017 10:01:51 -0700 Subject: clang-format --- .../lb_policy/pick_first/pick_first.cc | 67 +++++++++++----------- .../lb_policy/round_robin/round_robin.cc | 4 +- .../client_channel/lb_policy/subchannel_list.cc | 16 +++--- .../client_channel/lb_policy/subchannel_list.h | 5 +- 4 files changed, 45 insertions(+), 47 deletions(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 22b6aea45d..8119377504 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -84,9 +84,9 @@ static void shutdown_locked(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p, GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_REF(error)); gpr_free(pp); } - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_REF(error), "shutdown"); + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), + "shutdown"); if (p->subchannel_list != NULL) { grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, "pf_shutdown"); @@ -285,8 +285,8 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, // We don't yet have a selected subchannel, so replace the current // subchannel list immediately. if (p->subchannel_list != NULL) { - grpc_lb_subchannel_list_shutdown_and_unref( - exec_ctx, p->subchannel_list, "pf_update_before_selected"); + grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "pf_update_before_selected"); } p->subchannel_list = subchannel_list; } else { @@ -363,17 +363,17 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_lb_subchannel_data *sd = (grpc_lb_subchannel_data *)arg; pick_first_lb_policy *p = (pick_first_lb_policy *)sd->subchannel_list->policy; if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log( - GPR_DEBUG, - "Pick First %p connectivity changed for subchannel %p (%" PRIdPTR - " of %" PRIdPTR "), subchannel_list %p: state=%s p->shutdown=%d " - "sd->subchannel_list->shutting_down=%d error=%s", - (void *)p, (void *)sd->subchannel, - sd->subchannel_list->checking_subchannel, - sd->subchannel_list->num_subchannels, (void *)sd->subchannel_list, - grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), - p->shutdown, sd->subchannel_list->shutting_down, - grpc_error_string(error)); + gpr_log(GPR_DEBUG, + "Pick First %p connectivity changed for subchannel %p (%" PRIdPTR + " of %" PRIdPTR + "), subchannel_list %p: state=%s p->shutdown=%d " + "sd->subchannel_list->shutting_down=%d error=%s", + (void *)p, (void *)sd->subchannel, + sd->subchannel_list->checking_subchannel, + sd->subchannel_list->num_subchannels, (void *)sd->subchannel_list, + grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), + p->shutdown, sd->subchannel_list->shutting_down, + grpc_error_string(error)); } // If the policy is shutting down, unref and return. if (p->shutdown) { @@ -412,15 +412,14 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, exec_ctx, p->subchannel_list, "selected_not_ready+switch_to_update"); p->subchannel_list = p->latest_pending_subchannel_list; p->latest_pending_subchannel_list = NULL; - grpc_lb_subchannel_data *new_sd = &p->subchannel_list->subchannels[ - p->subchannel_list->checking_subchannel]; - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - new_sd->curr_connectivity_state, - GRPC_ERROR_REF(error), - "selected_not_ready+switch_to_update"); + grpc_lb_subchannel_data *new_sd = + &p->subchannel_list + ->subchannels[p->subchannel_list->checking_subchannel]; + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, new_sd->curr_connectivity_state, + GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update"); } else { - if (sd->curr_connectivity_state == - GRPC_CHANNEL_TRANSIENT_FAILURE) { + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { /* if the selected channel goes bad, we're done */ sd->curr_connectivity_state = GRPC_CHANNEL_SHUTDOWN; } @@ -471,8 +470,8 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, "connected"); p->selected = sd; if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", - (void *)p, (void *)sd->subchannel); + gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void *)p, + (void *)sd->subchannel); } // Drop all other subchannels, since we are now connected. destroy_unselected_subchannels_locked(exec_ctx, p); @@ -497,8 +496,8 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, case GRPC_CHANNEL_TRANSIENT_FAILURE: { grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); sd->subchannel_list->checking_subchannel = - (sd->subchannel_list->checking_subchannel + 1) - % sd->subchannel_list->num_subchannels; + (sd->subchannel_list->checking_subchannel + 1) % + sd->subchannel_list->num_subchannels; // Case 1: Only set state to TRANSIENT_FAILURE if we've tried // all subchannels. if (sd->subchannel_list->checking_subchannel == 0 && @@ -507,8 +506,8 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "connecting_transient_failure"); } - sd = &sd->subchannel_list->subchannels[ - sd->subchannel_list->checking_subchannel]; + sd = &sd->subchannel_list + ->subchannels[sd->subchannel_list->checking_subchannel]; sd->curr_connectivity_state = grpc_subchannel_check_connectivity(sd->subchannel, &error); if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { @@ -550,10 +549,10 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, } // Advance to next subchannel and check its state. sd->subchannel_list->checking_subchannel = - (sd->subchannel_list->checking_subchannel + 1) - % sd->subchannel_list->num_subchannels; - sd = &sd->subchannel_list->subchannels[ - sd->subchannel_list->checking_subchannel]; + (sd->subchannel_list->checking_subchannel + 1) % + sd->subchannel_list->num_subchannels; + sd = &sd->subchannel_list + ->subchannels[sd->subchannel_list->checking_subchannel]; sd->curr_connectivity_state = grpc_subchannel_check_connectivity(sd->subchannel, &error); if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index b25b9b86d8..1644fcc1bc 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -285,8 +285,8 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, /* readily available, report right away */ grpc_lb_subchannel_data *sd = &p->subchannel_list->subchannels[next_ready_index]; - *target = GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, - "rr_picked"); + *target = + GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked"); if (user_data != NULL) { *user_data = sd->user_data; } diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index d99fc48087..8d7e084a2e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -153,13 +153,11 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { char *address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log( - GPR_DEBUG, - "[LB %p] subchannel list %p index %" PRIdPTR - ": Created subchannel %p for address uri %s; " - "initial connectivity state: %s", - p, subchannel_list, subchannel_index, subchannel, address_uri, - grpc_connectivity_state_name(subchannel_connectivity_state)); + gpr_log(GPR_DEBUG, "[LB %p] subchannel list %p index %" PRIdPTR + ": Created subchannel %p for address uri %s; " + "initial connectivity state: %s", + p, subchannel_list, subchannel_index, subchannel, address_uri, + grpc_connectivity_state_name(subchannel_connectivity_state)); gpr_free(address_uri); } grpc_lb_subchannel_data *sd = @@ -253,8 +251,8 @@ static void grpc_lb_subchannel_data_cancel_connectivity_watch( (size_t)(sd - sd->subchannel_list->subchannels), sd->subchannel_list->num_subchannels, sd->subchannel, reason); } - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, NULL, NULL, &sd->connectivity_changed_closure); + grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, + &sd->connectivity_changed_closure); } void grpc_lb_subchannel_list_shutdown_and_unref( diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 1e3a921c65..cf872afbe2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -68,8 +68,9 @@ typedef struct { } grpc_lb_subchannel_data; // Unrefs the subchannel contained in sd. -void grpc_lb_subchannel_data_unref_subchannel( - grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, const char *reason); +void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx, + grpc_lb_subchannel_data *sd, + const char *reason); // Starts watching the connectivity state of the subchannel. // The connectivity_changed_cb callback must invoke either -- cgit v1.2.3 From 7ba40baeb1821fd312cb7d03c8e9e2383869120b Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 10 Oct 2017 13:17:13 -0700 Subject: Fix crashes and asan bugs. --- .../lb_policy/pick_first/pick_first.cc | 42 ++++++++++++---------- .../client_channel/lb_policy/subchannel_list.cc | 2 +- 2 files changed, 24 insertions(+), 20 deletions(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 8119377504..f321fec444 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -397,10 +397,6 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, sd->subchannel_list == p->latest_pending_subchannel_list); // Update state counters. sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; - if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - ++sd->subchannel_list->num_shutdown; - } - sd->prev_connectivity_state = sd->curr_connectivity_state; // Handle updates for the currently selected subchannel. if (p->selected == sd) { // If the new state is anything other than READY and there is a @@ -447,6 +443,10 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, // for a subchannel in p->latest_pending_subchannel_list. The // goal here is to find a subchannel from the update that we can // select in place of the current one. + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE || + sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + } while (true) { switch (sd->curr_connectivity_state) { case GRPC_CHANNEL_INIT: @@ -494,10 +494,13 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, return; } case GRPC_CHANNEL_TRANSIENT_FAILURE: { - grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); - sd->subchannel_list->checking_subchannel = - (sd->subchannel_list->checking_subchannel + 1) % - sd->subchannel_list->num_subchannels; + do { + sd->subchannel_list->checking_subchannel = + (sd->subchannel_list->checking_subchannel + 1) % + sd->subchannel_list->num_subchannels; + sd = &sd->subchannel_list + ->subchannels[sd->subchannel_list->checking_subchannel]; + } while (sd->subchannel == NULL); // Case 1: Only set state to TRANSIENT_FAILURE if we've tried // all subchannels. if (sd->subchannel_list->checking_subchannel == 0 && @@ -506,10 +509,9 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "connecting_transient_failure"); } - sd = &sd->subchannel_list - ->subchannels[sd->subchannel_list->checking_subchannel]; sd->curr_connectivity_state = grpc_subchannel_check_connectivity(sd->subchannel, &error); + GRPC_ERROR_UNREF(error); if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // Reuses the connectivity refs from the previous watch. grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); @@ -530,11 +532,18 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, return; } case GRPC_CHANNEL_SHUTDOWN: { - grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "pf_candidate_shutdown"); - if (sd->subchannel_list->num_shutdown == - sd->subchannel_list->num_subchannels) { + // Advance to next subchannel and check its state. + grpc_lb_subchannel_data *original_sd = sd; + do { + sd->subchannel_list->checking_subchannel = + (sd->subchannel_list->checking_subchannel + 1) % + sd->subchannel_list->num_subchannels; + sd = &sd->subchannel_list + ->subchannels[sd->subchannel_list->checking_subchannel]; + } while (sd->subchannel == NULL && sd != original_sd); + if (sd == original_sd) { grpc_lb_subchannel_list_unref_for_connectivity_watch( exec_ctx, sd->subchannel_list, "pf_candidate_shutdown"); shutdown_locked(exec_ctx, p, @@ -547,14 +556,9 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "subchannel_failed"); } - // Advance to next subchannel and check its state. - sd->subchannel_list->checking_subchannel = - (sd->subchannel_list->checking_subchannel + 1) % - sd->subchannel_list->num_subchannels; - sd = &sd->subchannel_list - ->subchannels[sd->subchannel_list->checking_subchannel]; sd->curr_connectivity_state = grpc_subchannel_check_connectivity(sd->subchannel, &error); + GRPC_ERROR_UNREF(error); if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // Reuses the connectivity refs from the previous watch. grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index 8d7e084a2e..c30416d124 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -79,7 +79,6 @@ void grpc_lb_subchannel_data_start_connectivity_watch( void grpc_lb_subchannel_data_stop_connectivity_watch( grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) { - GPR_ASSERT(sd->connectivity_notification_pending); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log(GPR_DEBUG, @@ -89,6 +88,7 @@ void grpc_lb_subchannel_data_stop_connectivity_watch( (size_t)(sd - sd->subchannel_list->subchannels), sd->subchannel_list->num_subchannels, sd->subchannel); } + GPR_ASSERT(sd->connectivity_notification_pending); sd->connectivity_notification_pending = false; } -- cgit v1.2.3 From 0c11ebaa6c7e2cf0c1f49fc0f094ca9fe98c67e4 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 10 Oct 2017 15:02:43 -0700 Subject: Fix crash when not able to create subchannel. --- .../filters/client_channel/lb_policy/subchannel_list.cc | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index c30416d124..df23a32f97 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -129,6 +129,20 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); grpc_channel_args_destroy(exec_ctx, new_args); + if (subchannel == NULL) { + // Subchannel could not be created. + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || + GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + char *address_uri = + grpc_sockaddr_to_uri(&addresses->addresses[i].address); + gpr_log(GPR_DEBUG, + "[LB %p] could not create subchannel for address uri %s, " + "ignoring", + subchannel_list->policy, address_uri); + gpr_free(address_uri); + } + continue; + } grpc_error *error; // Get the connectivity state of the subchannel. Already existing ones may // be in a state other than INIT. -- cgit v1.2.3 From 901bb4f5bce2b924f8d75a710920aeb45221809c Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 11 Oct 2017 08:49:07 -0700 Subject: Improve tracing of subchannel_list code. --- .../lb_policy/pick_first/pick_first.cc | 3 +- .../lb_policy/round_robin/round_robin.cc | 3 +- .../client_channel/lb_policy/subchannel_list.cc | 103 ++++++++++----------- .../client_channel/lb_policy/subchannel_list.h | 5 +- 4 files changed, 57 insertions(+), 57 deletions(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index f321fec444..d861746999 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -265,7 +265,8 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, (void *)p, (unsigned long)addresses->num_addresses); } grpc_lb_subchannel_list *subchannel_list = grpc_lb_subchannel_list_create( - exec_ctx, &p->base, addresses, args, pf_connectivity_changed_locked); + exec_ctx, &p->base, &grpc_lb_pick_first_trace, addresses, args, + pf_connectivity_changed_locked); if (subchannel_list->num_subchannels == 0) { // Empty update or no valid subchannels. Unsubscribe from all current // subchannels and put the channel in TRANSIENT_FAILURE. diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 1644fcc1bc..499631f1b1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -586,7 +586,8 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p; grpc_lb_subchannel_list *subchannel_list = grpc_lb_subchannel_list_create( - exec_ctx, &p->base, addresses, args, rr_connectivity_changed_locked); + exec_ctx, &p->base, &grpc_lb_round_robin_trace, addresses, args, + rr_connectivity_changed_locked); if (subchannel_list->num_subchannels == 0) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index df23a32f97..02c8460b27 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -28,19 +28,16 @@ #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" -extern grpc_tracer_flag grpc_lb_round_robin_trace; -extern grpc_tracer_flag grpc_lb_pick_first_trace; - void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, const char *reason) { if (sd->subchannel != NULL) { - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || - GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { gpr_log(GPR_DEBUG, - "[LB %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR + "[%s %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR " (subchannel %p): unreffing subchannel", - sd->subchannel_list->policy, sd->subchannel_list, + sd->subchannel_list->tracer->name, sd->subchannel_list->policy, + sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), sd->subchannel_list->num_subchannels, sd->subchannel); } @@ -61,12 +58,12 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx, void grpc_lb_subchannel_data_start_connectivity_watch( grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) { - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || - GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { gpr_log(GPR_DEBUG, - "[LB %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR + "[%s %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR " (subchannel %p): requesting connectivity change notification", - sd->subchannel_list->policy, sd->subchannel_list, + sd->subchannel_list->tracer->name, sd->subchannel_list->policy, + sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), sd->subchannel_list->num_subchannels, sd->subchannel); } @@ -79,12 +76,12 @@ void grpc_lb_subchannel_data_start_connectivity_watch( void grpc_lb_subchannel_data_stop_connectivity_watch( grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) { - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || - GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { gpr_log(GPR_DEBUG, - "[LB %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR + "[%s %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR " (subchannel %p): stopping connectivity watch", - sd->subchannel_list->policy, sd->subchannel_list, + sd->subchannel_list->tracer->name, sd->subchannel_list->policy, + sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), sd->subchannel_list->num_subchannels, sd->subchannel); } @@ -93,18 +90,18 @@ void grpc_lb_subchannel_data_stop_connectivity_watch( } grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( - grpc_exec_ctx *exec_ctx, grpc_lb_policy *p, + grpc_exec_ctx *exec_ctx, grpc_lb_policy *p, grpc_tracer_flag *tracer, const grpc_lb_addresses *addresses, const grpc_lb_policy_args *args, grpc_iomgr_cb_func connectivity_changed_cb) { grpc_lb_subchannel_list *subchannel_list = (grpc_lb_subchannel_list *)gpr_zalloc(sizeof(*subchannel_list)); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || - GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + if (GRPC_TRACER_ON(*tracer)) { gpr_log(GPR_DEBUG, - "[LB %p] Creating subchannel list %p for %" PRIdPTR " subchannels", - p, subchannel_list, addresses->num_addresses); + "[%s %p] Creating subchannel list %p for %" PRIdPTR " subchannels", + tracer->name, p, subchannel_list, addresses->num_addresses); } subchannel_list->policy = p; + subchannel_list->tracer = tracer; gpr_ref_init(&subchannel_list->refcount, 1); subchannel_list->subchannels = (grpc_lb_subchannel_data *)gpr_zalloc( sizeof(grpc_lb_subchannel_data) * addresses->num_addresses); @@ -131,14 +128,13 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( grpc_channel_args_destroy(exec_ctx, new_args); if (subchannel == NULL) { // Subchannel could not be created. - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || - GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + if (GRPC_TRACER_ON(*tracer)) { char *address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); gpr_log(GPR_DEBUG, - "[LB %p] could not create subchannel for address uri %s, " + "[%s %p] could not create subchannel for address uri %s, " "ignoring", - subchannel_list->policy, address_uri); + tracer->name, subchannel_list->policy, address_uri); gpr_free(address_uri); } continue; @@ -150,27 +146,26 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( grpc_subchannel_check_connectivity(subchannel, &error); if (error != GRPC_ERROR_NONE) { // The subchannel is in error (e.g. shutting down). Ignore it. - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || - GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + if (GRPC_TRACER_ON(*tracer)) { char *address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); gpr_log(GPR_DEBUG, - "[LB %p] subchannel for address uri %s shutting down, ignoring", - subchannel_list->policy, address_uri); + "[%s %p] subchannel for address uri %s shutting down, ignoring", + tracer->name, subchannel_list->policy, address_uri); gpr_free(address_uri); } GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "new_sc_connectivity_error"); GRPC_ERROR_UNREF(error); continue; } - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || - GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + if (GRPC_TRACER_ON(*tracer)) { char *address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_DEBUG, "[LB %p] subchannel list %p index %" PRIdPTR + gpr_log(GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIdPTR ": Created subchannel %p for address uri %s; " "initial connectivity state: %s", - p, subchannel_list, subchannel_index, subchannel, address_uri, + tracer->name, p, subchannel_list, subchannel_index, subchannel, + address_uri, grpc_connectivity_state_name(subchannel_connectivity_state)); gpr_free(address_uri); } @@ -199,10 +194,10 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( static void subchannel_list_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list) { - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || - GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_DEBUG, "[LB %p] Destroying subchannel_list %p", - subchannel_list->policy, subchannel_list); + if (GRPC_TRACER_ON(*subchannel_list->tracer)) { + gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p", + subchannel_list->tracer->name, subchannel_list->policy, + subchannel_list); } for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i]; @@ -216,12 +211,12 @@ static void subchannel_list_destroy(grpc_exec_ctx *exec_ctx, void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list *subchannel_list, const char *reason) { gpr_ref_non_zero(&subchannel_list->refcount); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || - GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + if (GRPC_TRACER_ON(*subchannel_list->tracer)) { const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); - gpr_log(GPR_DEBUG, "[LB %p] subchannel_list %p REF %lu->%lu (%s)", - subchannel_list->policy, subchannel_list, - (unsigned long)(count - 1), (unsigned long)count, reason); + gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p REF %lu->%lu (%s)", + subchannel_list->tracer->name, subchannel_list->policy, + subchannel_list, (unsigned long)(count - 1), + (unsigned long)count, reason); } } @@ -229,12 +224,12 @@ void grpc_lb_subchannel_list_unref(grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, const char *reason) { const bool done = gpr_unref(&subchannel_list->refcount); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || - GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + if (GRPC_TRACER_ON(*subchannel_list->tracer)) { const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); - gpr_log(GPR_DEBUG, "[LB %p] subchannel_list %p UNREF %lu->%lu (%s)", - subchannel_list->policy, subchannel_list, - (unsigned long)(count + 1), (unsigned long)count, reason); + gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p UNREF %lu->%lu (%s)", + subchannel_list->tracer->name, subchannel_list->policy, + subchannel_list, (unsigned long)(count + 1), + (unsigned long)count, reason); } if (done) { subchannel_list_destroy(exec_ctx, subchannel_list); @@ -256,12 +251,12 @@ void grpc_lb_subchannel_list_unref_for_connectivity_watch( static void grpc_lb_subchannel_data_cancel_connectivity_watch( grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, const char *reason) { - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || - GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { gpr_log(GPR_DEBUG, - "[LB %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR + "[%s %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR " (subchannel %p): canceling connectivity watch (%s)", - sd->subchannel_list->policy, sd->subchannel_list, + sd->subchannel_list->tracer->name, sd->subchannel_list->policy, + sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), sd->subchannel_list->num_subchannels, sd->subchannel, reason); } @@ -272,10 +267,10 @@ static void grpc_lb_subchannel_data_cancel_connectivity_watch( void grpc_lb_subchannel_list_shutdown_and_unref( grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, const char *reason) { - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || - GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_DEBUG, "[LB %p] Shutting down subchannel_list %p (%s)", - subchannel_list->policy, subchannel_list, reason); + if (GRPC_TRACER_ON(*subchannel_list->tracer)) { + gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)", + subchannel_list->tracer->name, subchannel_list->policy, + subchannel_list, reason); } GPR_ASSERT(!subchannel_list->shutting_down); subchannel_list->shutting_down = true; diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index cf872afbe2..814180388d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -21,6 +21,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/lib/debug/trace.h" #include "src/core/lib/transport/connectivity_state.h" // TODO(roth): This code is intended to be shared between pick_first and @@ -87,6 +88,8 @@ struct grpc_lb_subchannel_list { /** backpointer to owning policy */ grpc_lb_policy *policy; + grpc_tracer_flag *tracer; + /** all our subchannels */ size_t num_subchannels; grpc_lb_subchannel_data *subchannels; @@ -118,7 +121,7 @@ struct grpc_lb_subchannel_list { }; grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( - grpc_exec_ctx *exec_ctx, grpc_lb_policy *p, + grpc_exec_ctx *exec_ctx, grpc_lb_policy *p, grpc_tracer_flag *tracer, const grpc_lb_addresses *addresses, const grpc_lb_policy_args *args, grpc_iomgr_cb_func connectivity_changed_cb); -- cgit v1.2.3 From 9843ec78a72cced1b304487c8eface166c6fc0a6 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 11 Oct 2017 13:00:04 -0700 Subject: clang-format --- .../client_channel/lb_policy/subchannel_list.cc | 47 ++++++++++------------ 1 file changed, 22 insertions(+), 25 deletions(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index 02c8460b27..db057e295d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -33,13 +33,12 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx, const char *reason) { if (sd->subchannel != NULL) { if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { - gpr_log(GPR_DEBUG, - "[%s %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR - " (subchannel %p): unreffing subchannel", - sd->subchannel_list->tracer->name, sd->subchannel_list->policy, - sd->subchannel_list, - (size_t)(sd - sd->subchannel_list->subchannels), - sd->subchannel_list->num_subchannels, sd->subchannel); + gpr_log( + GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIdPTR + " of %" PRIdPTR " (subchannel %p): unreffing subchannel", + sd->subchannel_list->tracer->name, sd->subchannel_list->policy, + sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), + sd->subchannel_list->num_subchannels, sd->subchannel); } GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, reason); sd->subchannel = NULL; @@ -77,13 +76,12 @@ void grpc_lb_subchannel_data_start_connectivity_watch( void grpc_lb_subchannel_data_stop_connectivity_watch( grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) { if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { - gpr_log(GPR_DEBUG, - "[%s %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR - " (subchannel %p): stopping connectivity watch", - sd->subchannel_list->tracer->name, sd->subchannel_list->policy, - sd->subchannel_list, - (size_t)(sd - sd->subchannel_list->subchannels), - sd->subchannel_list->num_subchannels, sd->subchannel); + gpr_log( + GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR + " (subchannel %p): stopping connectivity watch", + sd->subchannel_list->tracer->name, sd->subchannel_list->policy, + sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), + sd->subchannel_list->num_subchannels, sd->subchannel); } GPR_ASSERT(sd->connectivity_notification_pending); sd->connectivity_notification_pending = false; @@ -215,8 +213,8 @@ void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list *subchannel_list, const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p REF %lu->%lu (%s)", subchannel_list->tracer->name, subchannel_list->policy, - subchannel_list, (unsigned long)(count - 1), - (unsigned long)count, reason); + subchannel_list, (unsigned long)(count - 1), (unsigned long)count, + reason); } } @@ -228,8 +226,8 @@ void grpc_lb_subchannel_list_unref(grpc_exec_ctx *exec_ctx, const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p UNREF %lu->%lu (%s)", subchannel_list->tracer->name, subchannel_list->policy, - subchannel_list, (unsigned long)(count + 1), - (unsigned long)count, reason); + subchannel_list, (unsigned long)(count + 1), (unsigned long)count, + reason); } if (done) { subchannel_list_destroy(exec_ctx, subchannel_list); @@ -252,13 +250,12 @@ void grpc_lb_subchannel_list_unref_for_connectivity_watch( static void grpc_lb_subchannel_data_cancel_connectivity_watch( grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, const char *reason) { if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { - gpr_log(GPR_DEBUG, - "[%s %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR - " (subchannel %p): canceling connectivity watch (%s)", - sd->subchannel_list->tracer->name, sd->subchannel_list->policy, - sd->subchannel_list, - (size_t)(sd - sd->subchannel_list->subchannels), - sd->subchannel_list->num_subchannels, sd->subchannel, reason); + gpr_log( + GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR + " (subchannel %p): canceling connectivity watch (%s)", + sd->subchannel_list->tracer->name, sd->subchannel_list->policy, + sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), + sd->subchannel_list->num_subchannels, sd->subchannel, reason); } grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, &sd->connectivity_changed_closure); -- cgit v1.2.3 From 5132d0e6092d34840e583dac0ef0dd01f4226a63 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 13 Oct 2017 13:20:52 -0700 Subject: Code review changes. --- .../client_channel/lb_policy/pick_first/pick_first.cc | 8 ++++---- .../client_channel/lb_policy/round_robin/round_robin.cc | 7 +++---- .../ext/filters/client_channel/lb_policy/subchannel_list.cc | 12 +++++------- 3 files changed, 12 insertions(+), 15 deletions(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index d861746999..025ee29081 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -44,9 +44,9 @@ typedef struct { grpc_lb_policy base; /** all our subchannels */ grpc_lb_subchannel_list *subchannel_list; - /** Latest pending subchannel list. */ + /** latest pending subchannel list */ grpc_lb_subchannel_list *latest_pending_subchannel_list; - /** Selected subchannel in \a subchannel_list. */ + /** selected subchannel in \a subchannel_list */ grpc_lb_subchannel_data *selected; /** have we started picking? */ bool started_picking; @@ -351,7 +351,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } // If we've started picking, start trying to connect to the first // subchannel in the new list. - if (p->started_picking && subchannel_list->num_subchannels > 0) { + if (p->started_picking) { grpc_lb_subchannel_list_ref_for_connectivity_watch( subchannel_list, "connectivity_watch+update"); grpc_lb_subchannel_data_start_connectivity_watch( @@ -396,7 +396,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, // either the current or latest pending subchannel lists. GPR_ASSERT(sd->subchannel_list == p->subchannel_list || sd->subchannel_list == p->latest_pending_subchannel_list); - // Update state counters. + // Update state. sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; // Handle updates for the currently selected subchannel. if (p->selected == sd) { diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 499631f1b1..67361bfe5d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -436,7 +436,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "rr_sl_shutdown"); grpc_lb_subchannel_list_unref_for_connectivity_watch( - exec_ctx, sd->subchannel_list, "rr_shutdown"); + exec_ctx, sd->subchannel_list, "rr_sl_shutdown"); return; } // If we're still here, the notification must be for a subchannel in @@ -601,7 +601,7 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, return; } if (p->started_picking) { - if (p->latest_pending_subchannel_list != NULL && p->started_picking) { + if (p->latest_pending_subchannel_list != NULL) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, "[RR %p] Shutting down latest pending subchannel list %p, " @@ -610,8 +610,7 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, (void *)subchannel_list); } grpc_lb_subchannel_list_shutdown_and_unref( - exec_ctx, p->latest_pending_subchannel_list, - "sl_outdated_dont_smash"); + exec_ctx, p->latest_pending_subchannel_list, "sl_outdated"); } p->latest_pending_subchannel_list = subchannel_list; for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index db057e295d..9a7ccedac1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -103,11 +103,11 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( gpr_ref_init(&subchannel_list->refcount, 1); subchannel_list->subchannels = (grpc_lb_subchannel_data *)gpr_zalloc( sizeof(grpc_lb_subchannel_data) * addresses->num_addresses); - /* We need to remove the LB addresses in order to be able to compare the - * subchannel keys of subchannels from a different batch of addresses. */ + // We need to remove the LB addresses in order to be able to compare the + // subchannel keys of subchannels from a different batch of addresses. static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, GRPC_ARG_LB_ADDRESSES}; - /* Create subchannels for addresses in the update. */ + // Create a subchannels for each address. grpc_subchannel_args sc_args; size_t subchannel_index = 0; for (size_t i = 0; i < addresses->num_addresses; i++) { @@ -174,10 +174,8 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure, connectivity_changed_cb, sd, grpc_combiner_scheduler(args->combiner)); - /* use some sentinel value outside of the range of - * grpc_connectivity_state to signal an undefined previous state. We - * won't be referring to this value again and it'll be overwritten after - * the first call to rr_connectivity_changed_locked */ + // Use some sentinel value outside of the range of + // grpc_connectivity_state to signal an undefined previous state. sd->prev_connectivity_state = GRPC_CHANNEL_INIT; sd->curr_connectivity_state = subchannel_connectivity_state; sd->user_data_vtable = addresses->user_data_vtable; -- cgit v1.2.3 From 99f54e1572d27a7e7b80918c0cf3cd9614751886 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 16 Oct 2017 09:55:53 -0700 Subject: Code review changes. --- .../ext/filters/client_channel/lb_policy/pick_first/pick_first.cc | 5 +---- src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc | 6 +++--- 2 files changed, 4 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 025ee29081..970751ce2c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -409,11 +409,8 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, exec_ctx, p->subchannel_list, "selected_not_ready+switch_to_update"); p->subchannel_list = p->latest_pending_subchannel_list; p->latest_pending_subchannel_list = NULL; - grpc_lb_subchannel_data *new_sd = - &p->subchannel_list - ->subchannels[p->subchannel_list->checking_subchannel]; grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, new_sd->curr_connectivity_state, + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update"); } else { if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index 9a7ccedac1..1d2eb395f0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -107,7 +107,7 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( // subchannel keys of subchannels from a different batch of addresses. static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, GRPC_ARG_LB_ADDRESSES}; - // Create a subchannels for each address. + // Create a subchannel for each address. grpc_subchannel_args sc_args; size_t subchannel_index = 0; for (size_t i = 0; i < addresses->num_addresses; i++) { @@ -245,7 +245,7 @@ void grpc_lb_subchannel_list_unref_for_connectivity_watch( grpc_lb_subchannel_list_unref(exec_ctx, subchannel_list, reason); } -static void grpc_lb_subchannel_data_cancel_connectivity_watch( +static void subchannel_data_cancel_connectivity_watch( grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, const char *reason) { if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { gpr_log( @@ -275,7 +275,7 @@ void grpc_lb_subchannel_list_shutdown_and_unref( // the callback is responsible for unreffing the subchannel. // Otherwise, unref the subchannel directly. if (sd->connectivity_notification_pending) { - grpc_lb_subchannel_data_cancel_connectivity_watch(exec_ctx, sd, reason); + subchannel_data_cancel_connectivity_watch(exec_ctx, sd, reason); } else if (sd->subchannel != NULL) { grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, reason); } -- cgit v1.2.3 From aadf9f4b8459702e6976b8fc8c3bffa0f321a082 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 19 Oct 2017 07:53:53 -0700 Subject: Code review changes. --- .../lb_policy/pick_first/pick_first.cc | 2 +- .../lb_policy/round_robin/round_robin.cc | 5 +---- .../client_channel/lb_policy/subchannel_list.h | 20 ++++++++++---------- 3 files changed, 12 insertions(+), 15 deletions(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 970751ce2c..b19616a48e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -427,7 +427,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); grpc_lb_subchannel_list_unref_for_connectivity_watch( exec_ctx, sd->subchannel_list, "pf_selected_shutdown"); - pf_shutdown_locked(exec_ctx, &p->base); + shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error)); } } return; diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 67361bfe5d..279f80f8a3 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -187,15 +187,12 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown"); - const bool latest_is_current = - p->subchannel_list == p->latest_pending_subchannel_list; if (p->subchannel_list != NULL) { grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, "sl_shutdown_rr_shutdown"); p->subchannel_list = NULL; } - if (!latest_is_current && p->latest_pending_subchannel_list != NULL && - !p->latest_pending_subchannel_list->shutting_down) { + if (p->latest_pending_subchannel_list != NULL) { grpc_lb_subchannel_list_shutdown_and_unref( exec_ctx, p->latest_pending_subchannel_list, "sl_shutdown_pending_rr_shutdown"); diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 814180388d..7ad22302ce 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -68,19 +68,19 @@ typedef struct { const grpc_lb_user_data_vtable *user_data_vtable; } grpc_lb_subchannel_data; -// Unrefs the subchannel contained in sd. +/// Unrefs the subchannel contained in sd. void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, const char *reason); -// Starts watching the connectivity state of the subchannel. -// The connectivity_changed_cb callback must invoke either -// grpc_lb_subchannel_data_stop_connectivity_watch() or again call -// grpc_lb_subchannel_data_start_connectivity_watch(). +/// Starts watching the connectivity state of the subchannel. +/// The connectivity_changed_cb callback must invoke either +/// grpc_lb_subchannel_data_stop_connectivity_watch() or again call +/// grpc_lb_subchannel_data_start_connectivity_watch(). void grpc_lb_subchannel_data_start_connectivity_watch( grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd); -// Stops watching the connectivity state of the subchannel. +/// Stops watching the connectivity state of the subchannel. void grpc_lb_subchannel_data_stop_connectivity_watch( grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd); @@ -132,16 +132,16 @@ void grpc_lb_subchannel_list_unref(grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, const char *reason); -// Takes and releases refs needed for a connectivity notification. -// This includes a ref to subchannel_list and a weak ref to the LB policy. +/// Takes and releases refs needed for a connectivity notification. +/// This includes a ref to subchannel_list and a weak ref to the LB policy. void grpc_lb_subchannel_list_ref_for_connectivity_watch( grpc_lb_subchannel_list *subchannel_list, const char *reason); void grpc_lb_subchannel_list_unref_for_connectivity_watch( grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, const char *reason); -// Mark subchannel_list as discarded. Unsubscribes all its subchannels. The -// connectivity state notification callback will ultimately unref it. +/// Mark subchannel_list as discarded. Unsubscribes all its subchannels. The +/// connectivity state notification callback will ultimately unref it. void grpc_lb_subchannel_list_shutdown_and_unref( grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, const char *reason); -- cgit v1.2.3 From 57cdb166590920c72604fe3597d8393d8cc26bb6 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 23 Oct 2017 12:37:07 -0700 Subject: Initialize subchannel_list->pending_connectivity_state_unsafe. --- src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc | 1 + 1 file changed, 1 insertion(+) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index 1d2eb395f0..af7a6acdff 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -178,6 +178,7 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( // grpc_connectivity_state to signal an undefined previous state. sd->prev_connectivity_state = GRPC_CHANNEL_INIT; sd->curr_connectivity_state = subchannel_connectivity_state; + sd->pending_connectivity_state_unsafe = subchannel_connectivity_state; sd->user_data_vtable = addresses->user_data_vtable; if (sd->user_data_vtable != NULL) { sd->user_data = -- cgit v1.2.3 From 61da0506931715dbc36ac1964bfc310d1738ecb6 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 25 Oct 2017 07:59:39 -0700 Subject: Fix bug from merge and improve logging. --- .../client_channel/lb_policy/round_robin/round_robin.cc | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 1defcb7959..7d49c3ffe1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -463,7 +463,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_lb_subchannel_list_unref_for_connectivity_watch( exec_ctx, sd->subchannel_list, "rr_connectivity_shutdown"); if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - shutdown_locked(exec_ctx, &p->base, GRPC_ERROR_REF(error)); + shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error)); } } else { // sd not in SHUTDOWN if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { @@ -572,21 +572,22 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); if (arg == NULL || arg->type != GRPC_ARG_POINTER) { + gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", p); + // If we don't have a current subchannel list, go into TRANSIENT_FAILURE. + // Otherwise, keep using the current subchannel list (ignore this update). if (p->subchannel_list == NULL) { - // If we don't have a current subchannel list, go into TRANSIENT FAILURE. grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), "rr_update_missing"); - } else { - // otherwise, keep using the current subchannel list (ignore this update). - gpr_log(GPR_ERROR, - "[RR %p] No valid LB addresses channel arg for update, ignoring.", - (void *)p); } return; } grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIdPTR " addresses", + p, addresses->num_addresses); + } grpc_lb_subchannel_list *subchannel_list = grpc_lb_subchannel_list_create( exec_ctx, &p->base, &grpc_lb_round_robin_trace, addresses, args, rr_connectivity_changed_locked); -- cgit v1.2.3 From 6c5569167daf51503145ce791d0c5f6b99b016fd Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 25 Oct 2017 07:59:51 -0700 Subject: Take ref to connected subchannel if subchannel starts in state READY. --- .../ext/filters/client_channel/lb_policy/pick_first/pick_first.cc | 5 ----- src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc | 5 +++++ 2 files changed, 5 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index b19616a48e..81f9bc5a98 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -312,11 +312,6 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, exec_ctx, p->subchannel_list, "pf_update_includes_selected"); } p->subchannel_list = subchannel_list; - if (p->selected->connected_subchannel != NULL) { - sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(sd->subchannel), - "pf_update_includes_selected"); - } p->selected = sd; destroy_unselected_subchannels_locked(exec_ctx, p); // If there was a previously pending update (which may or may diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index af7a6acdff..cf3e9525ef 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -179,6 +179,11 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( sd->prev_connectivity_state = GRPC_CHANNEL_INIT; sd->curr_connectivity_state = subchannel_connectivity_state; sd->pending_connectivity_state_unsafe = subchannel_connectivity_state; + if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { + sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(sd->subchannel), + "ready_at_sl_creation"); + } sd->user_data_vtable = addresses->user_data_vtable; if (sd->user_data_vtable != NULL) { sd->user_data = -- cgit v1.2.3 From cfcbab3d84b48b005ebd038feb02004344429b28 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 26 Oct 2017 09:48:30 -0700 Subject: clang-format --- .../client_channel/lb_policy/round_robin/round_robin.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 7d49c3ffe1..e3cfd1baa1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -182,9 +182,9 @@ static void shutdown_locked(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p, GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_REF(error)); gpr_free(pp); } - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_REF(error), "rr_shutdown"); + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), + "rr_shutdown"); if (p->subchannel_list != NULL) { grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, "sl_shutdown_rr_shutdown"); @@ -585,8 +585,8 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIdPTR " addresses", - p, addresses->num_addresses); + gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIdPTR " addresses", p, + addresses->num_addresses); } grpc_lb_subchannel_list *subchannel_list = grpc_lb_subchannel_list_create( exec_ctx, &p->base, &grpc_lb_round_robin_trace, addresses, args, -- cgit v1.2.3 From e9b1083791873f2bc9e0bc33874911c49f05e923 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 26 Oct 2017 13:18:25 -0700 Subject: Change uses of PRIdPTR to PRIuPTR. --- src/core/ext/filters/client_channel/client_channel.cc | 4 ++-- .../client_channel/lb_policy/pick_first/pick_first.cc | 6 +++--- .../client_channel/lb_policy/round_robin/round_robin.cc | 2 +- .../filters/client_channel/lb_policy/subchannel_list.cc | 14 +++++++------- 4 files changed, 13 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index ea5e076c3b..34ba504949 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -898,7 +898,7 @@ static void waiting_for_pick_batches_fail(grpc_exec_ctx *exec_ctx, call_data *calld = (call_data *)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, - "chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s", + "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s", elem->channel_data, calld, calld->waiting_for_pick_batches_count, grpc_error_string(error)); } @@ -940,7 +940,7 @@ static void waiting_for_pick_batches_resume(grpc_exec_ctx *exec_ctx, channel_data *chand = (channel_data *)elem->channel_data; call_data *calld = (call_data *)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR + gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIuPTR " pending batches to subchannel_call=%p", chand, calld, calld->waiting_for_pick_batches_count, calld->subchannel_call); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 81f9bc5a98..9b5b9217af 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -300,7 +300,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, "Pick First %p found already selected subchannel %p " - "at update index %" PRIdPTR " of %" PRIdPTR "; update done", + "at update index %" PRIuPTR " of %" PRIuPTR "; update done", p, p->selected->subchannel, i, subchannel_list->num_subchannels); } @@ -360,8 +360,8 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, pick_first_lb_policy *p = (pick_first_lb_policy *)sd->subchannel_list->policy; if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log(GPR_DEBUG, - "Pick First %p connectivity changed for subchannel %p (%" PRIdPTR - " of %" PRIdPTR + "Pick First %p connectivity changed for subchannel %p (%" PRIuPTR + " of %" PRIuPTR "), subchannel_list %p: state=%s p->shutdown=%d " "sd->subchannel_list->shutting_down=%d error=%s", (void *)p, (void *)sd->subchannel, diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index e3cfd1baa1..488fd3c0ac 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -585,7 +585,7 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIdPTR " addresses", p, + gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", p, addresses->num_addresses); } grpc_lb_subchannel_list *subchannel_list = grpc_lb_subchannel_list_create( diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index cf3e9525ef..831d8ac255 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -34,8 +34,8 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx, if (sd->subchannel != NULL) { if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { gpr_log( - GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIdPTR - " of %" PRIdPTR " (subchannel %p): unreffing subchannel", + GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR + " of %" PRIuPTR " (subchannel %p): unreffing subchannel", sd->subchannel_list->tracer->name, sd->subchannel_list->policy, sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), sd->subchannel_list->num_subchannels, sd->subchannel); @@ -59,7 +59,7 @@ void grpc_lb_subchannel_data_start_connectivity_watch( grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) { if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { gpr_log(GPR_DEBUG, - "[%s %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR " (subchannel %p): requesting connectivity change notification", sd->subchannel_list->tracer->name, sd->subchannel_list->policy, sd->subchannel_list, @@ -77,7 +77,7 @@ void grpc_lb_subchannel_data_stop_connectivity_watch( grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) { if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { gpr_log( - GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR + GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR " (subchannel %p): stopping connectivity watch", sd->subchannel_list->tracer->name, sd->subchannel_list->policy, sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), @@ -95,7 +95,7 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( (grpc_lb_subchannel_list *)gpr_zalloc(sizeof(*subchannel_list)); if (GRPC_TRACER_ON(*tracer)) { gpr_log(GPR_DEBUG, - "[%s %p] Creating subchannel list %p for %" PRIdPTR " subchannels", + "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", tracer->name, p, subchannel_list, addresses->num_addresses); } subchannel_list->policy = p; @@ -159,7 +159,7 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( if (GRPC_TRACER_ON(*tracer)) { char *address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIdPTR + gpr_log(GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR ": Created subchannel %p for address uri %s; " "initial connectivity state: %s", tracer->name, p, subchannel_list, subchannel_index, subchannel, @@ -255,7 +255,7 @@ static void subchannel_data_cancel_connectivity_watch( grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, const char *reason) { if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { gpr_log( - GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR + GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR " (subchannel %p): canceling connectivity watch (%s)", sd->subchannel_list->tracer->name, sd->subchannel_list->policy, sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), -- cgit v1.2.3 From 6e5ce7288da316a6ceee84bd071b6bbddec21495 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 27 Oct 2017 09:37:41 -0700 Subject: Assume that subchannels start in state IDLE. --- include/grpc/impl/codegen/connectivity_state.h | 2 -- .../client_channel/lb_policy/grpclb/grpclb.cc | 2 -- .../lb_policy/pick_first/pick_first.cc | 6 ++-- .../lb_policy/round_robin/round_robin.cc | 2 +- .../client_channel/lb_policy/subchannel_list.cc | 41 +++++----------------- .../client_channel/lb_policy/subchannel_list.h | 2 +- src/core/ext/filters/client_channel/subchannel.h | 4 +-- src/core/lib/transport/connectivity_state.cc | 3 -- test/cpp/end2end/client_lb_end2end_test.cc | 4 +-- 9 files changed, 18 insertions(+), 48 deletions(-) (limited to 'src') diff --git a/include/grpc/impl/codegen/connectivity_state.h b/include/grpc/impl/codegen/connectivity_state.h index 545b4fdbcc..b70dbef356 100644 --- a/include/grpc/impl/codegen/connectivity_state.h +++ b/include/grpc/impl/codegen/connectivity_state.h @@ -25,8 +25,6 @@ extern "C" { /** Connectivity state of a channel. */ typedef enum { - /** channel has just been initialized */ - GRPC_CHANNEL_INIT = -1, /** channel is idle */ GRPC_CHANNEL_IDLE, /** channel is connecting */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index d6bdc13ba9..85e76e68b5 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -611,7 +611,6 @@ static void update_lb_connectivity_status_locked( case GRPC_CHANNEL_SHUTDOWN: GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE); break; - case GRPC_CHANNEL_INIT: case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_READY: @@ -1790,7 +1789,6 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, // embedded RR policy. Note that the current RR policy, if any, will stay in // effect until an update from the new lb_call is received. switch (glb_policy->lb_channel_connectivity) { - case GRPC_CHANNEL_INIT: case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_TRANSIENT_FAILURE: { /* resub. */ diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 9b5b9217af..f0c66c68e1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -312,6 +312,10 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, exec_ctx, p->subchannel_list, "pf_update_includes_selected"); } p->subchannel_list = subchannel_list; + if (p->selected->connected_subchannel != NULL) { + sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + p->selected->connected_subchannel, "pf_update_includes_selected"); + } p->selected = sd; destroy_unselected_subchannels_locked(exec_ctx, p); // If there was a previously pending update (which may or may @@ -442,8 +446,6 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, } while (true) { switch (sd->curr_connectivity_state) { - case GRPC_CHANNEL_INIT: - GPR_UNREACHABLE_CODE(return ); case GRPC_CHANNEL_READY: { // Case 2. Promote p->latest_pending_subchannel_list to // p->subchannel_list. diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 488fd3c0ac..8f29c80130 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -334,6 +334,7 @@ static void update_state_counters_locked(grpc_lb_subchannel_data *sd) { GPR_ASSERT(subchannel_list->num_idle > 0); --subchannel_list->num_idle; } + sd->prev_connectivity_state = sd->curr_connectivity_state; if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { ++subchannel_list->num_ready; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { @@ -451,7 +452,6 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; // Update state counters and determine new overall state. update_state_counters_locked(sd); - sd->prev_connectivity_state = sd->curr_connectivity_state; const grpc_connectivity_state new_policy_connectivity_state = update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error)); // If the sd's new state is SHUTDOWN, unref the subchannel, and if the new diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index 831d8ac255..08ea4f480b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -137,34 +137,13 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( } continue; } - grpc_error *error; - // Get the connectivity state of the subchannel. Already existing ones may - // be in a state other than INIT. - const grpc_connectivity_state subchannel_connectivity_state = - grpc_subchannel_check_connectivity(subchannel, &error); - if (error != GRPC_ERROR_NONE) { - // The subchannel is in error (e.g. shutting down). Ignore it. - if (GRPC_TRACER_ON(*tracer)) { - char *address_uri = - grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_DEBUG, - "[%s %p] subchannel for address uri %s shutting down, ignoring", - tracer->name, subchannel_list->policy, address_uri); - gpr_free(address_uri); - } - GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "new_sc_connectivity_error"); - GRPC_ERROR_UNREF(error); - continue; - } if (GRPC_TRACER_ON(*tracer)) { char *address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); gpr_log(GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR - ": Created subchannel %p for address uri %s; " - "initial connectivity state: %s", + ": Created subchannel %p for address uri %s", tracer->name, p, subchannel_list, subchannel_index, subchannel, - address_uri, - grpc_connectivity_state_name(subchannel_connectivity_state)); + address_uri); gpr_free(address_uri); } grpc_lb_subchannel_data *sd = @@ -174,16 +153,11 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure, connectivity_changed_cb, sd, grpc_combiner_scheduler(args->combiner)); - // Use some sentinel value outside of the range of - // grpc_connectivity_state to signal an undefined previous state. - sd->prev_connectivity_state = GRPC_CHANNEL_INIT; - sd->curr_connectivity_state = subchannel_connectivity_state; - sd->pending_connectivity_state_unsafe = subchannel_connectivity_state; - if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { - sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(sd->subchannel), - "ready_at_sl_creation"); - } + // We assume that the current state is IDLE. If not, we'll get a + // callback telling us that. + sd->prev_connectivity_state = GRPC_CHANNEL_IDLE; + sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; + sd->pending_connectivity_state_unsafe = GRPC_CHANNEL_IDLE; sd->user_data_vtable = addresses->user_data_vtable; if (sd->user_data_vtable != NULL) { sd->user_data = @@ -191,6 +165,7 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( } } subchannel_list->num_subchannels = subchannel_index; + subchannel_list->num_idle = subchannel_index; return subchannel_list; } diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 7ad22302ce..9d5984260f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -97,7 +97,7 @@ struct grpc_lb_subchannel_list { /** Index into subchannels of the one we're currently checking. * Used when connecting to subchannels serially instead of in parallel. */ // TODO(roth): When we have time, we can probably make this go away - // and the index dynamically by subtracting + // and compute the index dynamically by subtracting // subchannel_list->subchannels from the subchannel_data pointer. size_t checking_subchannel; diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 46b29f1fe0..1cd73f3ff4 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -127,8 +127,8 @@ void grpc_connected_subchannel_process_transport_op( grpc_connectivity_state grpc_subchannel_check_connectivity( grpc_subchannel *channel, grpc_error **error); -/** call notify when the connectivity state of a channel changes from *state. - Updates *state with the new state of the channel */ +/** Calls notify when the connectivity state of a channel becomes different + from *state. Updates *state with the new state of the channel. */ void grpc_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_subchannel *channel, grpc_pollset_set *interested_parties, grpc_connectivity_state *state, diff --git a/src/core/lib/transport/connectivity_state.cc b/src/core/lib/transport/connectivity_state.cc index f328a6cdbb..652c26cf0a 100644 --- a/src/core/lib/transport/connectivity_state.cc +++ b/src/core/lib/transport/connectivity_state.cc @@ -29,8 +29,6 @@ grpc_tracer_flag grpc_connectivity_state_trace = const char *grpc_connectivity_state_name(grpc_connectivity_state state) { switch (state) { - case GRPC_CHANNEL_INIT: - return "INIT"; case GRPC_CHANNEL_IDLE: return "IDLE"; case GRPC_CHANNEL_CONNECTING: @@ -174,7 +172,6 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_name(state), reason, error, error_string); } switch (state) { - case GRPC_CHANNEL_INIT: case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_READY: diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index c236f76e89..805e5b1045 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -305,7 +305,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { ports.clear(); SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET none *******"); - grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT; + grpc_connectivity_state channel_state; do { channel_state = channel_->GetState(true /* try to connect */); } while (channel_state == GRPC_CHANNEL_READY); @@ -481,7 +481,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { // An empty update will result in the channel going into TRANSIENT_FAILURE. ports.clear(); SetNextResolution(ports); - grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT; + grpc_connectivity_state channel_state; do { channel_state = channel_->GetState(true /* try to connect */); } while (channel_state == GRPC_CHANNEL_READY); -- cgit v1.2.3