diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c | 673 |
1 files changed, 411 insertions, 262 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index 7ee6ffb787..3c8520cc1c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -1,63 +1,28 @@ /* * - * Copyright 2015, Google Inc. - * All rights reserved. + * Copyright 2015 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ /** Round Robin Policy. * - * This policy keeps: - * - A circular list of ready (connected) subchannels, the *readylist*. An empty - * readylist consists solely of its root (dummy) node. - * - A pointer to the last element picked from the readylist, the *lastpick*. - * Initially set to point to the readylist's root. - * - * Behavior: - * - When a subchannel connects, it's *prepended* to the readylist's root node. - * Ie, if readylist = A <-> B <-> ROOT <-> C - * ^ ^ - * |____________________| - * and subchannel D becomes connected, the addition of D to the readylist - * results in readylist = A <-> B <-> D <-> ROOT <-> C - * ^ ^ - * |__________________________| - * - When a subchannel disconnects, it's removed from the readylist. If the - * subchannel being removed was the most recently picked, the *lastpick* - * pointer moves to the removed node's previous element. Note that if the - * readylist only had one element, this is still legal, as the lastpick would - * point to the dummy root node, for an empty readylist. - * - Upon picking, *lastpick* is updated to point to the returned (connected) - * subchannel. Note that it's possible that the selected subchannel becomes - * disconnected in the interim between the selection and the actual usage of - * the subchannel by the caller. - */ + * Before every pick, the \a get_next_ready_subchannel_index_locked function + * returns the p->subchannel_list->subchannels index for next subchannel, + * respecting the relative + * order of the addresses provided upon creation or updates. Note however that + * updates will start picking from the beginning of the updated list. */ #include <string.h> @@ -72,8 +37,6 @@ #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/static_metadata.h" -typedef struct round_robin_lb_policy round_robin_lb_policy; - grpc_tracer_flag grpc_lb_round_robin_trace = GRPC_TRACER_INITIALIZER(false); /** List of entities waiting for a pick. @@ -99,9 +62,37 @@ typedef struct pending_pick { grpc_closure *on_complete; } pending_pick; +typedef struct rr_subchannel_list rr_subchannel_list; +typedef struct round_robin_lb_policy { + /** base policy: must be first */ + grpc_lb_policy base; + + rr_subchannel_list *subchannel_list; + + /** have we started picking? */ + bool started_picking; + /** are we shutting down? */ + bool shutdown; + /** List of picks that are waiting on connectivity */ + pending_pick *pending_picks; + + /** our connectivity state tracker */ + grpc_connectivity_state_tracker state_tracker; + + /** Index into subchannels for last pick. */ + size_t last_ready_subchannel_index; + + /** Latest version of the subchannel list. + * Subchannel connectivity callbacks will only promote updated subchannel + * lists if they equal \a latest_pending_subchannel_list. In other words, + * racing callbacks that reference outdated subchannel lists won't perform any + * update. */ + rr_subchannel_list *latest_pending_subchannel_list; +} round_robin_lb_policy; + typedef struct { - /** backpointer to owning policy */ - round_robin_lb_policy *policy; + /** backpointer to owning subchannel list */ + rr_subchannel_list *subchannel_list; /** subchannel itself */ grpc_subchannel *subchannel; /** notification that connectivity has changed on subchannel */ @@ -123,12 +114,9 @@ typedef struct { const grpc_lb_user_data_vtable *user_data_vtable; } subchannel_data; -struct round_robin_lb_policy { - /** base policy: must be first */ - grpc_lb_policy base; - - /** total number of addresses received at creation time */ - size_t num_addresses; +struct rr_subchannel_list { + /** backpointer to owning policy */ + round_robin_lb_policy *policy; /** all our subchannels */ size_t num_subchannels; @@ -141,67 +129,143 @@ struct round_robin_lb_policy { /** how many subchannels are in state IDLE */ size_t num_idle; - /** have we started picking? */ - bool started_picking; - /** are we shutting down? */ - bool shutdown; - /** List of picks that are waiting on connectivity */ - pending_pick *pending_picks; - - /** our connectivity state tracker */ - grpc_connectivity_state_tracker state_tracker; + /** There will be one ref for each entry in subchannels for which there is a + * pending connectivity state watcher callback. */ + gpr_refcount refcount; - // Index into subchannels for last pick. - size_t last_ready_subchannel_index; + /** Is this list shutting down? This may be true due to the shutdown of the + * policy itself or because a newer update has arrived while this one hadn't + * finished processing. */ + bool shutting_down; }; -/** Returns the index into p->subchannels of the next subchannel in - * READY state, or p->num_subchannels if no subchannel is READY. +static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx, + rr_subchannel_list *subchannel_list) { + GPR_ASSERT(subchannel_list->shutting_down); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, "[RR %p] Destroying subchannel_list %p", + (void *)subchannel_list->policy, (void *)subchannel_list); + } + for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { + subchannel_data *sd = &subchannel_list->subchannels[i]; + if (sd->subchannel != NULL) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, + "rr_subchannel_list_destroy"); + } + sd->subchannel = NULL; + if (sd->user_data != NULL) { + GPR_ASSERT(sd->user_data_vtable != NULL); + sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + } + } + gpr_free(subchannel_list->subchannels); + gpr_free(subchannel_list); +} + +static void rr_subchannel_list_ref(rr_subchannel_list *subchannel_list, + const char *reason) { + gpr_ref_non_zero(&subchannel_list->refcount); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); + gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu", + (void *)subchannel_list->policy, (void *)subchannel_list, + (unsigned long)(count - 1), (unsigned long)count); + } +} + +static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx, + rr_subchannel_list *subchannel_list, + const char *reason) { + const bool done = gpr_unref(&subchannel_list->refcount); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); + gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu", + (void *)subchannel_list->policy, (void *)subchannel_list, + (unsigned long)(count + 1), (unsigned long)count); + } + if (done) { + rr_subchannel_list_destroy(exec_ctx, subchannel_list); + } +} + +/** Mark \a subchannel_list as discarded. Unsubscribes all its subchannels. The + * watcher's callback will ultimately unref \a subchannel_list. */ +static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx, + rr_subchannel_list *subchannel_list, + const char *reason) { + GPR_ASSERT(!subchannel_list->shutting_down); + subchannel_list->shutting_down = true; + for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { + subchannel_data *sd = &subchannel_list->subchannels[i]; + if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe. + grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, + NULL, + &sd->connectivity_changed_closure); + } + } + rr_subchannel_list_unref(exec_ctx, subchannel_list, reason); +} + +/** Returns the index into p->subchannel_list->subchannels of the next + * subchannel in READY state, or p->subchannel_list->num_subchannels if no + * subchannel is READY. * * Note that this function does *not* update p->last_ready_subchannel_index. * The caller must do that if it returns a pick. */ static size_t get_next_ready_subchannel_index_locked( const round_robin_lb_policy *p) { + GPR_ASSERT(p->subchannel_list != NULL); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, - "[RR: %p] getting next ready subchannel, " + "[RR %p] getting next ready subchannel (out of %lu), " "last_ready_subchannel_index=%lu", - p, (unsigned long)p->last_ready_subchannel_index); + (void *)p, (unsigned long)p->subchannel_list->num_subchannels, + (unsigned long)p->last_ready_subchannel_index); } - for (size_t i = 0; i < p->num_subchannels; ++i) { - const size_t index = - (i + p->last_ready_subchannel_index + 1) % p->num_subchannels; + for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) { + const size_t index = (i + p->last_ready_subchannel_index + 1) % + p->subchannel_list->num_subchannels; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] checking index %lu: state=%d", p, - (unsigned long)index, - p->subchannels[index].curr_connectivity_state); + gpr_log(GPR_DEBUG, + "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: " + "state=%d", + (void *)p, + (void *)p->subchannel_list->subchannels[index].subchannel, + (void *)p->subchannel_list, (unsigned long)index, + p->subchannel_list->subchannels[index].curr_connectivity_state); } - if (p->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) { + if (p->subchannel_list->subchannels[index].curr_connectivity_state == + GRPC_CHANNEL_READY) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] found next ready subchannel at index %lu", - p, (unsigned long)index); + gpr_log(GPR_DEBUG, + "[RR %p] found next ready subchannel (%p) at index %lu of " + "subchannel_list %p", + (void *)p, + (void *)p->subchannel_list->subchannels[index].subchannel, + (unsigned long)index, (void *)p->subchannel_list); } return index; } } if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", p); + gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", (void *)p); } - return p->num_subchannels; + return p->subchannel_list->num_subchannels; } // Sets p->last_ready_subchannel_index to last_ready_index. static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p, size_t last_ready_index) { - GPR_ASSERT(last_ready_index < p->num_subchannels); + GPR_ASSERT(last_ready_index < p->subchannel_list->num_subchannels); p->last_ready_subchannel_index = last_ready_index; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[RR: %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", - (void *)p, (unsigned long)last_ready_index, - (void *)p->subchannels[last_ready_index].subchannel, - (void *)grpc_subchannel_get_connected_subchannel( - p->subchannels[last_ready_index].subchannel)); + gpr_log( + GPR_DEBUG, + "[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", + (void *)p, (unsigned long)last_ready_index, + (void *)p->subchannel_list->subchannels[last_ready_index].subchannel, + (void *)grpc_subchannel_get_connected_subchannel( + p->subchannel_list->subchannels[last_ready_index].subchannel)); } } @@ -210,18 +274,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol); } - for (size_t i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = &p->subchannels[i]; - if (sd->subchannel != NULL) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy"); - if (sd->user_data != NULL) { - GPR_ASSERT(sd->user_data_vtable != NULL); - sd->user_data_vtable->destroy(exec_ctx, sd->user_data); - } - } - } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); - gpr_free(p->subchannels); gpr_free(p); } @@ -235,7 +288,7 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, pp->on_complete, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); gpr_free(pp); @@ -243,14 +296,9 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown"); - for (size_t i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = &p->subchannels[i]; - if (sd->subchannel != NULL) { - grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, - NULL, - &sd->connectivity_changed_closure); - } - } + rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, + "sl_shutdown_rr_shutdown"); + p->subchannel_list = NULL; } static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, @@ -263,7 +311,7 @@ static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if (pp->target == target) { *target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); gpr_free(pp); @@ -288,7 +336,7 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { *pp->target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); gpr_free(pp); @@ -304,15 +352,14 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void start_picking_locked(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { p->started_picking = true; - for (size_t i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = &p->subchannels[i]; - if (sd->subchannel != NULL) { - GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->pending_connectivity_state_unsafe, - &sd->connectivity_changed_closure); - } + for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { + subchannel_data *sd = &p->subchannel_list->subchannels[i]; + GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); + rr_subchannel_list_ref(sd->subchannel_list, "start_picking"); + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->pending_connectivity_state_unsafe, + &sd->connectivity_changed_closure); } } @@ -332,63 +379,70 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol); } - const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); - if (next_ready_index < p->num_subchannels) { - /* readily available, report right away */ - subchannel_data *sd = &p->subchannels[next_ready_index]; - *target = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(sd->subchannel), "rr_picked"); - if (user_data != NULL) { - *user_data = sd->user_data; - } - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (INDEX %lu)", - (void *)*target, (unsigned long)next_ready_index); - } - /* only advance the last picked pointer if the selection was used */ - update_last_ready_subchannel_index_locked(p, next_ready_index); - return 1; - } else { - /* no pick currently available. Save for later in list of pending picks */ - if (!p->started_picking) { - start_picking_locked(exec_ctx, p); + if (p->subchannel_list != NULL) { + const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); + if (next_ready_index < p->subchannel_list->num_subchannels) { + /* readily available, report right away */ + subchannel_data *sd = &p->subchannel_list->subchannels[next_ready_index]; + *target = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(sd->subchannel), + "rr_picked"); + if (user_data != NULL) { + *user_data = sd->user_data; + } + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log( + GPR_DEBUG, + "[RR %p] PICKED TARGET <-- SUBCHANNEL %p (CONNECTED %p) (SL %p, " + "INDEX %lu)", + (void *)p, (void *)sd->subchannel, (void *)*target, + (void *)sd->subchannel_list, (unsigned long)next_ready_index); + } + /* only advance the last picked pointer if the selection was used */ + update_last_ready_subchannel_index_locked(p, next_ready_index); + return 1; } - pending_pick *pp = gpr_malloc(sizeof(*pp)); - pp->next = p->pending_picks; - pp->target = target; - pp->on_complete = on_complete; - pp->initial_metadata_flags = pick_args->initial_metadata_flags; - pp->user_data = user_data; - p->pending_picks = pp; - return 0; } + /* no pick currently available. Save for later in list of pending picks */ + if (!p->started_picking) { + start_picking_locked(exec_ctx, p); + } + pending_pick *pp = gpr_malloc(sizeof(*pp)); + pp->next = p->pending_picks; + pp->target = target; + pp->on_complete = on_complete; + pp->initial_metadata_flags = pick_args->initial_metadata_flags; + pp->user_data = user_data; + p->pending_picks = pp; + return 0; } static void update_state_counters_locked(subchannel_data *sd) { - round_robin_lb_policy *p = sd->policy; + rr_subchannel_list *subchannel_list = sd->subchannel_list; if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { - GPR_ASSERT(p->num_ready > 0); - --p->num_ready; + GPR_ASSERT(subchannel_list->num_ready > 0); + --subchannel_list->num_ready; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - GPR_ASSERT(p->num_transient_failures > 0); - --p->num_transient_failures; + GPR_ASSERT(subchannel_list->num_transient_failures > 0); + --subchannel_list->num_transient_failures; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { - GPR_ASSERT(p->num_idle > 0); - --p->num_idle; + GPR_ASSERT(subchannel_list->num_idle > 0); + --subchannel_list->num_idle; } if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { - ++p->num_ready; + ++subchannel_list->num_ready; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - ++p->num_transient_failures; + ++subchannel_list->num_transient_failures; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) { - ++p->num_idle; + ++subchannel_list->num_idle; } } -/* sd is the subchannel_data associted with the updated subchannel. - * shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE - * or SHUTDOWN */ +/** Sets the policy's connectivity status based on that of the passed-in \a sd + * (the subchannel_data associted with the updated subchannel) and the + * subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be + * used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the + * connectivity status set. */ static grpc_connectivity_state update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) { /* In priority order. The first rule to match terminates the search (ie, if we @@ -401,17 +455,18 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( * CHECK: sd->curr_connectivity_state == CONNECTING. * * 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN. - * CHECK: p->num_subchannels = 0. + * CHECK: p->subchannel_list->num_subchannels = 0. * * 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is * TRANSIENT_FAILURE. - * CHECK: p->num_transient_failures == p->num_subchannels. + * CHECK: p->num_transient_failures == p->subchannel_list->num_subchannels. * * 5) RULE: ALL subchannels are IDLE => policy is IDLE. - * CHECK: p->num_idle == p->num_subchannels. + * CHECK: p->num_idle == p->subchannel_list->num_subchannels. */ - round_robin_lb_policy *p = sd->policy; - if (p->num_ready > 0) { /* 1) READY */ + rr_subchannel_list *subchannel_list = sd->subchannel_list; + round_robin_lb_policy *p = subchannel_list->policy; + if (subchannel_list->num_ready > 0) { /* 1) READY */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); return GRPC_CHANNEL_READY; @@ -421,18 +476,19 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "rr_connecting"); return GRPC_CHANNEL_CONNECTING; - } else if (p->num_subchannels == 0) { /* 3) SHUTDOWN */ + } else if (p->subchannel_list->num_subchannels == 0) { /* 3) SHUTDOWN */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown"); return GRPC_CHANNEL_SHUTDOWN; - } else if (p->num_transient_failures == - p->num_subchannels) { /* 4) TRANSIENT_FAILURE */ + } else if (subchannel_list->num_transient_failures == + p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "rr_transient_failure"); return GRPC_CHANNEL_TRANSIENT_FAILURE; - } else if (p->num_idle == p->num_subchannels) { /* 5) IDLE */ + } else if (subchannel_list->num_idle == + p->subchannel_list->num_subchannels) { /* 5) IDLE */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, "rr_idle"); return GRPC_CHANNEL_IDLE; @@ -444,7 +500,28 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { subchannel_data *sd = arg; - round_robin_lb_policy *p = sd->policy; + round_robin_lb_policy *p = sd->subchannel_list->policy; + // If the policy is shutting down, unref and return. + if (p->shutdown) { + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown"); + return; + } + if (sd->subchannel_list->shutting_down) { + // the subchannel list associated with sd has been discarded. This callback + // corresponds to the unsubscription. + GPR_ASSERT(error == GRPC_ERROR_CANCELLED); + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown"); + return; + } + // Dispose of outdated subchannel lists. + if (sd->subchannel_list != p->subchannel_list && + sd->subchannel_list != p->latest_pending_subchannel_list) { + // sd belongs to an outdated subchannel_list: get rid of it. + rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_oudated"); + return; + } // Now that we're inside the combiner, copy the pending connectivity // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. @@ -453,21 +530,16 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_log(GPR_DEBUG, "[RR %p] connectivity changed for subchannel %p: " "prev_state=%d new_state=%d", - p, sd->subchannel, sd->prev_connectivity_state, + (void *)p, (void *)sd->subchannel, sd->prev_connectivity_state, sd->curr_connectivity_state); } - // If we're shutting down, unref and return. - if (p->shutdown) { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); - return; - } // Update state counters and determine new overall state. update_state_counters_locked(sd); sd->prev_connectivity_state = sd->curr_connectivity_state; - grpc_connectivity_state new_connectivity_state = + const grpc_connectivity_state new_policy_connectivity_state = update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error)); - // If the new state is SHUTDOWN, unref the subchannel, and if the new - // overall state is SHUTDOWN, clean up. + // If the sd's new state is SHUTDOWN, unref the subchannel, and if the new + // policy's state is SHUTDOWN, clean up. if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown"); sd->subchannel = NULL; @@ -475,26 +547,53 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(sd->user_data_vtable != NULL); sd->user_data_vtable->destroy(exec_ctx, sd->user_data); } - if (new_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { /* the policy is shutting down. Flush all the pending picks... */ pending_pick *pp; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } } /* unref the "rr_connectivity" weak ref from start_picking */ - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); - } else { + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sd_shutdown"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, + "rr_connectivity_sd_shutdown"); + } else { // sd not in SHUTDOWN if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { + if (sd->subchannel_list != p->subchannel_list) { + // promote sd->subchannel_list to p->subchannel_list. + // sd->subchannel_list must be equal to + // p->latest_pending_subchannel_list because we have already filtered + // for sds belonging to outdated subchannel lists. + GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list); + GPR_ASSERT(!sd->subchannel_list->shutting_down); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, + "[RR %p] phasing out subchannel list %p (size %lu) in favor " + "of %p (size %lu)", + (void *)p, (void *)p->subchannel_list, + (unsigned long)p->subchannel_list->num_subchannels, + (void *)sd->subchannel_list, + (unsigned long)sd->subchannel_list->num_subchannels); + } + if (p->subchannel_list != NULL) { + // dispose of the current subchannel_list + rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, + "sl_shutdown_rr_update_connectivity"); + } + p->subchannel_list = sd->subchannel_list; + p->latest_pending_subchannel_list = NULL; + } /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); - GPR_ASSERT(next_ready_index < p->num_subchannels); - subchannel_data *selected = &p->subchannels[next_ready_index]; + GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels); + subchannel_data *selected = + &p->subchannel_list->subchannels[next_ready_index]; if (p->pending_picks != NULL) { /* if the selected subchannel is going to be used for the pending * picks, update the last picked pointer */ @@ -515,11 +614,12 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, (void *)selected->subchannel, (unsigned long)next_ready_index); } - grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } } - /* renew notification: reuses the "rr_connectivity" weak ref */ + /* renew notification: reuses the "rr_connectivity" weak ref on the policy + * as well as the sd->subchannel_list ref. */ grpc_subchannel_notify_on_state_change( exec_ctx, sd->subchannel, p->base.interested_parties, &sd->pending_connectivity_state_unsafe, @@ -546,65 +646,82 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *closure) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); - if (next_ready_index < p->num_subchannels) { - subchannel_data *selected = &p->subchannels[next_ready_index]; + if (next_ready_index < p->subchannel_list->num_subchannels) { + subchannel_data *selected = + &p->subchannel_list->subchannels[next_ready_index]; grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected->subchannel), "rr_picked"); grpc_connected_subchannel_ping(exec_ctx, target, closure); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked"); } else { - grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Round Robin not connected")); } } -static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, - rr_shutdown_locked, - rr_pick_locked, - rr_cancel_pick_locked, - rr_cancel_picks_locked, - rr_ping_one_locked, - rr_exit_idle_locked, - rr_check_connectivity_locked, - rr_notify_on_state_change_locked}; - -static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} - -static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} - -static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, - grpc_lb_policy_factory *factory, - grpc_lb_policy_args *args) { - GPR_ASSERT(args->client_channel_factory != NULL); - - /* Find the number of backend addresses. We ignore balancer - * addresses, since we don't know how to handle them. */ +static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const grpc_lb_policy_args *args) { + round_robin_lb_policy *p = (round_robin_lb_policy *)policy; + /* Find the number of backend addresses. We ignore balancer addresses, since + * we don't know how to handle them. */ const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); if (arg == NULL || arg->type != GRPC_ARG_POINTER) { - return NULL; + if (p->subchannel_list == NULL) { + // If we don't have a current subchannel list, go into TRANSIENT FAILURE. + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), + "rr_update_missing"); + } else { + // otherwise, keep using the current subchannel list (ignore this update). + gpr_log(GPR_ERROR, + "No valid LB addresses channel arg for Round Robin %p update, " + "ignoring.", + (void *)p); + } + return; } grpc_lb_addresses *addresses = arg->value.pointer.p; size_t num_addrs = 0; for (size_t i = 0; i < addresses->num_addresses; i++) { if (!addresses->addresses[i].is_balancer) ++num_addrs; } - if (num_addrs == 0) return NULL; - - round_robin_lb_policy *p = gpr_zalloc(sizeof(*p)); - - p->num_addresses = num_addrs; - p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs); - - grpc_subchannel_args sc_args; + if (num_addrs == 0) { + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), + "rr_update_empty"); + if (p->subchannel_list != NULL) { + rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, + "sl_shutdown_rr_update"); + p->subchannel_list = NULL; + } + return; + } size_t subchannel_index = 0; + rr_subchannel_list *subchannel_list = gpr_zalloc(sizeof(*subchannel_list)); + subchannel_list->policy = p; + subchannel_list->subchannels = + gpr_zalloc(sizeof(subchannel_data) * num_addrs); + subchannel_list->num_subchannels = num_addrs; + gpr_ref_init(&subchannel_list->refcount, 1); + p->latest_pending_subchannel_list = subchannel_list; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "Created subchannel list %p for %lu subchannels", + (void *)subchannel_list, (unsigned long)num_addrs); + } + grpc_subchannel_args sc_args; + /* We need to remove the LB addresses in order to be able to compare the + * subchannel keys of subchannels from a different batch of addresses. */ + static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, + GRPC_ARG_LB_ADDRESSES}; + /* Create subchannels for addresses in the update. */ for (size_t i = 0; i < addresses->num_addresses; i++) { /* Skip balancer addresses, since we only know how to handle backends. */ if (addresses->addresses[i].is_balancer) continue; - - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; + GPR_ASSERT(i < num_addrs); memset(&sc_args, 0, sizeof(grpc_subchannel_args)); grpc_arg addr_arg = grpc_create_subchannel_address_arg(&addresses->addresses[i].address); @@ -618,52 +735,84 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { char *address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_DEBUG, "index %lu: Created subchannel %p for address uri %s", - (unsigned long)subchannel_index, (void *)subchannel, address_uri); + gpr_log(GPR_DEBUG, + "index %lu: Created subchannel %p for address uri %s into " + "subchannel_list %p", + (unsigned long)subchannel_index, (void *)subchannel, address_uri, + (void *)subchannel_list); gpr_free(address_uri); } grpc_channel_args_destroy(exec_ctx, new_args); - if (subchannel != NULL) { - subchannel_data *sd = &p->subchannels[subchannel_index]; - sd->policy = p; - sd->subchannel = subchannel; - /* use some sentinel value outside of the range of grpc_connectivity_state - * to signal an undefined previous state. We won't be referring to this - * value again and it'll be overwritten after the first call to - * rr_connectivity_changed */ - sd->prev_connectivity_state = GRPC_CHANNEL_INIT; - sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; - sd->user_data_vtable = addresses->user_data_vtable; - if (sd->user_data_vtable != NULL) { - sd->user_data = - sd->user_data_vtable->copy(addresses->addresses[i].user_data); - } - grpc_closure_init(&sd->connectivity_changed_closure, - rr_connectivity_changed_locked, sd, - grpc_combiner_scheduler(args->combiner, false)); - ++subchannel_index; + subchannel_data *sd = &subchannel_list->subchannels[subchannel_index++]; + sd->subchannel_list = subchannel_list; + sd->subchannel = subchannel; + GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure, + rr_connectivity_changed_locked, sd, + grpc_combiner_scheduler(args->combiner)); + /* use some sentinel value outside of the range of + * grpc_connectivity_state to signal an undefined previous state. We + * won't be referring to this value again and it'll be overwritten after + * the first call to rr_connectivity_changed_locked */ + sd->prev_connectivity_state = GRPC_CHANNEL_INIT; + sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; + sd->user_data_vtable = addresses->user_data_vtable; + if (sd->user_data_vtable != NULL) { + sd->user_data = + sd->user_data_vtable->copy(addresses->addresses[i].user_data); + } + if (p->started_picking) { + rr_subchannel_list_ref(sd->subchannel_list, "update_started_picking"); + GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity_update"); + /* 2. Watch every new subchannel. A subchannel list becomes active the + * moment one of its subchannels is READY. At that moment, we swap + * p->subchannel_list for sd->subchannel_list, provided the subchannel + * list is still valid (ie, isn't shutting down) */ + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->pending_connectivity_state_unsafe, + &sd->connectivity_changed_closure); } } - if (subchannel_index == 0) { - /* couldn't create any subchannel. Bail out */ - gpr_free(p->subchannels); - gpr_free(p); - return NULL; + if (!p->started_picking) { + // The policy isn't picking yet. Save the update for later, disposing of + // previous version if any. + if (p->subchannel_list != NULL) { + rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, + "rr_update_before_started_picking"); + } + p->subchannel_list = subchannel_list; } - p->num_subchannels = subchannel_index; +} + +static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { + rr_destroy, + rr_shutdown_locked, + rr_pick_locked, + rr_cancel_pick_locked, + rr_cancel_picks_locked, + rr_ping_one_locked, + rr_exit_idle_locked, + rr_check_connectivity_locked, + rr_notify_on_state_change_locked, + rr_update_locked}; + +static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} - // Initialize the last pick index to the last subchannel, so that the - // first pick will start at the beginning of the list. - p->last_ready_subchannel_index = subchannel_index - 1; +static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} +static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, + grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args) { + GPR_ASSERT(args->client_channel_factory != NULL); + round_robin_lb_policy *p = gpr_zalloc(sizeof(*p)); + rr_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels", - (void *)p, (unsigned long)p->num_subchannels); + gpr_log(GPR_DEBUG, "Created Round Robin %p with %lu subchannels", (void *)p, + (unsigned long)p->subchannel_list->num_subchannels); } return &p->base; } |