aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc170
1 files changed, 82 insertions, 88 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 6ea1f025df..c05557ba6f 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -20,9 +20,9 @@
*
* Before every pick, the \a get_next_ready_subchannel_index_locked function
* returns the p->subchannel_list->subchannels index for next subchannel,
- * respecting the relative
- * order of the addresses provided upon creation or updates. Note however that
- * updates will start picking from the beginning of the updated list. */
+ * respecting the relative order of the addresses provided upon creation or
+ * updates. Note however that updates will start picking from the beginning of
+ * the updated list. */
#include <string.h>
@@ -39,8 +39,7 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
-grpc_tracer_flag grpc_lb_round_robin_trace =
- GRPC_TRACER_INITIALIZER(false, "round_robin");
+grpc_core::TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
/** List of entities waiting for a pick.
*
@@ -101,7 +100,7 @@ typedef struct round_robin_lb_policy {
static size_t get_next_ready_subchannel_index_locked(
const round_robin_lb_policy* p) {
GPR_ASSERT(p->subchannel_list != nullptr);
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO,
"[RR %p] getting next ready subchannel (out of %lu), "
"last_ready_subchannel_index=%lu",
@@ -111,7 +110,7 @@ static size_t get_next_ready_subchannel_index_locked(
for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) {
const size_t index = (i + p->last_ready_subchannel_index + 1) %
p->subchannel_list->num_subchannels;
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(
GPR_DEBUG,
"[RR %p] checking subchannel %p, subchannel_list %p, index %lu: "
@@ -123,7 +122,7 @@ static size_t get_next_ready_subchannel_index_locked(
}
if (p->subchannel_list->subchannels[index].curr_connectivity_state ==
GRPC_CHANNEL_READY) {
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
"[RR %p] found next ready subchannel (%p) at index %lu of "
"subchannel_list %p",
@@ -134,7 +133,7 @@ static size_t get_next_ready_subchannel_index_locked(
return index;
}
}
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", (void*)p);
}
return p->subchannel_list->num_subchannels;
@@ -145,7 +144,7 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p,
size_t last_ready_index) {
GPR_ASSERT(last_ready_index < p->subchannel_list->num_subchannels);
p->last_ready_subchannel_index = last_ready_index;
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
"[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)",
(void*)p, (unsigned long)last_ready_index,
@@ -157,7 +156,7 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p,
static void rr_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p",
(void*)pol, (void*)pol);
}
@@ -168,9 +167,10 @@ static void rr_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
gpr_free(p);
}
-static void shutdown_locked(grpc_exec_ctx* exec_ctx, round_robin_lb_policy* p,
- grpc_error* error) {
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+ round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
+ grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
+ if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p);
}
p->shutdown = true;
@@ -195,15 +195,11 @@ static void shutdown_locked(grpc_exec_ctx* exec_ctx, round_robin_lb_policy* p,
"sl_shutdown_pending_rr_shutdown");
p->latest_pending_subchannel_list = nullptr;
}
+ grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_round_robin_trace,
+ GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
-static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
- round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
- shutdown_locked(exec_ctx, p,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
-}
-
static void rr_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
grpc_connected_subchannel** target,
grpc_error* error) {
@@ -256,10 +252,12 @@ static void start_picking_locked(grpc_exec_ctx* exec_ctx,
round_robin_lb_policy* p) {
p->started_picking = true;
for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) {
- grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list,
- "connectivity_watch");
- grpc_lb_subchannel_data_start_connectivity_watch(
- exec_ctx, &p->subchannel_list->subchannels[i]);
+ if (p->subchannel_list->subchannels[i].subchannel != nullptr) {
+ grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list,
+ "connectivity_watch");
+ grpc_lb_subchannel_data_start_connectivity_watch(
+ exec_ctx, &p->subchannel_list->subchannels[i]);
+ }
}
}
@@ -276,7 +274,7 @@ static int rr_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
grpc_call_context_element* context, void** user_data,
grpc_closure* on_complete) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", (void*)pol,
p->shutdown);
}
@@ -292,7 +290,7 @@ static int rr_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
if (user_data != nullptr) {
*user_data = sd->user_data;
}
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(
GPR_DEBUG,
"[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
@@ -347,71 +345,63 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
}
/** Sets the policy's connectivity status based on that of the passed-in \a sd
- * (the grpc_lb_subchannel_data associted with the updated subchannel) and the
- * subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be
- * used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the
- * connectivity status set. */
-static grpc_connectivity_state update_lb_connectivity_status_locked(
- grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd, grpc_error* error) {
+ * (the grpc_lb_subchannel_data associated with the updated subchannel) and the
+ * subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used
+ * only if the policy transitions to state TRANSIENT_FAILURE. */
+static void update_lb_connectivity_status_locked(grpc_exec_ctx* exec_ctx,
+ grpc_lb_subchannel_data* sd,
+ grpc_error* error) {
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
*
* 1) RULE: ANY subchannel is READY => policy is READY.
- * CHECK: At least one subchannel is ready iff p->ready_list is NOT empty.
+ * CHECK: subchannel_list->num_ready > 0.
*
* 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING.
* CHECK: sd->curr_connectivity_state == CONNECTING.
*
- * 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN.
- * CHECK: p->subchannel_list->num_shutdown ==
- * p->subchannel_list->num_subchannels.
- *
- * 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
- * TRANSIENT_FAILURE.
- * CHECK: p->num_transient_failures == p->subchannel_list->num_subchannels.
+ * 3) RULE: ALL subchannels are SHUTDOWN => policy is IDLE (and requests
+ * re-resolution).
+ * CHECK: subchannel_list->num_shutdown ==
+ * subchannel_list->num_subchannels.
*
- * 5) RULE: ALL subchannels are IDLE => policy is IDLE.
- * CHECK: p->num_idle == p->subchannel_list->num_subchannels.
+ * 4) RULE: ALL subchannels are SHUTDOWN or TRANSIENT_FAILURE => policy is
+ * TRANSIENT_FAILURE.
+ * CHECK: subchannel_list->num_shutdown +
+ * subchannel_list->num_transient_failures ==
+ * subchannel_list->num_subchannels.
*/
- grpc_connectivity_state new_state = sd->curr_connectivity_state;
+ // TODO(juanlishen): For rule 4, we may want to re-resolve instead.
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
round_robin_lb_policy* p = (round_robin_lb_policy*)subchannel_list->policy;
- if (subchannel_list->num_ready > 0) { /* 1) READY */
+ GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE);
+ if (subchannel_list->num_ready > 0) {
+ /* 1) READY */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
- new_state = GRPC_CHANNEL_READY;
- } else if (sd->curr_connectivity_state ==
- GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */
+ } else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) {
+ /* 2) CONNECTING */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
"rr_connecting");
- new_state = GRPC_CHANNEL_CONNECTING;
- } else if (p->subchannel_list->num_shutdown ==
- p->subchannel_list->num_subchannels) { /* 3) SHUTDOWN */
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
- "rr_shutdown");
- p->shutdown = true;
- new_state = GRPC_CHANNEL_SHUTDOWN;
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_INFO,
- "[RR %p] Shutting down: all subchannels have gone into shutdown",
- (void*)p);
- }
- } else if (subchannel_list->num_transient_failures ==
- p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
+ } else if (subchannel_list->num_shutdown ==
+ subchannel_list->num_subchannels) {
+ /* 3) IDLE and re-resolve */
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
+ GRPC_ERROR_NONE,
+ "rr_exhausted_subchannels+reresolve");
+ p->started_picking = false;
+ grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_round_robin_trace,
+ GRPC_ERROR_NONE);
+ } else if (subchannel_list->num_shutdown +
+ subchannel_list->num_transient_failures ==
+ subchannel_list->num_subchannels) {
+ /* 4) TRANSIENT_FAILURE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "rr_transient_failure");
- new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
- } else if (subchannel_list->num_idle ==
- p->subchannel_list->num_subchannels) { /* 5) IDLE */
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
- GRPC_ERROR_NONE, "rr_idle");
- new_state = GRPC_CHANNEL_IDLE;
}
GRPC_ERROR_UNREF(error);
- return new_state;
}
static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
@@ -419,7 +409,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
grpc_lb_subchannel_data* sd = (grpc_lb_subchannel_data*)arg;
round_robin_lb_policy* p =
(round_robin_lb_policy*)sd->subchannel_list->policy;
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(
GPR_DEBUG,
"[RR %p] connectivity changed for subchannel %p, subchannel_list %p: "
@@ -455,21 +445,16 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
- // Update state counters and determine new overall state.
+ // Update state counters and new overall state.
update_state_counters_locked(sd);
- const grpc_connectivity_state new_policy_connectivity_state =
- update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
- // If the sd's new state is SHUTDOWN, unref the subchannel, and if the new
- // policy's state is SHUTDOWN, clean up.
+ update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
+ // If the sd's new state is SHUTDOWN, unref the subchannel.
if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
"rr_connectivity_shutdown");
grpc_lb_subchannel_list_unref_for_connectivity_watch(
exec_ctx, sd->subchannel_list, "rr_connectivity_shutdown");
- if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error));
- }
} else { // sd not in SHUTDOWN
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
if (sd->connected_subchannel == nullptr) {
@@ -484,7 +469,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
// for sds belonging to outdated subchannel lists.
GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list);
GPR_ASSERT(!sd->subchannel_list->shutting_down);
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ if (grpc_lb_round_robin_trace.enabled()) {
const unsigned long num_subchannels =
p->subchannel_list != nullptr
? (unsigned long)p->subchannel_list->num_subchannels
@@ -505,7 +490,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
}
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
- * p->pending_picks. This preemtively replicates rr_pick()'s actions. */
+ * p->pending_picks. This preemptively replicates rr_pick()'s actions. */
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels);
grpc_lb_subchannel_data* selected =
@@ -523,7 +508,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
if (pp->user_data != nullptr) {
*pp->user_data = selected->user_data;
}
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
"[RR %p] Fulfilling pending pick. Target <-- subchannel %p "
"(subchannel_list %p, index %lu)",
@@ -590,7 +575,7 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
return;
}
grpc_lb_addresses* addresses = (grpc_lb_addresses*)arg->value.pointer.p;
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", p,
addresses->num_addresses);
}
@@ -611,7 +596,7 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
}
if (p->started_picking) {
if (p->latest_pending_subchannel_list != nullptr) {
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
"[RR %p] Shutting down latest pending subchannel list %p, "
"about to be replaced by newer latest %p",
@@ -643,6 +628,15 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
}
}
+static void rr_set_reresolve_closure_locked(
+ grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
+ grpc_closure* request_reresolution) {
+ round_robin_lb_policy* p = (round_robin_lb_policy*)policy;
+ GPR_ASSERT(!p->shutdown);
+ GPR_ASSERT(policy->request_reresolution == nullptr);
+ policy->request_reresolution = request_reresolution;
+}
+
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_destroy,
rr_shutdown_locked,
@@ -653,7 +647,8 @@ static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_exit_idle_locked,
rr_check_connectivity_locked,
rr_notify_on_state_change_locked,
- rr_update_locked};
+ rr_update_locked,
+ rr_set_reresolve_closure_locked};
static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {}
@@ -669,7 +664,7 @@ static grpc_lb_policy* round_robin_create(grpc_exec_ctx* exec_ctx,
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
rr_update_locked(exec_ctx, &p->base, args);
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Created with %lu subchannels", (void*)p,
(unsigned long)p->subchannel_list->num_subchannels);
}
@@ -689,9 +684,8 @@ static grpc_lb_policy_factory* round_robin_lb_factory_create() {
/* Plugin registration */
-extern "C" void grpc_lb_policy_round_robin_init() {
+void grpc_lb_policy_round_robin_init() {
grpc_register_lb_policy(round_robin_lb_factory_create());
- grpc_register_tracer(&grpc_lb_round_robin_trace);
}
-extern "C" void grpc_lb_policy_round_robin_shutdown() {}
+void grpc_lb_policy_round_robin_shutdown() {}