aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c673
1 files changed, 411 insertions, 262 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index 7ee6ffb787..3c8520cc1c 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -1,63 +1,28 @@
/*
*
- * Copyright 2015, Google Inc.
- * All rights reserved.
+ * Copyright 2015 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
/** Round Robin Policy.
*
- * This policy keeps:
- * - A circular list of ready (connected) subchannels, the *readylist*. An empty
- * readylist consists solely of its root (dummy) node.
- * - A pointer to the last element picked from the readylist, the *lastpick*.
- * Initially set to point to the readylist's root.
- *
- * Behavior:
- * - When a subchannel connects, it's *prepended* to the readylist's root node.
- * Ie, if readylist = A <-> B <-> ROOT <-> C
- * ^ ^
- * |____________________|
- * and subchannel D becomes connected, the addition of D to the readylist
- * results in readylist = A <-> B <-> D <-> ROOT <-> C
- * ^ ^
- * |__________________________|
- * - When a subchannel disconnects, it's removed from the readylist. If the
- * subchannel being removed was the most recently picked, the *lastpick*
- * pointer moves to the removed node's previous element. Note that if the
- * readylist only had one element, this is still legal, as the lastpick would
- * point to the dummy root node, for an empty readylist.
- * - Upon picking, *lastpick* is updated to point to the returned (connected)
- * subchannel. Note that it's possible that the selected subchannel becomes
- * disconnected in the interim between the selection and the actual usage of
- * the subchannel by the caller.
- */
+ * Before every pick, the \a get_next_ready_subchannel_index_locked function
+ * returns the p->subchannel_list->subchannels index for next subchannel,
+ * respecting the relative
+ * order of the addresses provided upon creation or updates. Note however that
+ * updates will start picking from the beginning of the updated list. */
#include <string.h>
@@ -72,8 +37,6 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
-typedef struct round_robin_lb_policy round_robin_lb_policy;
-
grpc_tracer_flag grpc_lb_round_robin_trace = GRPC_TRACER_INITIALIZER(false);
/** List of entities waiting for a pick.
@@ -99,9 +62,37 @@ typedef struct pending_pick {
grpc_closure *on_complete;
} pending_pick;
+typedef struct rr_subchannel_list rr_subchannel_list;
+typedef struct round_robin_lb_policy {
+ /** base policy: must be first */
+ grpc_lb_policy base;
+
+ rr_subchannel_list *subchannel_list;
+
+ /** have we started picking? */
+ bool started_picking;
+ /** are we shutting down? */
+ bool shutdown;
+ /** List of picks that are waiting on connectivity */
+ pending_pick *pending_picks;
+
+ /** our connectivity state tracker */
+ grpc_connectivity_state_tracker state_tracker;
+
+ /** Index into subchannels for last pick. */
+ size_t last_ready_subchannel_index;
+
+ /** Latest version of the subchannel list.
+ * Subchannel connectivity callbacks will only promote updated subchannel
+ * lists if they equal \a latest_pending_subchannel_list. In other words,
+ * racing callbacks that reference outdated subchannel lists won't perform any
+ * update. */
+ rr_subchannel_list *latest_pending_subchannel_list;
+} round_robin_lb_policy;
+
typedef struct {
- /** backpointer to owning policy */
- round_robin_lb_policy *policy;
+ /** backpointer to owning subchannel list */
+ rr_subchannel_list *subchannel_list;
/** subchannel itself */
grpc_subchannel *subchannel;
/** notification that connectivity has changed on subchannel */
@@ -123,12 +114,9 @@ typedef struct {
const grpc_lb_user_data_vtable *user_data_vtable;
} subchannel_data;
-struct round_robin_lb_policy {
- /** base policy: must be first */
- grpc_lb_policy base;
-
- /** total number of addresses received at creation time */
- size_t num_addresses;
+struct rr_subchannel_list {
+ /** backpointer to owning policy */
+ round_robin_lb_policy *policy;
/** all our subchannels */
size_t num_subchannels;
@@ -141,67 +129,143 @@ struct round_robin_lb_policy {
/** how many subchannels are in state IDLE */
size_t num_idle;
- /** have we started picking? */
- bool started_picking;
- /** are we shutting down? */
- bool shutdown;
- /** List of picks that are waiting on connectivity */
- pending_pick *pending_picks;
-
- /** our connectivity state tracker */
- grpc_connectivity_state_tracker state_tracker;
+ /** There will be one ref for each entry in subchannels for which there is a
+ * pending connectivity state watcher callback. */
+ gpr_refcount refcount;
- // Index into subchannels for last pick.
- size_t last_ready_subchannel_index;
+ /** Is this list shutting down? This may be true due to the shutdown of the
+ * policy itself or because a newer update has arrived while this one hadn't
+ * finished processing. */
+ bool shutting_down;
};
-/** Returns the index into p->subchannels of the next subchannel in
- * READY state, or p->num_subchannels if no subchannel is READY.
+static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx,
+ rr_subchannel_list *subchannel_list) {
+ GPR_ASSERT(subchannel_list->shutting_down);
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_INFO, "[RR %p] Destroying subchannel_list %p",
+ (void *)subchannel_list->policy, (void *)subchannel_list);
+ }
+ for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
+ subchannel_data *sd = &subchannel_list->subchannels[i];
+ if (sd->subchannel != NULL) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel,
+ "rr_subchannel_list_destroy");
+ }
+ sd->subchannel = NULL;
+ if (sd->user_data != NULL) {
+ GPR_ASSERT(sd->user_data_vtable != NULL);
+ sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ }
+ }
+ gpr_free(subchannel_list->subchannels);
+ gpr_free(subchannel_list);
+}
+
+static void rr_subchannel_list_ref(rr_subchannel_list *subchannel_list,
+ const char *reason) {
+ gpr_ref_non_zero(&subchannel_list->refcount);
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
+ gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu",
+ (void *)subchannel_list->policy, (void *)subchannel_list,
+ (unsigned long)(count - 1), (unsigned long)count);
+ }
+}
+
+static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
+ rr_subchannel_list *subchannel_list,
+ const char *reason) {
+ const bool done = gpr_unref(&subchannel_list->refcount);
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
+ gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu",
+ (void *)subchannel_list->policy, (void *)subchannel_list,
+ (unsigned long)(count + 1), (unsigned long)count);
+ }
+ if (done) {
+ rr_subchannel_list_destroy(exec_ctx, subchannel_list);
+ }
+}
+
+/** Mark \a subchannel_list as discarded. Unsubscribes all its subchannels. The
+ * watcher's callback will ultimately unref \a subchannel_list. */
+static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx,
+ rr_subchannel_list *subchannel_list,
+ const char *reason) {
+ GPR_ASSERT(!subchannel_list->shutting_down);
+ subchannel_list->shutting_down = true;
+ for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
+ subchannel_data *sd = &subchannel_list->subchannels[i];
+ if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe.
+ grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL,
+ NULL,
+ &sd->connectivity_changed_closure);
+ }
+ }
+ rr_subchannel_list_unref(exec_ctx, subchannel_list, reason);
+}
+
+/** Returns the index into p->subchannel_list->subchannels of the next
+ * subchannel in READY state, or p->subchannel_list->num_subchannels if no
+ * subchannel is READY.
*
* Note that this function does *not* update p->last_ready_subchannel_index.
* The caller must do that if it returns a pick. */
static size_t get_next_ready_subchannel_index_locked(
const round_robin_lb_policy *p) {
+ GPR_ASSERT(p->subchannel_list != NULL);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
- "[RR: %p] getting next ready subchannel, "
+ "[RR %p] getting next ready subchannel (out of %lu), "
"last_ready_subchannel_index=%lu",
- p, (unsigned long)p->last_ready_subchannel_index);
+ (void *)p, (unsigned long)p->subchannel_list->num_subchannels,
+ (unsigned long)p->last_ready_subchannel_index);
}
- for (size_t i = 0; i < p->num_subchannels; ++i) {
- const size_t index =
- (i + p->last_ready_subchannel_index + 1) % p->num_subchannels;
+ for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) {
+ const size_t index = (i + p->last_ready_subchannel_index + 1) %
+ p->subchannel_list->num_subchannels;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[RR %p] checking index %lu: state=%d", p,
- (unsigned long)index,
- p->subchannels[index].curr_connectivity_state);
+ gpr_log(GPR_DEBUG,
+ "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: "
+ "state=%d",
+ (void *)p,
+ (void *)p->subchannel_list->subchannels[index].subchannel,
+ (void *)p->subchannel_list, (unsigned long)index,
+ p->subchannel_list->subchannels[index].curr_connectivity_state);
}
- if (p->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) {
+ if (p->subchannel_list->subchannels[index].curr_connectivity_state ==
+ GRPC_CHANNEL_READY) {
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[RR %p] found next ready subchannel at index %lu",
- p, (unsigned long)index);
+ gpr_log(GPR_DEBUG,
+ "[RR %p] found next ready subchannel (%p) at index %lu of "
+ "subchannel_list %p",
+ (void *)p,
+ (void *)p->subchannel_list->subchannels[index].subchannel,
+ (unsigned long)index, (void *)p->subchannel_list);
}
return index;
}
}
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", p);
+ gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", (void *)p);
}
- return p->num_subchannels;
+ return p->subchannel_list->num_subchannels;
}
// Sets p->last_ready_subchannel_index to last_ready_index.
static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p,
size_t last_ready_index) {
- GPR_ASSERT(last_ready_index < p->num_subchannels);
+ GPR_ASSERT(last_ready_index < p->subchannel_list->num_subchannels);
p->last_ready_subchannel_index = last_ready_index;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG,
- "[RR: %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)",
- (void *)p, (unsigned long)last_ready_index,
- (void *)p->subchannels[last_ready_index].subchannel,
- (void *)grpc_subchannel_get_connected_subchannel(
- p->subchannels[last_ready_index].subchannel));
+ gpr_log(
+ GPR_DEBUG,
+ "[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)",
+ (void *)p, (unsigned long)last_ready_index,
+ (void *)p->subchannel_list->subchannels[last_ready_index].subchannel,
+ (void *)grpc_subchannel_get_connected_subchannel(
+ p->subchannel_list->subchannels[last_ready_index].subchannel));
}
}
@@ -210,18 +274,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol);
}
- for (size_t i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = &p->subchannels[i];
- if (sd->subchannel != NULL) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy");
- if (sd->user_data != NULL) {
- GPR_ASSERT(sd->user_data_vtable != NULL);
- sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
- }
- }
- }
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
- gpr_free(p->subchannels);
gpr_free(p);
}
@@ -235,7 +288,7 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
gpr_free(pp);
@@ -243,14 +296,9 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown");
- for (size_t i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = &p->subchannels[i];
- if (sd->subchannel != NULL) {
- grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL,
- NULL,
- &sd->connectivity_changed_closure);
- }
- }
+ rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
+ "sl_shutdown_rr_shutdown");
+ p->subchannel_list = NULL;
}
static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
@@ -263,7 +311,7 @@ static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if (pp->target == target) {
*target = NULL;
- grpc_closure_sched(exec_ctx, pp->on_complete,
+ GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
gpr_free(pp);
@@ -288,7 +336,7 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
*pp->target = NULL;
- grpc_closure_sched(exec_ctx, pp->on_complete,
+ GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
gpr_free(pp);
@@ -304,15 +352,14 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static void start_picking_locked(grpc_exec_ctx *exec_ctx,
round_robin_lb_policy *p) {
p->started_picking = true;
- for (size_t i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = &p->subchannels[i];
- if (sd->subchannel != NULL) {
- GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->pending_connectivity_state_unsafe,
- &sd->connectivity_changed_closure);
- }
+ for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) {
+ subchannel_data *sd = &p->subchannel_list->subchannels[i];
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
+ rr_subchannel_list_ref(sd->subchannel_list, "start_picking");
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->pending_connectivity_state_unsafe,
+ &sd->connectivity_changed_closure);
}
}
@@ -332,63 +379,70 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol);
}
- const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
- if (next_ready_index < p->num_subchannels) {
- /* readily available, report right away */
- subchannel_data *sd = &p->subchannels[next_ready_index];
- *target = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(sd->subchannel), "rr_picked");
- if (user_data != NULL) {
- *user_data = sd->user_data;
- }
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG,
- "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (INDEX %lu)",
- (void *)*target, (unsigned long)next_ready_index);
- }
- /* only advance the last picked pointer if the selection was used */
- update_last_ready_subchannel_index_locked(p, next_ready_index);
- return 1;
- } else {
- /* no pick currently available. Save for later in list of pending picks */
- if (!p->started_picking) {
- start_picking_locked(exec_ctx, p);
+ if (p->subchannel_list != NULL) {
+ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
+ if (next_ready_index < p->subchannel_list->num_subchannels) {
+ /* readily available, report right away */
+ subchannel_data *sd = &p->subchannel_list->subchannels[next_ready_index];
+ *target = GRPC_CONNECTED_SUBCHANNEL_REF(
+ grpc_subchannel_get_connected_subchannel(sd->subchannel),
+ "rr_picked");
+ if (user_data != NULL) {
+ *user_data = sd->user_data;
+ }
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(
+ GPR_DEBUG,
+ "[RR %p] PICKED TARGET <-- SUBCHANNEL %p (CONNECTED %p) (SL %p, "
+ "INDEX %lu)",
+ (void *)p, (void *)sd->subchannel, (void *)*target,
+ (void *)sd->subchannel_list, (unsigned long)next_ready_index);
+ }
+ /* only advance the last picked pointer if the selection was used */
+ update_last_ready_subchannel_index_locked(p, next_ready_index);
+ return 1;
}
- pending_pick *pp = gpr_malloc(sizeof(*pp));
- pp->next = p->pending_picks;
- pp->target = target;
- pp->on_complete = on_complete;
- pp->initial_metadata_flags = pick_args->initial_metadata_flags;
- pp->user_data = user_data;
- p->pending_picks = pp;
- return 0;
}
+ /* no pick currently available. Save for later in list of pending picks */
+ if (!p->started_picking) {
+ start_picking_locked(exec_ctx, p);
+ }
+ pending_pick *pp = gpr_malloc(sizeof(*pp));
+ pp->next = p->pending_picks;
+ pp->target = target;
+ pp->on_complete = on_complete;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+ pp->user_data = user_data;
+ p->pending_picks = pp;
+ return 0;
}
static void update_state_counters_locked(subchannel_data *sd) {
- round_robin_lb_policy *p = sd->policy;
+ rr_subchannel_list *subchannel_list = sd->subchannel_list;
if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
- GPR_ASSERT(p->num_ready > 0);
- --p->num_ready;
+ GPR_ASSERT(subchannel_list->num_ready > 0);
+ --subchannel_list->num_ready;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- GPR_ASSERT(p->num_transient_failures > 0);
- --p->num_transient_failures;
+ GPR_ASSERT(subchannel_list->num_transient_failures > 0);
+ --subchannel_list->num_transient_failures;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
- GPR_ASSERT(p->num_idle > 0);
- --p->num_idle;
+ GPR_ASSERT(subchannel_list->num_idle > 0);
+ --subchannel_list->num_idle;
}
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
- ++p->num_ready;
+ ++subchannel_list->num_ready;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- ++p->num_transient_failures;
+ ++subchannel_list->num_transient_failures;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
- ++p->num_idle;
+ ++subchannel_list->num_idle;
}
}
-/* sd is the subchannel_data associted with the updated subchannel.
- * shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE
- * or SHUTDOWN */
+/** Sets the policy's connectivity status based on that of the passed-in \a sd
+ * (the subchannel_data associted with the updated subchannel) and the
+ * subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be
+ * used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the
+ * connectivity status set. */
static grpc_connectivity_state update_lb_connectivity_status_locked(
grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) {
/* In priority order. The first rule to match terminates the search (ie, if we
@@ -401,17 +455,18 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
* CHECK: sd->curr_connectivity_state == CONNECTING.
*
* 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN.
- * CHECK: p->num_subchannels = 0.
+ * CHECK: p->subchannel_list->num_subchannels = 0.
*
* 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
* TRANSIENT_FAILURE.
- * CHECK: p->num_transient_failures == p->num_subchannels.
+ * CHECK: p->num_transient_failures == p->subchannel_list->num_subchannels.
*
* 5) RULE: ALL subchannels are IDLE => policy is IDLE.
- * CHECK: p->num_idle == p->num_subchannels.
+ * CHECK: p->num_idle == p->subchannel_list->num_subchannels.
*/
- round_robin_lb_policy *p = sd->policy;
- if (p->num_ready > 0) { /* 1) READY */
+ rr_subchannel_list *subchannel_list = sd->subchannel_list;
+ round_robin_lb_policy *p = subchannel_list->policy;
+ if (subchannel_list->num_ready > 0) { /* 1) READY */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
return GRPC_CHANNEL_READY;
@@ -421,18 +476,19 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
"rr_connecting");
return GRPC_CHANNEL_CONNECTING;
- } else if (p->num_subchannels == 0) { /* 3) SHUTDOWN */
+ } else if (p->subchannel_list->num_subchannels == 0) { /* 3) SHUTDOWN */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
"rr_shutdown");
return GRPC_CHANNEL_SHUTDOWN;
- } else if (p->num_transient_failures ==
- p->num_subchannels) { /* 4) TRANSIENT_FAILURE */
+ } else if (subchannel_list->num_transient_failures ==
+ p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "rr_transient_failure");
return GRPC_CHANNEL_TRANSIENT_FAILURE;
- } else if (p->num_idle == p->num_subchannels) { /* 5) IDLE */
+ } else if (subchannel_list->num_idle ==
+ p->subchannel_list->num_subchannels) { /* 5) IDLE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE, "rr_idle");
return GRPC_CHANNEL_IDLE;
@@ -444,7 +500,28 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
subchannel_data *sd = arg;
- round_robin_lb_policy *p = sd->policy;
+ round_robin_lb_policy *p = sd->subchannel_list->policy;
+ // If the policy is shutting down, unref and return.
+ if (p->shutdown) {
+ rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown");
+ return;
+ }
+ if (sd->subchannel_list->shutting_down) {
+ // the subchannel list associated with sd has been discarded. This callback
+ // corresponds to the unsubscription.
+ GPR_ASSERT(error == GRPC_ERROR_CANCELLED);
+ rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown");
+ return;
+ }
+ // Dispose of outdated subchannel lists.
+ if (sd->subchannel_list != p->subchannel_list &&
+ sd->subchannel_list != p->latest_pending_subchannel_list) {
+ // sd belongs to an outdated subchannel_list: get rid of it.
+ rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_oudated");
+ return;
+ }
// Now that we're inside the combiner, copy the pending connectivity
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
@@ -453,21 +530,16 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_log(GPR_DEBUG,
"[RR %p] connectivity changed for subchannel %p: "
"prev_state=%d new_state=%d",
- p, sd->subchannel, sd->prev_connectivity_state,
+ (void *)p, (void *)sd->subchannel, sd->prev_connectivity_state,
sd->curr_connectivity_state);
}
- // If we're shutting down, unref and return.
- if (p->shutdown) {
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
- return;
- }
// Update state counters and determine new overall state.
update_state_counters_locked(sd);
sd->prev_connectivity_state = sd->curr_connectivity_state;
- grpc_connectivity_state new_connectivity_state =
+ const grpc_connectivity_state new_policy_connectivity_state =
update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
- // If the new state is SHUTDOWN, unref the subchannel, and if the new
- // overall state is SHUTDOWN, clean up.
+ // If the sd's new state is SHUTDOWN, unref the subchannel, and if the new
+ // policy's state is SHUTDOWN, clean up.
if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown");
sd->subchannel = NULL;
@@ -475,26 +547,53 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(sd->user_data_vtable != NULL);
sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
}
- if (new_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
/* the policy is shutting down. Flush all the pending picks... */
pending_pick *pp;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
}
/* unref the "rr_connectivity" weak ref from start_picking */
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
- } else {
+ rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sd_shutdown");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
+ "rr_connectivity_sd_shutdown");
+ } else { // sd not in SHUTDOWN
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
+ if (sd->subchannel_list != p->subchannel_list) {
+ // promote sd->subchannel_list to p->subchannel_list.
+ // sd->subchannel_list must be equal to
+ // p->latest_pending_subchannel_list because we have already filtered
+ // for sds belonging to outdated subchannel lists.
+ GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list);
+ GPR_ASSERT(!sd->subchannel_list->shutting_down);
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG,
+ "[RR %p] phasing out subchannel list %p (size %lu) in favor "
+ "of %p (size %lu)",
+ (void *)p, (void *)p->subchannel_list,
+ (unsigned long)p->subchannel_list->num_subchannels,
+ (void *)sd->subchannel_list,
+ (unsigned long)sd->subchannel_list->num_subchannels);
+ }
+ if (p->subchannel_list != NULL) {
+ // dispose of the current subchannel_list
+ rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
+ "sl_shutdown_rr_update_connectivity");
+ }
+ p->subchannel_list = sd->subchannel_list;
+ p->latest_pending_subchannel_list = NULL;
+ }
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
- GPR_ASSERT(next_ready_index < p->num_subchannels);
- subchannel_data *selected = &p->subchannels[next_ready_index];
+ GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels);
+ subchannel_data *selected =
+ &p->subchannel_list->subchannels[next_ready_index];
if (p->pending_picks != NULL) {
/* if the selected subchannel is going to be used for the pending
* picks, update the last picked pointer */
@@ -515,11 +614,12 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
(void *)selected->subchannel,
(unsigned long)next_ready_index);
}
- grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
}
- /* renew notification: reuses the "rr_connectivity" weak ref */
+ /* renew notification: reuses the "rr_connectivity" weak ref on the policy
+ * as well as the sd->subchannel_list ref. */
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->pending_connectivity_state_unsafe,
@@ -546,65 +646,82 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *closure) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
- if (next_ready_index < p->num_subchannels) {
- subchannel_data *selected = &p->subchannels[next_ready_index];
+ if (next_ready_index < p->subchannel_list->num_subchannels) {
+ subchannel_data *selected =
+ &p->subchannel_list->subchannels[next_ready_index];
grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected->subchannel),
"rr_picked");
grpc_connected_subchannel_ping(exec_ctx, target, closure);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked");
} else {
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Round Robin not connected"));
}
}
-static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
- rr_destroy,
- rr_shutdown_locked,
- rr_pick_locked,
- rr_cancel_pick_locked,
- rr_cancel_picks_locked,
- rr_ping_one_locked,
- rr_exit_idle_locked,
- rr_check_connectivity_locked,
- rr_notify_on_state_change_locked};
-
-static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
-
-static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {}
-
-static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy_factory *factory,
- grpc_lb_policy_args *args) {
- GPR_ASSERT(args->client_channel_factory != NULL);
-
- /* Find the number of backend addresses. We ignore balancer
- * addresses, since we don't know how to handle them. */
+static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const grpc_lb_policy_args *args) {
+ round_robin_lb_policy *p = (round_robin_lb_policy *)policy;
+ /* Find the number of backend addresses. We ignore balancer addresses, since
+ * we don't know how to handle them. */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
- return NULL;
+ if (p->subchannel_list == NULL) {
+ // If we don't have a current subchannel list, go into TRANSIENT FAILURE.
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
+ "rr_update_missing");
+ } else {
+ // otherwise, keep using the current subchannel list (ignore this update).
+ gpr_log(GPR_ERROR,
+ "No valid LB addresses channel arg for Round Robin %p update, "
+ "ignoring.",
+ (void *)p);
+ }
+ return;
}
grpc_lb_addresses *addresses = arg->value.pointer.p;
size_t num_addrs = 0;
for (size_t i = 0; i < addresses->num_addresses; i++) {
if (!addresses->addresses[i].is_balancer) ++num_addrs;
}
- if (num_addrs == 0) return NULL;
-
- round_robin_lb_policy *p = gpr_zalloc(sizeof(*p));
-
- p->num_addresses = num_addrs;
- p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs);
-
- grpc_subchannel_args sc_args;
+ if (num_addrs == 0) {
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
+ "rr_update_empty");
+ if (p->subchannel_list != NULL) {
+ rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
+ "sl_shutdown_rr_update");
+ p->subchannel_list = NULL;
+ }
+ return;
+ }
size_t subchannel_index = 0;
+ rr_subchannel_list *subchannel_list = gpr_zalloc(sizeof(*subchannel_list));
+ subchannel_list->policy = p;
+ subchannel_list->subchannels =
+ gpr_zalloc(sizeof(subchannel_data) * num_addrs);
+ subchannel_list->num_subchannels = num_addrs;
+ gpr_ref_init(&subchannel_list->refcount, 1);
+ p->latest_pending_subchannel_list = subchannel_list;
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG, "Created subchannel list %p for %lu subchannels",
+ (void *)subchannel_list, (unsigned long)num_addrs);
+ }
+ grpc_subchannel_args sc_args;
+ /* We need to remove the LB addresses in order to be able to compare the
+ * subchannel keys of subchannels from a different batch of addresses. */
+ static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
+ GRPC_ARG_LB_ADDRESSES};
+ /* Create subchannels for addresses in the update. */
for (size_t i = 0; i < addresses->num_addresses; i++) {
/* Skip balancer addresses, since we only know how to handle backends. */
if (addresses->addresses[i].is_balancer) continue;
-
- static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
+ GPR_ASSERT(i < num_addrs);
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
grpc_arg addr_arg =
grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
@@ -618,52 +735,84 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
char *address_uri =
grpc_sockaddr_to_uri(&addresses->addresses[i].address);
- gpr_log(GPR_DEBUG, "index %lu: Created subchannel %p for address uri %s",
- (unsigned long)subchannel_index, (void *)subchannel, address_uri);
+ gpr_log(GPR_DEBUG,
+ "index %lu: Created subchannel %p for address uri %s into "
+ "subchannel_list %p",
+ (unsigned long)subchannel_index, (void *)subchannel, address_uri,
+ (void *)subchannel_list);
gpr_free(address_uri);
}
grpc_channel_args_destroy(exec_ctx, new_args);
- if (subchannel != NULL) {
- subchannel_data *sd = &p->subchannels[subchannel_index];
- sd->policy = p;
- sd->subchannel = subchannel;
- /* use some sentinel value outside of the range of grpc_connectivity_state
- * to signal an undefined previous state. We won't be referring to this
- * value again and it'll be overwritten after the first call to
- * rr_connectivity_changed */
- sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
- sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
- sd->user_data_vtable = addresses->user_data_vtable;
- if (sd->user_data_vtable != NULL) {
- sd->user_data =
- sd->user_data_vtable->copy(addresses->addresses[i].user_data);
- }
- grpc_closure_init(&sd->connectivity_changed_closure,
- rr_connectivity_changed_locked, sd,
- grpc_combiner_scheduler(args->combiner, false));
- ++subchannel_index;
+ subchannel_data *sd = &subchannel_list->subchannels[subchannel_index++];
+ sd->subchannel_list = subchannel_list;
+ sd->subchannel = subchannel;
+ GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure,
+ rr_connectivity_changed_locked, sd,
+ grpc_combiner_scheduler(args->combiner));
+ /* use some sentinel value outside of the range of
+ * grpc_connectivity_state to signal an undefined previous state. We
+ * won't be referring to this value again and it'll be overwritten after
+ * the first call to rr_connectivity_changed_locked */
+ sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
+ sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
+ sd->user_data_vtable = addresses->user_data_vtable;
+ if (sd->user_data_vtable != NULL) {
+ sd->user_data =
+ sd->user_data_vtable->copy(addresses->addresses[i].user_data);
+ }
+ if (p->started_picking) {
+ rr_subchannel_list_ref(sd->subchannel_list, "update_started_picking");
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity_update");
+ /* 2. Watch every new subchannel. A subchannel list becomes active the
+ * moment one of its subchannels is READY. At that moment, we swap
+ * p->subchannel_list for sd->subchannel_list, provided the subchannel
+ * list is still valid (ie, isn't shutting down) */
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->pending_connectivity_state_unsafe,
+ &sd->connectivity_changed_closure);
}
}
- if (subchannel_index == 0) {
- /* couldn't create any subchannel. Bail out */
- gpr_free(p->subchannels);
- gpr_free(p);
- return NULL;
+ if (!p->started_picking) {
+ // The policy isn't picking yet. Save the update for later, disposing of
+ // previous version if any.
+ if (p->subchannel_list != NULL) {
+ rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
+ "rr_update_before_started_picking");
+ }
+ p->subchannel_list = subchannel_list;
}
- p->num_subchannels = subchannel_index;
+}
+
+static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
+ rr_destroy,
+ rr_shutdown_locked,
+ rr_pick_locked,
+ rr_cancel_pick_locked,
+ rr_cancel_picks_locked,
+ rr_ping_one_locked,
+ rr_exit_idle_locked,
+ rr_check_connectivity_locked,
+ rr_notify_on_state_change_locked,
+ rr_update_locked};
+
+static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
- // Initialize the last pick index to the last subchannel, so that the
- // first pick will start at the beginning of the list.
- p->last_ready_subchannel_index = subchannel_index - 1;
+static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {}
+static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy_factory *factory,
+ grpc_lb_policy_args *args) {
+ GPR_ASSERT(args->client_channel_factory != NULL);
+ round_robin_lb_policy *p = gpr_zalloc(sizeof(*p));
+ rr_update_locked(exec_ctx, &p->base, args);
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels",
- (void *)p, (unsigned long)p->num_subchannels);
+ gpr_log(GPR_DEBUG, "Created Round Robin %p with %lu subchannels", (void *)p,
+ (unsigned long)p->subchannel_list->num_subchannels);
}
return &p->base;
}