diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc | 124 |
1 files changed, 48 insertions, 76 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index a964af0627..92c7d5bd5d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -41,31 +41,6 @@ grpc_core::TraceFlag grpc_lb_round_robin_trace(false, "round_robin"); -namespace { -/** List of entities waiting for a pick. - * - * Once a pick is available, \a target is updated and \a on_complete called. */ -struct pending_pick { - pending_pick* next; - - /* output argument where to store the pick()ed user_data. It'll be NULL if no - * such data is present or there's an error (the definite test for errors is - * \a target being NULL). */ - void** user_data; - - /* bitmask passed to pick() and used for selective cancelling. See - * grpc_lb_policy_cancel_picks() */ - uint32_t initial_metadata_flags; - - /* output argument where to store the pick()ed connected subchannel, or NULL - * upon error. */ - grpc_connected_subchannel** target; - - /* to be invoked once the pick() has completed (regardless of success) */ - grpc_closure* on_complete; -}; -} // namespace - typedef struct round_robin_lb_policy { /** base policy: must be first */ grpc_lb_policy base; @@ -77,7 +52,7 @@ typedef struct round_robin_lb_policy { /** are we shutting down? */ bool shutdown; /** List of picks that are waiting on connectivity */ - pending_pick* pending_picks; + grpc_lb_policy_pick_state* pending_picks; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker; @@ -169,19 +144,27 @@ static void rr_destroy(grpc_lb_policy* pol) { gpr_free(p); } -static void rr_shutdown_locked(grpc_lb_policy* pol) { +static void rr_shutdown_locked(grpc_lb_policy* pol, + grpc_lb_policy* new_policy) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p); } p->shutdown = true; - pending_pick* pp; - while ((pp = p->pending_picks) != nullptr) { - p->pending_picks = pp->next; - *pp->target = nullptr; - GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error)); - gpr_free(pp); + grpc_lb_policy_pick_state* pick; + while ((pick = p->pending_picks) != nullptr) { + p->pending_picks = pick->next; + if (new_policy != nullptr) { + // Hand off to new LB policy. + if (grpc_lb_policy_pick_locked(new_policy, pick)) { + // Synchronous return; schedule callback. + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + } + } else { + pick->connected_subchannel = nullptr; + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); + } } grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown"); @@ -201,19 +184,18 @@ static void rr_shutdown_locked(grpc_lb_policy* pol) { } static void rr_cancel_pick_locked(grpc_lb_policy* pol, - grpc_connected_subchannel** target, + grpc_lb_policy_pick_state* pick, grpc_error* error) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; - pending_pick* pp = p->pending_picks; + grpc_lb_policy_pick_state* pp = p->pending_picks; p->pending_picks = nullptr; while (pp != nullptr) { - pending_pick* next = pp->next; - if (pp->target == target) { - *target = nullptr; - GRPC_CLOSURE_SCHED(pp->on_complete, + grpc_lb_policy_pick_state* next = pp->next; + if (pp == pick) { + pick->connected_subchannel = nullptr; + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); - gpr_free(pp); } else { pp->next = p->pending_picks; p->pending_picks = pp; @@ -228,22 +210,21 @@ static void rr_cancel_picks_locked(grpc_lb_policy* pol, uint32_t initial_metadata_flags_eq, grpc_error* error) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; - pending_pick* pp = p->pending_picks; + grpc_lb_policy_pick_state* pick = p->pending_picks; p->pending_picks = nullptr; - while (pp != nullptr) { - pending_pick* next = pp->next; - if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == + while (pick != nullptr) { + grpc_lb_policy_pick_state* next = pick->next; + if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - *pp->target = nullptr; - GRPC_CLOSURE_SCHED(pp->on_complete, + pick->connected_subchannel = nullptr; + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); - gpr_free(pp); } else { - pp->next = p->pending_picks; - p->pending_picks = pp; + pick->next = p->pending_picks; + p->pending_picks = pick; } - pp = next; + pick = next; } GRPC_ERROR_UNREF(error); } @@ -268,13 +249,10 @@ static void rr_exit_idle_locked(grpc_lb_policy* pol) { } static int rr_pick_locked(grpc_lb_policy* pol, - const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, - grpc_call_context_element* context, void** user_data, - grpc_closure* on_complete) { + grpc_lb_policy_pick_state* pick) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", (void*)pol, + gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", pol, p->shutdown); } GPR_ASSERT(!p->shutdown); @@ -284,18 +262,18 @@ static int rr_pick_locked(grpc_lb_policy* pol, /* readily available, report right away */ grpc_lb_subchannel_data* sd = &p->subchannel_list->subchannels[next_ready_index]; - *target = + pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked"); - if (user_data != nullptr) { - *user_data = sd->user_data; + if (pick->user_data != nullptr) { + *pick->user_data = sd->user_data; } if (grpc_lb_round_robin_trace.enabled()) { gpr_log( GPR_DEBUG, "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " - "index %lu)", - (void*)p, (void*)sd->subchannel, (void*)*target, - (void*)sd->subchannel_list, (unsigned long)next_ready_index); + "index %" PRIuPTR ")", + p, sd->subchannel, pick->connected_subchannel, sd->subchannel_list, + next_ready_index); } /* only advance the last picked pointer if the selection was used */ update_last_ready_subchannel_index_locked(p, next_ready_index); @@ -306,13 +284,8 @@ static int rr_pick_locked(grpc_lb_policy* pol, if (!p->started_picking) { start_picking_locked(p); } - pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp)); - pp->next = p->pending_picks; - pp->target = target; - pp->on_complete = on_complete; - pp->initial_metadata_flags = pick_args->initial_metadata_flags; - pp->user_data = user_data; - p->pending_picks = pp; + pick->next = p->pending_picks; + p->pending_picks = pick; return 0; } @@ -495,13 +468,13 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { // picks, update the last picked pointer update_last_ready_subchannel_index_locked(p, next_ready_index); } - pending_pick* pp; - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_lb_policy_pick_state* pick; + while ((pick = p->pending_picks)) { + p->pending_picks = pick->next; + pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( selected->connected_subchannel, "rr_picked"); - if (pp->user_data != nullptr) { - *pp->user_data = selected->user_data; + if (pick->user_data != nullptr) { + *pick->user_data = selected->user_data; } if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, @@ -510,8 +483,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { (void*)p, (void*)selected->subchannel, (void*)p->subchannel_list, (unsigned long)next_ready_index); } - GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } } // Renew notification. |