aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc172
1 files changed, 73 insertions, 99 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index b0c84017df..24c381a46d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -34,6 +34,7 @@
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -41,29 +42,6 @@
grpc_core::TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
-/** List of entities waiting for a pick.
- *
- * Once a pick is available, \a target is updated and \a on_complete called. */
-typedef struct pending_pick {
- struct pending_pick* next;
-
- /* output argument where to store the pick()ed user_data. It'll be NULL if no
- * such data is present or there's an error (the definite test for errors is
- * \a target being NULL). */
- void** user_data;
-
- /* bitmask passed to pick() and used for selective cancelling. See
- * grpc_lb_policy_cancel_picks() */
- uint32_t initial_metadata_flags;
-
- /* output argument where to store the pick()ed connected subchannel, or NULL
- * upon error. */
- grpc_connected_subchannel** target;
-
- /* to be invoked once the pick() has completed (regardless of success) */
- grpc_closure* on_complete;
-} pending_pick;
-
typedef struct round_robin_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
@@ -75,7 +53,7 @@ typedef struct round_robin_lb_policy {
/** are we shutting down? */
bool shutdown;
/** List of picks that are waiting on connectivity */
- pending_pick* pending_picks;
+ grpc_lb_policy_pick_state* pending_picks;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker;
@@ -150,7 +128,7 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p,
(void*)p, (unsigned long)last_ready_index,
(void*)p->subchannel_list->subchannels[last_ready_index].subchannel,
(void*)p->subchannel_list->subchannels[last_ready_index]
- .connected_subchannel);
+ .connected_subchannel.get());
}
}
@@ -167,19 +145,27 @@ static void rr_destroy(grpc_lb_policy* pol) {
gpr_free(p);
}
-static void rr_shutdown_locked(grpc_lb_policy* pol) {
+static void rr_shutdown_locked(grpc_lb_policy* pol,
+ grpc_lb_policy* new_policy) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p);
}
p->shutdown = true;
- pending_pick* pp;
- while ((pp = p->pending_picks) != nullptr) {
- p->pending_picks = pp->next;
- *pp->target = nullptr;
- GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error));
- gpr_free(pp);
+ grpc_lb_policy_pick_state* pick;
+ while ((pick = p->pending_picks) != nullptr) {
+ p->pending_picks = pick->next;
+ if (new_policy != nullptr) {
+ // Hand off to new LB policy.
+ if (grpc_lb_policy_pick_locked(new_policy, pick)) {
+ // Synchronous return; schedule callback.
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+ }
+ } else {
+ pick->connected_subchannel.reset();
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
+ }
}
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "rr_shutdown");
@@ -199,19 +185,18 @@ static void rr_shutdown_locked(grpc_lb_policy* pol) {
}
static void rr_cancel_pick_locked(grpc_lb_policy* pol,
- grpc_connected_subchannel** target,
+ grpc_lb_policy_pick_state* pick,
grpc_error* error) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
- pending_pick* pp = p->pending_picks;
+ grpc_lb_policy_pick_state* pp = p->pending_picks;
p->pending_picks = nullptr;
while (pp != nullptr) {
- pending_pick* next = pp->next;
- if (pp->target == target) {
- *target = nullptr;
- GRPC_CLOSURE_SCHED(pp->on_complete,
+ grpc_lb_policy_pick_state* next = pp->next;
+ if (pp == pick) {
+ pick->connected_subchannel.reset();
+ GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
- gpr_free(pp);
} else {
pp->next = p->pending_picks;
p->pending_picks = pp;
@@ -226,22 +211,21 @@ static void rr_cancel_picks_locked(grpc_lb_policy* pol,
uint32_t initial_metadata_flags_eq,
grpc_error* error) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
- pending_pick* pp = p->pending_picks;
+ grpc_lb_policy_pick_state* pick = p->pending_picks;
p->pending_picks = nullptr;
- while (pp != nullptr) {
- pending_pick* next = pp->next;
- if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
+ while (pick != nullptr) {
+ grpc_lb_policy_pick_state* next = pick->next;
+ if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- *pp->target = nullptr;
- GRPC_CLOSURE_SCHED(pp->on_complete,
+ pick->connected_subchannel.reset();
+ GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
- gpr_free(pp);
} else {
- pp->next = p->pending_picks;
- p->pending_picks = pp;
+ pick->next = p->pending_picks;
+ p->pending_picks = pick;
}
- pp = next;
+ pick = next;
}
GRPC_ERROR_UNREF(error);
}
@@ -266,13 +250,10 @@ static void rr_exit_idle_locked(grpc_lb_policy* pol) {
}
static int rr_pick_locked(grpc_lb_policy* pol,
- const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
- grpc_call_context_element* context, void** user_data,
- grpc_closure* on_complete) {
+ grpc_lb_policy_pick_state* pick) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", (void*)pol,
+ gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", pol,
p->shutdown);
}
GPR_ASSERT(!p->shutdown);
@@ -282,18 +263,17 @@ static int rr_pick_locked(grpc_lb_policy* pol,
/* readily available, report right away */
grpc_lb_subchannel_data* sd =
&p->subchannel_list->subchannels[next_ready_index];
- *target =
- GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked");
- if (user_data != nullptr) {
- *user_data = sd->user_data;
+ pick->connected_subchannel = sd->connected_subchannel;
+ if (pick->user_data != nullptr) {
+ *pick->user_data = sd->user_data;
}
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(
GPR_DEBUG,
"[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
- "index %lu)",
- (void*)p, (void*)sd->subchannel, (void*)*target,
- (void*)sd->subchannel_list, (unsigned long)next_ready_index);
+ "index %" PRIuPTR ")",
+ p, sd->subchannel, pick->connected_subchannel.get(),
+ sd->subchannel_list, next_ready_index);
}
/* only advance the last picked pointer if the selection was used */
update_last_ready_subchannel_index_locked(p, next_ready_index);
@@ -304,27 +284,21 @@ static int rr_pick_locked(grpc_lb_policy* pol,
if (!p->started_picking) {
start_picking_locked(p);
}
- pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp));
- pp->next = p->pending_picks;
- pp->target = target;
- pp->on_complete = on_complete;
- pp->initial_metadata_flags = pick_args->initial_metadata_flags;
- pp->user_data = user_data;
- p->pending_picks = pp;
+ pick->next = p->pending_picks;
+ p->pending_picks = pick;
return 0;
}
static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
+ GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
+ GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
GPR_ASSERT(subchannel_list->num_ready > 0);
--subchannel_list->num_ready;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(subchannel_list->num_transient_failures > 0);
--subchannel_list->num_transient_failures;
- } else if (sd->prev_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- GPR_ASSERT(subchannel_list->num_shutdown > 0);
- --subchannel_list->num_shutdown;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
GPR_ASSERT(subchannel_list->num_idle > 0);
--subchannel_list->num_idle;
@@ -334,8 +308,6 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
++subchannel_list->num_ready;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
++subchannel_list->num_transient_failures;
- } else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- ++subchannel_list->num_shutdown;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
++subchannel_list->num_idle;
}
@@ -435,6 +407,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
// either the current or latest pending subchannel lists.
GPR_ASSERT(sd->subchannel_list == p->subchannel_list ||
sd->subchannel_list == p->latest_pending_subchannel_list);
+ GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN);
// Now that we're inside the combiner, copy the pending connectivity
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
@@ -442,18 +415,17 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
// Update state counters and new overall state.
update_state_counters_locked(sd);
update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
- // If the sd's new state is SHUTDOWN, unref the subchannel.
- if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "rr_connectivity_shutdown");
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "rr_connectivity_shutdown");
- } else { // sd not in SHUTDOWN
- if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
+ // If the sd's new state is TRANSIENT_FAILURE, unref the *connected*
+ // subchannel, if any.
+ switch (sd->curr_connectivity_state) {
+ case GRPC_CHANNEL_TRANSIENT_FAILURE: {
+ sd->connected_subchannel.reset();
+ break;
+ }
+ case GRPC_CHANNEL_READY: {
if (sd->connected_subchannel == nullptr) {
- sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(sd->subchannel),
- "connected");
+ sd->connected_subchannel =
+ grpc_subchannel_get_connected_subchannel(sd->subchannel);
}
if (sd->subchannel_list != p->subchannel_list) {
// promote sd->subchannel_list to p->subchannel_list.
@@ -493,13 +465,12 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
// picks, update the last picked pointer
update_last_ready_subchannel_index_locked(p, next_ready_index);
}
- pending_pick* pp;
- while ((pp = p->pending_picks)) {
- p->pending_picks = pp->next;
- *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
- selected->connected_subchannel, "rr_picked");
- if (pp->user_data != nullptr) {
- *pp->user_data = selected->user_data;
+ grpc_lb_policy_pick_state* pick;
+ while ((pick = p->pending_picks)) {
+ p->pending_picks = pick->next;
+ pick->connected_subchannel = selected->connected_subchannel;
+ if (pick->user_data != nullptr) {
+ *pick->user_data = selected->user_data;
}
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
@@ -508,13 +479,17 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
(void*)p, (void*)selected->subchannel,
(void*)p->subchannel_list, (unsigned long)next_ready_index);
}
- GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE);
- gpr_free(pp);
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
+ break;
}
- // Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ case GRPC_CHANNEL_SHUTDOWN:
+ GPR_UNREACHABLE_CODE(return );
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_IDLE:; // fallthrough
}
+ // Renew notification.
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
static grpc_connectivity_state rr_check_connectivity_locked(
@@ -538,10 +513,9 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
if (next_ready_index < p->subchannel_list->num_subchannels) {
grpc_lb_subchannel_data* selected =
&p->subchannel_list->subchannels[next_ready_index];
- grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF(
- selected->connected_subchannel, "rr_ping");
- grpc_connected_subchannel_ping(target, on_initiate, on_ack);
- GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping");
+ grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> target =
+ selected->connected_subchannel;
+ target->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Round Robin not connected"));