diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc | 170 |
1 files changed, 82 insertions, 88 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 6ea1f025df..c05557ba6f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -20,9 +20,9 @@ * * Before every pick, the \a get_next_ready_subchannel_index_locked function * returns the p->subchannel_list->subchannels index for next subchannel, - * respecting the relative - * order of the addresses provided upon creation or updates. Note however that - * updates will start picking from the beginning of the updated list. */ + * respecting the relative order of the addresses provided upon creation or + * updates. Note however that updates will start picking from the beginning of + * the updated list. */ #include <string.h> @@ -39,8 +39,7 @@ #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/static_metadata.h" -grpc_tracer_flag grpc_lb_round_robin_trace = - GRPC_TRACER_INITIALIZER(false, "round_robin"); +grpc_core::TraceFlag grpc_lb_round_robin_trace(false, "round_robin"); /** List of entities waiting for a pick. * @@ -101,7 +100,7 @@ typedef struct round_robin_lb_policy { static size_t get_next_ready_subchannel_index_locked( const round_robin_lb_policy* p) { GPR_ASSERT(p->subchannel_list != nullptr); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] getting next ready subchannel (out of %lu), " "last_ready_subchannel_index=%lu", @@ -111,7 +110,7 @@ static size_t get_next_ready_subchannel_index_locked( for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) { const size_t index = (i + p->last_ready_subchannel_index + 1) % p->subchannel_list->num_subchannels; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + if (grpc_lb_round_robin_trace.enabled()) { gpr_log( GPR_DEBUG, "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: " @@ -123,7 +122,7 @@ static size_t get_next_ready_subchannel_index_locked( } if (p->subchannel_list->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) { - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] found next ready subchannel (%p) at index %lu of " "subchannel_list %p", @@ -134,7 +133,7 @@ static size_t get_next_ready_subchannel_index_locked( return index; } } - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", (void*)p); } return p->subchannel_list->num_subchannels; @@ -145,7 +144,7 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p, size_t last_ready_index) { GPR_ASSERT(last_ready_index < p->subchannel_list->num_subchannels); p->last_ready_subchannel_index = last_ready_index; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", (void*)p, (unsigned long)last_ready_index, @@ -157,7 +156,7 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p, static void rr_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p", (void*)pol, (void*)pol); } @@ -168,9 +167,10 @@ static void rr_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { gpr_free(p); } -static void shutdown_locked(grpc_exec_ctx* exec_ctx, round_robin_lb_policy* p, - grpc_error* error) { - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { +static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { + round_robin_lb_policy* p = (round_robin_lb_policy*)pol; + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); + if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p); } p->shutdown = true; @@ -195,15 +195,11 @@ static void shutdown_locked(grpc_exec_ctx* exec_ctx, round_robin_lb_policy* p, "sl_shutdown_pending_rr_shutdown"); p->latest_pending_subchannel_list = nullptr; } + grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_round_robin_trace, + GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); } -static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { - round_robin_lb_policy* p = (round_robin_lb_policy*)pol; - shutdown_locked(exec_ctx, p, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); -} - static void rr_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, grpc_connected_subchannel** target, grpc_error* error) { @@ -256,10 +252,12 @@ static void start_picking_locked(grpc_exec_ctx* exec_ctx, round_robin_lb_policy* p) { p->started_picking = true; for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { - grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list, - "connectivity_watch"); - grpc_lb_subchannel_data_start_connectivity_watch( - exec_ctx, &p->subchannel_list->subchannels[i]); + if (p->subchannel_list->subchannels[i].subchannel != nullptr) { + grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list, + "connectivity_watch"); + grpc_lb_subchannel_data_start_connectivity_watch( + exec_ctx, &p->subchannel_list->subchannels[i]); + } } } @@ -276,7 +274,7 @@ static int rr_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, grpc_call_context_element* context, void** user_data, grpc_closure* on_complete) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", (void*)pol, p->shutdown); } @@ -292,7 +290,7 @@ static int rr_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, if (user_data != nullptr) { *user_data = sd->user_data; } - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + if (grpc_lb_round_robin_trace.enabled()) { gpr_log( GPR_DEBUG, "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " @@ -347,71 +345,63 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { } /** Sets the policy's connectivity status based on that of the passed-in \a sd - * (the grpc_lb_subchannel_data associted with the updated subchannel) and the - * subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be - * used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the - * connectivity status set. */ -static grpc_connectivity_state update_lb_connectivity_status_locked( - grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd, grpc_error* error) { + * (the grpc_lb_subchannel_data associated with the updated subchannel) and the + * subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used + * only if the policy transitions to state TRANSIENT_FAILURE. */ +static void update_lb_connectivity_status_locked(grpc_exec_ctx* exec_ctx, + grpc_lb_subchannel_data* sd, + grpc_error* error) { /* In priority order. The first rule to match terminates the search (ie, if we * are on rule n, all previous rules were unfulfilled). * * 1) RULE: ANY subchannel is READY => policy is READY. - * CHECK: At least one subchannel is ready iff p->ready_list is NOT empty. + * CHECK: subchannel_list->num_ready > 0. * * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING. * CHECK: sd->curr_connectivity_state == CONNECTING. * - * 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN. - * CHECK: p->subchannel_list->num_shutdown == - * p->subchannel_list->num_subchannels. - * - * 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is - * TRANSIENT_FAILURE. - * CHECK: p->num_transient_failures == p->subchannel_list->num_subchannels. + * 3) RULE: ALL subchannels are SHUTDOWN => policy is IDLE (and requests + * re-resolution). + * CHECK: subchannel_list->num_shutdown == + * subchannel_list->num_subchannels. * - * 5) RULE: ALL subchannels are IDLE => policy is IDLE. - * CHECK: p->num_idle == p->subchannel_list->num_subchannels. + * 4) RULE: ALL subchannels are SHUTDOWN or TRANSIENT_FAILURE => policy is + * TRANSIENT_FAILURE. + * CHECK: subchannel_list->num_shutdown + + * subchannel_list->num_transient_failures == + * subchannel_list->num_subchannels. */ - grpc_connectivity_state new_state = sd->curr_connectivity_state; + // TODO(juanlishen): For rule 4, we may want to re-resolve instead. grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; round_robin_lb_policy* p = (round_robin_lb_policy*)subchannel_list->policy; - if (subchannel_list->num_ready > 0) { /* 1) READY */ + GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE); + if (subchannel_list->num_ready > 0) { + /* 1) READY */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); - new_state = GRPC_CHANNEL_READY; - } else if (sd->curr_connectivity_state == - GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */ + } else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) { + /* 2) CONNECTING */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "rr_connecting"); - new_state = GRPC_CHANNEL_CONNECTING; - } else if (p->subchannel_list->num_shutdown == - p->subchannel_list->num_subchannels) { /* 3) SHUTDOWN */ - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), - "rr_shutdown"); - p->shutdown = true; - new_state = GRPC_CHANNEL_SHUTDOWN; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_INFO, - "[RR %p] Shutting down: all subchannels have gone into shutdown", - (void*)p); - } - } else if (subchannel_list->num_transient_failures == - p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */ + } else if (subchannel_list->num_shutdown == + subchannel_list->num_subchannels) { + /* 3) IDLE and re-resolve */ + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE, + GRPC_ERROR_NONE, + "rr_exhausted_subchannels+reresolve"); + p->started_picking = false; + grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_round_robin_trace, + GRPC_ERROR_NONE); + } else if (subchannel_list->num_shutdown + + subchannel_list->num_transient_failures == + subchannel_list->num_subchannels) { + /* 4) TRANSIENT_FAILURE */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "rr_transient_failure"); - new_state = GRPC_CHANNEL_TRANSIENT_FAILURE; - } else if (subchannel_list->num_idle == - p->subchannel_list->num_subchannels) { /* 5) IDLE */ - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE, - GRPC_ERROR_NONE, "rr_idle"); - new_state = GRPC_CHANNEL_IDLE; } GRPC_ERROR_UNREF(error); - return new_state; } static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, @@ -419,7 +409,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, grpc_lb_subchannel_data* sd = (grpc_lb_subchannel_data*)arg; round_robin_lb_policy* p = (round_robin_lb_policy*)sd->subchannel_list->policy; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + if (grpc_lb_round_robin_trace.enabled()) { gpr_log( GPR_DEBUG, "[RR %p] connectivity changed for subchannel %p, subchannel_list %p: " @@ -455,21 +445,16 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; - // Update state counters and determine new overall state. + // Update state counters and new overall state. update_state_counters_locked(sd); - const grpc_connectivity_state new_policy_connectivity_state = - update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error)); - // If the sd's new state is SHUTDOWN, unref the subchannel, and if the new - // policy's state is SHUTDOWN, clean up. + update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error)); + // If the sd's new state is SHUTDOWN, unref the subchannel. if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "rr_connectivity_shutdown"); grpc_lb_subchannel_list_unref_for_connectivity_watch( exec_ctx, sd->subchannel_list, "rr_connectivity_shutdown"); - if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error)); - } } else { // sd not in SHUTDOWN if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { if (sd->connected_subchannel == nullptr) { @@ -484,7 +469,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, // for sds belonging to outdated subchannel lists. GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list); GPR_ASSERT(!sd->subchannel_list->shutting_down); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + if (grpc_lb_round_robin_trace.enabled()) { const unsigned long num_subchannels = p->subchannel_list != nullptr ? (unsigned long)p->subchannel_list->num_subchannels @@ -505,7 +490,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in - * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ + * p->pending_picks. This preemptively replicates rr_pick()'s actions. */ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels); grpc_lb_subchannel_data* selected = @@ -523,7 +508,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, if (pp->user_data != nullptr) { *pp->user_data = selected->user_data; } - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Fulfilling pending pick. Target <-- subchannel %p " "(subchannel_list %p, index %lu)", @@ -590,7 +575,7 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, return; } grpc_lb_addresses* addresses = (grpc_lb_addresses*)arg->value.pointer.p; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", p, addresses->num_addresses); } @@ -611,7 +596,7 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, } if (p->started_picking) { if (p->latest_pending_subchannel_list != nullptr) { - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Shutting down latest pending subchannel list %p, " "about to be replaced by newer latest %p", @@ -643,6 +628,15 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, } } +static void rr_set_reresolve_closure_locked( + grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, + grpc_closure* request_reresolution) { + round_robin_lb_policy* p = (round_robin_lb_policy*)policy; + GPR_ASSERT(!p->shutdown); + GPR_ASSERT(policy->request_reresolution == nullptr); + policy->request_reresolution = request_reresolution; +} + static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { rr_destroy, rr_shutdown_locked, @@ -653,7 +647,8 @@ static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { rr_exit_idle_locked, rr_check_connectivity_locked, rr_notify_on_state_change_locked, - rr_update_locked}; + rr_update_locked, + rr_set_reresolve_closure_locked}; static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {} @@ -669,7 +664,7 @@ static grpc_lb_policy* round_robin_create(grpc_exec_ctx* exec_ctx, grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); rr_update_locked(exec_ctx, &p->base, args); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Created with %lu subchannels", (void*)p, (unsigned long)p->subchannel_list->num_subchannels); } @@ -689,9 +684,8 @@ static grpc_lb_policy_factory* round_robin_lb_factory_create() { /* Plugin registration */ -extern "C" void grpc_lb_policy_round_robin_init() { +void grpc_lb_policy_round_robin_init() { grpc_register_lb_policy(round_robin_lb_factory_create()); - grpc_register_tracer(&grpc_lb_round_robin_trace); } -extern "C" void grpc_lb_policy_round_robin_shutdown() {} +void grpc_lb_policy_round_robin_shutdown() {} |