diff options
author | Mark D. Roth <roth@google.com> | 2018-02-20 08:33:48 -0800 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2018-02-20 08:33:48 -0800 |
commit | c887549f9296d893957c6df17deaf5e2c6f4f633 (patch) | |
tree | 79e4d2a5909348a8065442de6d67ad1cc3f7b860 /src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc | |
parent | 84f94c17aa5ba19daa7cf001360719bdda7be407 (diff) |
Convert LB policy API to C++.
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc | 584 |
1 files changed, 298 insertions, 286 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index b5b4c44ef1..178e299b61 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -40,34 +40,94 @@ #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/static_metadata.h" -grpc_core::TraceFlag grpc_lb_round_robin_trace(false, "round_robin"); - -typedef struct round_robin_lb_policy { - /** base policy: must be first */ - grpc_lb_policy base; - - grpc_lb_subchannel_list* subchannel_list; - +namespace grpc_core { + +TraceFlag grpc_lb_round_robin_trace(false, "round_robin"); + +namespace { + +// +// round_robin LB policy +// + +class RoundRobin : public LoadBalancingPolicy { + public: + explicit RoundRobin(const Args& args); + + void UpdateLocked(const grpc_channel_args& args) override; + bool PickLocked(PickState* pick) override; + void CancelPickLocked(PickState* pick, grpc_error* error) override; + void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error* error) override; + void NotifyOnStateChangeLocked(grpc_connectivity_state* state, + grpc_closure* closure) override; + grpc_connectivity_state CheckConnectivityLocked( + grpc_error** connectivity_error) override; + void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; + void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override; + void ExitIdleLocked() override; + + private: + ~RoundRobin(); + + void ShutdownLocked() override; + + void StartPickingLocked(); + size_t GetNextReadySubchannelIndexLocked(); + void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index); + void UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd, + grpc_error* error); + + static void OnConnectivityChangedLocked(void* arg, grpc_error* error); + + void SubchannelListRefForConnectivityWatch( + grpc_lb_subchannel_list* subchannel_list, const char* reason); + void SubchannelListUnrefForConnectivityWatch( + grpc_lb_subchannel_list* subchannel_list, const char* reason); + + /** list of subchannels */ + grpc_lb_subchannel_list* subchannel_list_ = nullptr; /** have we started picking? */ - bool started_picking; + bool started_picking_ = false; /** are we shutting down? */ - bool shutdown; + bool shutdown_ = false; /** List of picks that are waiting on connectivity */ - grpc_lb_policy_pick_state* pending_picks; - + PickState* pending_picks_ = nullptr; /** our connectivity state tracker */ - grpc_connectivity_state_tracker state_tracker; - + grpc_connectivity_state_tracker state_tracker_; /** Index into subchannels for last pick. */ - size_t last_ready_subchannel_index; - + size_t last_ready_subchannel_index_ = 0; /** Latest version of the subchannel list. * Subchannel connectivity callbacks will only promote updated subchannel * lists if they equal \a latest_pending_subchannel_list. In other words, * racing callbacks that reference outdated subchannel lists won't perform any * update. */ - grpc_lb_subchannel_list* latest_pending_subchannel_list; -} round_robin_lb_policy; + grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr; +}; + +RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) { + GPR_ASSERT(args.client_channel_factory != nullptr); + grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, + "round_robin"); + UpdateLocked(*args.args); + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_DEBUG, "[RR %p] Created with %" PRIuPTR " subchannels", this, + subchannel_list_->num_subchannels); + } + grpc_subchannel_index_ref(); +} + +RoundRobin::~RoundRobin() { + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy", this); + } + GPR_ASSERT(subchannel_list_ == nullptr); + GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); + GPR_ASSERT(pending_picks_ == nullptr); + grpc_connectivity_state_destroy(&state_tracker_); + grpc_subchannel_index_unref(); +} /** Returns the index into p->subchannel_list->subchannels of the next * subchannel in READY state, or p->subchannel_list->num_subchannels if no @@ -75,195 +135,190 @@ typedef struct round_robin_lb_policy { * * Note that this function does *not* update p->last_ready_subchannel_index. * The caller must do that if it returns a pick. */ -static size_t get_next_ready_subchannel_index_locked( - const round_robin_lb_policy* p) { - GPR_ASSERT(p->subchannel_list != nullptr); +size_t RoundRobin::GetNextReadySubchannelIndexLocked() { + GPR_ASSERT(subchannel_list_ != nullptr); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, - "[RR %p] getting next ready subchannel (out of %lu), " - "last_ready_subchannel_index=%lu", - (void*)p, - static_cast<unsigned long>(p->subchannel_list->num_subchannels), - static_cast<unsigned long>(p->last_ready_subchannel_index)); - } - for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) { - const size_t index = (i + p->last_ready_subchannel_index + 1) % - p->subchannel_list->num_subchannels; + "[RR %p] getting next ready subchannel (out of %" PRIuPTR + "), " + "last_ready_subchannel_index=%" PRIuPTR, + this, subchannel_list_->num_subchannels, + last_ready_subchannel_index_); + } + for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) { + const size_t index = (i + last_ready_subchannel_index_ + 1) % + subchannel_list_->num_subchannels; if (grpc_lb_round_robin_trace.enabled()) { gpr_log( GPR_DEBUG, - "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: " - "state=%s", - (void*)p, (void*)p->subchannel_list->subchannels[index].subchannel, - (void*)p->subchannel_list, static_cast<unsigned long>(index), + "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR + ": state=%s", + this, subchannel_list_->subchannels[index].subchannel, + subchannel_list_, index, grpc_connectivity_state_name( - p->subchannel_list->subchannels[index].curr_connectivity_state)); + subchannel_list_->subchannels[index].curr_connectivity_state)); } - if (p->subchannel_list->subchannels[index].curr_connectivity_state == + if (subchannel_list_->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, - "[RR %p] found next ready subchannel (%p) at index %lu of " - "subchannel_list %p", - (void*)p, - (void*)p->subchannel_list->subchannels[index].subchannel, - static_cast<unsigned long>(index), (void*)p->subchannel_list); + "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR + " of subchannel_list %p", + this, subchannel_list_->subchannels[index].subchannel, index, + subchannel_list_); } return index; } } if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", (void*)p); + gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", this); } - return p->subchannel_list->num_subchannels; + return subchannel_list_->num_subchannels; } -// Sets p->last_ready_subchannel_index to last_ready_index. -static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p, - size_t last_ready_index) { - GPR_ASSERT(last_ready_index < p->subchannel_list->num_subchannels); - p->last_ready_subchannel_index = last_ready_index; +// Sets last_ready_subchannel_index_ to last_ready_index. +void RoundRobin::UpdateLastReadySubchannelIndexLocked(size_t last_ready_index) { + GPR_ASSERT(last_ready_index < subchannel_list_->num_subchannels); + last_ready_subchannel_index_ = last_ready_index; if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, - "[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", - (void*)p, static_cast<unsigned long>(last_ready_index), - (void*)p->subchannel_list->subchannels[last_ready_index].subchannel, - (void*)p->subchannel_list->subchannels[last_ready_index] + "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR + " (SC %p, CSC %p)", + this, last_ready_index, + subchannel_list_->subchannels[last_ready_index].subchannel, + subchannel_list_->subchannels[last_ready_index] .connected_subchannel.get()); } } -static void rr_destroy(grpc_lb_policy* pol) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p", - (void*)pol, (void*)pol); +void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { + PickState* pick; + while ((pick = pending_picks_) != nullptr) { + pending_picks_ = pick->next; + if (new_policy->PickLocked(pick)) { + // Synchronous return, schedule closure. + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + } } - GPR_ASSERT(p->subchannel_list == nullptr); - GPR_ASSERT(p->latest_pending_subchannel_list == nullptr); - grpc_connectivity_state_destroy(&p->state_tracker); - grpc_subchannel_index_unref(); - gpr_free(p); } -static void rr_shutdown_locked(grpc_lb_policy* pol, - grpc_lb_policy* new_policy) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); +void RoundRobin::ShutdownLocked() { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p); - } - p->shutdown = true; - grpc_lb_policy_pick_state* pick; - while ((pick = p->pending_picks) != nullptr) { - p->pending_picks = pick->next; - if (new_policy != nullptr) { - // Hand off to new LB policy. - if (grpc_lb_policy_pick_locked(new_policy, pick)) { - // Synchronous return; schedule callback. - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); - } - } else { - pick->connected_subchannel.reset(); - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); - } + gpr_log(GPR_DEBUG, "[RR %p] Shutting down", this); + } + shutdown_ = true; + PickState* pick; + while ((pick = pending_picks_) != nullptr) { + pending_picks_ = pick->next; + pick->connected_subchannel.reset(); + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); } - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN, + grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown"); - if (p->subchannel_list != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, + if (subchannel_list_ != nullptr) { + grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, "sl_shutdown_rr_shutdown"); - p->subchannel_list = nullptr; + subchannel_list_ = nullptr; } - if (p->latest_pending_subchannel_list != nullptr) { + if (latest_pending_subchannel_list_ != nullptr) { grpc_lb_subchannel_list_shutdown_and_unref( - p->latest_pending_subchannel_list, "sl_shutdown_pending_rr_shutdown"); - p->latest_pending_subchannel_list = nullptr; + latest_pending_subchannel_list_, "sl_shutdown_pending_rr_shutdown"); + latest_pending_subchannel_list_ = nullptr; } - grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace, - GRPC_ERROR_CANCELLED); + TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); } -static void rr_cancel_pick_locked(grpc_lb_policy* pol, - grpc_lb_policy_pick_state* pick, - grpc_error* error) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); - grpc_lb_policy_pick_state* pp = p->pending_picks; - p->pending_picks = nullptr; +void RoundRobin::CancelPickLocked(PickState* pick, grpc_error* error) { + PickState* pp = pending_picks_; + pending_picks_ = nullptr; while (pp != nullptr) { - grpc_lb_policy_pick_state* next = pp->next; + PickState* next = pp->next; if (pp == pick) { pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick cancelled", &error, 1)); + "Pick Cancelled", &error, 1)); } else { - pp->next = p->pending_picks; - p->pending_picks = pp; + pp->next = pending_picks_; + pending_picks_ = pp; } pp = next; } GRPC_ERROR_UNREF(error); } -static void rr_cancel_picks_locked(grpc_lb_policy* pol, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, - grpc_error* error) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); - grpc_lb_policy_pick_state* pick = p->pending_picks; - p->pending_picks = nullptr; +void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error* error) { + PickState* pick = pending_picks_; + pending_picks_ = nullptr; while (pick != nullptr) { - grpc_lb_policy_pick_state* next = pick->next; + PickState* next = pick->next; if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick cancelled", &error, 1)); + "Pick Cancelled", &error, 1)); } else { - pick->next = p->pending_picks; - p->pending_picks = pick; + pick->next = pending_picks_; + pending_picks_ = pick; } pick = next; } GRPC_ERROR_UNREF(error); } -static void start_picking_locked(round_robin_lb_policy* p) { - p->started_picking = true; - for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { - if (p->subchannel_list->subchannels[i].subchannel != nullptr) { - grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list, - "connectivity_watch"); +void RoundRobin::SubchannelListRefForConnectivityWatch( + grpc_lb_subchannel_list* subchannel_list, const char* reason) { + // TODO(roth): We currently track this ref manually. Once the new + // ClosureRef API is ready and the subchannel_list code has been + // converted to a C++ API, find a way to hold the RefCountedPtr<> + // somewhere (maybe in the subchannel_data object) instead of doing + // this manually. + auto self = Ref(DEBUG_LOCATION, reason); + self.release(); + grpc_lb_subchannel_list_ref(subchannel_list, reason); +} + +void RoundRobin::SubchannelListUnrefForConnectivityWatch( + grpc_lb_subchannel_list* subchannel_list, const char* reason) { + Unref(DEBUG_LOCATION, reason); + grpc_lb_subchannel_list_unref(subchannel_list, reason); +} + +void RoundRobin::StartPickingLocked() { + started_picking_ = true; + for (size_t i = 0; i < subchannel_list_->num_subchannels; i++) { + if (subchannel_list_->subchannels[i].subchannel != nullptr) { + SubchannelListRefForConnectivityWatch(subchannel_list_, + "connectivity_watch"); grpc_lb_subchannel_data_start_connectivity_watch( - &p->subchannel_list->subchannels[i]); + &subchannel_list_->subchannels[i]); } } } -static void rr_exit_idle_locked(grpc_lb_policy* pol) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); - if (!p->started_picking) { - start_picking_locked(p); +void RoundRobin::ExitIdleLocked() { + if (!started_picking_) { + StartPickingLocked(); } } -static int rr_pick_locked(grpc_lb_policy* pol, - grpc_lb_policy_pick_state* pick) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); +bool RoundRobin::PickLocked(PickState* pick) { if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", pol, - p->shutdown); + gpr_log(GPR_DEBUG, "[RR %p] Trying to pick (shutdown: %d)", this, + shutdown_); } - GPR_ASSERT(!p->shutdown); - if (p->subchannel_list != nullptr) { - const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); - if (next_ready_index < p->subchannel_list->num_subchannels) { + GPR_ASSERT(!shutdown_); + if (subchannel_list_ != nullptr) { + const size_t next_ready_index = GetNextReadySubchannelIndexLocked(); + if (next_ready_index < subchannel_list_->num_subchannels) { /* readily available, report right away */ grpc_lb_subchannel_data* sd = - &p->subchannel_list->subchannels[next_ready_index]; + &subchannel_list_->subchannels[next_ready_index]; pick->connected_subchannel = sd->connected_subchannel; if (pick->user_data != nullptr) { *pick->user_data = sd->user_data; @@ -273,24 +328,24 @@ static int rr_pick_locked(grpc_lb_policy* pol, GPR_DEBUG, "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " "index %" PRIuPTR ")", - p, sd->subchannel, pick->connected_subchannel.get(), + this, sd->subchannel, pick->connected_subchannel.get(), sd->subchannel_list, next_ready_index); } /* only advance the last picked pointer if the selection was used */ - update_last_ready_subchannel_index_locked(p, next_ready_index); - return 1; + UpdateLastReadySubchannelIndexLocked(next_ready_index); + return true; } } /* no pick currently available. Save for later in list of pending picks */ - if (!p->started_picking) { - start_picking_locked(p); + if (!started_picking_) { + StartPickingLocked(); } - pick->next = p->pending_picks; - p->pending_picks = pick; - return 0; + pick->next = pending_picks_; + pending_picks_ = pick; + return false; } -static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { +void UpdateStateCountersLocked(grpc_lb_subchannel_data* sd) { grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN); GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); @@ -318,8 +373,8 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { * (the grpc_lb_subchannel_data associated with the updated subchannel) and the * subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used * only if the policy transitions to state TRANSIENT_FAILURE. */ -static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd, - grpc_error* error) { +void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd, + grpc_error* error) { /* In priority order. The first rule to match terminates the search (ie, if we * are on rule n, all previous rules were unfulfilled). * @@ -335,64 +390,61 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd, * subchannel_list->num_subchannels. */ grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; - round_robin_lb_policy* p = - reinterpret_cast<round_robin_lb_policy*>(subchannel_list->policy); GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE); if (subchannel_list->num_ready > 0) { /* 1) READY */ - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, + grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); } else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */ - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING, + grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "rr_connecting"); } else if (subchannel_list->num_transient_failures == subchannel_list->num_subchannels) { /* 3) TRANSIENT_FAILURE */ - grpc_connectivity_state_set(&p->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "rr_transient_failure"); + grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), + "rr_exhausted_subchannels"); } GRPC_ERROR_UNREF(error); } -static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { - grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg); - round_robin_lb_policy* p = - reinterpret_cast<round_robin_lb_policy*>(sd->subchannel_list->policy); +void RoundRobin::OnConnectivityChangedLocked(void* arg, grpc_error* error) { + grpc_lb_subchannel_data* sd = reinterpret_cast<grpc_lb_subchannel_data*>(arg); + RoundRobin* p = reinterpret_cast<RoundRobin*>(sd->subchannel_list->policy); if (grpc_lb_round_robin_trace.enabled()) { gpr_log( GPR_DEBUG, "[RR %p] connectivity changed for subchannel %p, subchannel_list %p: " "prev_state=%s new_state=%s p->shutdown=%d " "sd->subchannel_list->shutting_down=%d error=%s", - (void*)p, (void*)sd->subchannel, (void*)sd->subchannel_list, + p, sd->subchannel, sd->subchannel_list, grpc_connectivity_state_name(sd->prev_connectivity_state), grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), - p->shutdown, sd->subchannel_list->shutting_down, + p->shutdown_, sd->subchannel_list->shutting_down, grpc_error_string(error)); } GPR_ASSERT(sd->subchannel != nullptr); // If the policy is shutting down, unref and return. - if (p->shutdown) { + if (p->shutdown_) { grpc_lb_subchannel_data_stop_connectivity_watch(sd); grpc_lb_subchannel_data_unref_subchannel(sd, "rr_shutdown"); - grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list, - "rr_shutdown"); + p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, + "rr_shutdown"); return; } // If the subchannel list is shutting down, stop watching. if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) { grpc_lb_subchannel_data_stop_connectivity_watch(sd); grpc_lb_subchannel_data_unref_subchannel(sd, "rr_sl_shutdown"); - grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list, - "rr_sl_shutdown"); + p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, + "rr_sl_shutdown"); return; } // If we're still here, the notification must be for a subchannel in // either the current or latest pending subchannel lists. - GPR_ASSERT(sd->subchannel_list == p->subchannel_list || - sd->subchannel_list == p->latest_pending_subchannel_list); + GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ || + sd->subchannel_list == p->latest_pending_subchannel_list_); GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN); // Now that we're inside the combiner, copy the pending connectivity // state (which was set by the connectivity state watcher) to @@ -409,8 +461,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { "Requesting re-resolution", p, sd->subchannel); } - grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace, - GRPC_ERROR_NONE); + p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE); break; } case GRPC_CHANNEL_READY: { @@ -418,49 +469,47 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { sd->connected_subchannel = grpc_subchannel_get_connected_subchannel(sd->subchannel); } - if (sd->subchannel_list != p->subchannel_list) { - // promote sd->subchannel_list to p->subchannel_list. + if (sd->subchannel_list != p->subchannel_list_) { + // promote sd->subchannel_list to p->subchannel_list_. // sd->subchannel_list must be equal to - // p->latest_pending_subchannel_list because we have already filtered + // p->latest_pending_subchannel_list_ because we have already filtered // for sds belonging to outdated subchannel lists. - GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list); + GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list_); GPR_ASSERT(!sd->subchannel_list->shutting_down); if (grpc_lb_round_robin_trace.enabled()) { - const unsigned long num_subchannels = - p->subchannel_list != nullptr - ? static_cast<unsigned long>( - p->subchannel_list->num_subchannels) + const size_t num_subchannels = + p->subchannel_list_ != nullptr + ? p->subchannel_list_->num_subchannels : 0; gpr_log(GPR_DEBUG, - "[RR %p] phasing out subchannel list %p (size %lu) in favor " - "of %p (size %lu)", - p, p->subchannel_list, num_subchannels, sd->subchannel_list, + "[RR %p] phasing out subchannel list %p (size %" PRIuPTR + ") in favor of %p (size %" PRIuPTR ")", + p, p->subchannel_list_, num_subchannels, sd->subchannel_list, num_subchannels); } - if (p->subchannel_list != nullptr) { + if (p->subchannel_list_ != nullptr) { // dispose of the current subchannel_list - grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, + grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_, "sl_phase_out_shutdown"); } - p->subchannel_list = p->latest_pending_subchannel_list; - p->latest_pending_subchannel_list = nullptr; + p->subchannel_list_ = p->latest_pending_subchannel_list_; + p->latest_pending_subchannel_list_ = nullptr; } /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in - * p->pending_picks. This preemptively replicates rr_pick()'s actions. - */ - const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); - GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels); + * p->pending_picks. This preemptively replicates rr_pick()'s actions. */ + const size_t next_ready_index = p->GetNextReadySubchannelIndexLocked(); + GPR_ASSERT(next_ready_index < p->subchannel_list_->num_subchannels); grpc_lb_subchannel_data* selected = - &p->subchannel_list->subchannels[next_ready_index]; - if (p->pending_picks != nullptr) { + &p->subchannel_list_->subchannels[next_ready_index]; + if (p->pending_picks_ != nullptr) { // if the selected subchannel is going to be used for the pending // picks, update the last picked pointer - update_last_ready_subchannel_index_locked(p, next_ready_index); + p->UpdateLastReadySubchannelIndexLocked(next_ready_index); } - grpc_lb_policy_pick_state* pick; - while ((pick = p->pending_picks)) { - p->pending_picks = pick->next; + PickState* pick; + while ((pick = p->pending_picks_)) { + p->pending_picks_ = pick->next; pick->connected_subchannel = selected->connected_subchannel; if (pick->user_data != nullptr) { *pick->user_data = selected->user_data; @@ -468,10 +517,9 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Fulfilling pending pick. Target <-- subchannel %p " - "(subchannel_list %p, index %lu)", - (void*)p, (void*)selected->subchannel, - (void*)p->subchannel_list, - static_cast<unsigned long>(next_ready_index)); + "(subchannel_list %p, index %" PRIuPTR ")", + p, selected->subchannel, p->subchannel_list_, + next_ready_index); } GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } @@ -482,40 +530,34 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE:; // fallthrough } - // Update state counters and new overall state. - update_state_counters_locked(sd); + // Update state counters. + UpdateStateCountersLocked(sd); // Only update connectivity based on the selected subchannel list. - if (sd->subchannel_list == p->subchannel_list) { - update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); + if (sd->subchannel_list == p->subchannel_list_) { + p->UpdateConnectivityStatusLocked(sd, GRPC_ERROR_REF(error)); } // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(sd); } -static grpc_connectivity_state rr_check_connectivity_locked( - grpc_lb_policy* pol, grpc_error** error) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); - return grpc_connectivity_state_get(&p->state_tracker, error); +grpc_connectivity_state RoundRobin::CheckConnectivityLocked( + grpc_error** error) { + return grpc_connectivity_state_get(&state_tracker_, error); } -static void rr_notify_on_state_change_locked(grpc_lb_policy* pol, - grpc_connectivity_state* current, - grpc_closure* notify) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); - grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, +void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current, + grpc_closure* notify) { + grpc_connectivity_state_notify_on_state_change(&state_tracker_, current, notify); } -static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, +void RoundRobin::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); - const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); - if (next_ready_index < p->subchannel_list->num_subchannels) { + const size_t next_ready_index = GetNextReadySubchannelIndexLocked(); + if (next_ready_index < subchannel_list_->num_subchannels) { grpc_lb_subchannel_data* selected = - &p->subchannel_list->subchannels[next_ready_index]; - grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> target = - selected->connected_subchannel; - target->Ping(on_initiate, on_ack); + &subchannel_list_->subchannels[next_ready_index]; + selected->connected_subchannel->Ping(on_initiate, on_ack); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Round Robin not connected")); @@ -524,45 +566,41 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, } } -static void rr_update_locked(grpc_lb_policy* policy, - const grpc_lb_policy_args* args) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(policy); - const grpc_arg* arg = - grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); +void RoundRobin::UpdateLocked(const grpc_channel_args& args) { + const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { - gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", p); + gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this); // If we don't have a current subchannel list, go into TRANSIENT_FAILURE. // Otherwise, keep using the current subchannel list (ignore this update). - if (p->subchannel_list == nullptr) { + if (subchannel_list_ == nullptr) { grpc_connectivity_state_set( - &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), "rr_update_missing"); } return; } - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(arg->value.pointer.p); + grpc_lb_addresses* addresses = (grpc_lb_addresses*)arg->value.pointer.p; if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", p, - addresses->num_addresses); + gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", + this, addresses->num_addresses); } grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create( - &p->base, &grpc_lb_round_robin_trace, addresses, args, - rr_connectivity_changed_locked); + this, &grpc_lb_round_robin_trace, addresses, combiner(), + client_channel_factory(), args, &RoundRobin::OnConnectivityChangedLocked); if (subchannel_list->num_subchannels == 0) { grpc_connectivity_state_set( - &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "rr_update_empty"); - if (p->subchannel_list != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, + if (subchannel_list_ != nullptr) { + grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, "sl_shutdown_empty_update"); } - p->subchannel_list = subchannel_list; // empty list + subchannel_list_ = subchannel_list; // empty list return; } - if (p->started_picking) { + if (started_picking_) { for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { const grpc_connectivity_state subchannel_state = grpc_subchannel_check_connectivity( @@ -587,87 +625,61 @@ static void rr_update_locked(grpc_lb_policy* policy, ++subchannel_list->num_transient_failures; } } - if (p->latest_pending_subchannel_list != nullptr) { + if (latest_pending_subchannel_list_ != nullptr) { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Shutting down latest pending subchannel list %p, " "about to be replaced by newer latest %p", - (void*)p, (void*)p->latest_pending_subchannel_list, - (void*)subchannel_list); + this, latest_pending_subchannel_list_, subchannel_list); } grpc_lb_subchannel_list_shutdown_and_unref( - p->latest_pending_subchannel_list, "sl_outdated"); + latest_pending_subchannel_list_, "sl_outdated"); } - p->latest_pending_subchannel_list = subchannel_list; + latest_pending_subchannel_list_ = subchannel_list; for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { /* Watch every new subchannel. A subchannel list becomes active the * moment one of its subchannels is READY. At that moment, we swap * p->subchannel_list for sd->subchannel_list, provided the subchannel * list is still valid (ie, isn't shutting down) */ - grpc_lb_subchannel_list_ref_for_connectivity_watch(subchannel_list, - "connectivity_watch"); + SubchannelListRefForConnectivityWatch(subchannel_list, + "connectivity_watch"); grpc_lb_subchannel_data_start_connectivity_watch( &subchannel_list->subchannels[i]); } } else { // The policy isn't picking yet. Save the update for later, disposing of // previous version if any. - if (p->subchannel_list != nullptr) { + if (subchannel_list_ != nullptr) { grpc_lb_subchannel_list_shutdown_and_unref( - p->subchannel_list, "rr_update_before_started_picking"); + subchannel_list_, "rr_update_before_started_picking"); } - p->subchannel_list = subchannel_list; + subchannel_list_ = subchannel_list; } } -static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, - rr_shutdown_locked, - rr_pick_locked, - rr_cancel_pick_locked, - rr_cancel_picks_locked, - rr_ping_one_locked, - rr_exit_idle_locked, - rr_check_connectivity_locked, - rr_notify_on_state_change_locked, - rr_update_locked}; - -static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {} - -static void round_robin_factory_unref(grpc_lb_policy_factory* factory) {} - -static grpc_lb_policy* round_robin_create(grpc_lb_policy_factory* factory, - grpc_lb_policy_args* args) { - GPR_ASSERT(args->client_channel_factory != nullptr); - round_robin_lb_policy* p = - static_cast<round_robin_lb_policy*>(gpr_zalloc(sizeof(*p))); - grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); - grpc_subchannel_index_ref(); - grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, - "round_robin"); - rr_update_locked(&p->base, args); - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] Created with %lu subchannels", (void*)p, - static_cast<unsigned long>(p->subchannel_list->num_subchannels)); - } - return &p->base; -} +// +// factory +// -static const grpc_lb_policy_factory_vtable round_robin_factory_vtable = { - round_robin_factory_ref, round_robin_factory_unref, round_robin_create, - "round_robin"}; +class RoundRobinFactory : public LoadBalancingPolicyFactory { + public: + OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( + const LoadBalancingPolicy::Args& args) const override { + return OrphanablePtr<LoadBalancingPolicy>(New<RoundRobin>(args)); + } -static grpc_lb_policy_factory round_robin_lb_policy_factory = { - &round_robin_factory_vtable}; + const char* name() const override { return "round_robin"; } +}; -static grpc_lb_policy_factory* round_robin_lb_factory_create() { - return &round_robin_lb_policy_factory; -} +} // namespace -/* Plugin registration */ +} // namespace grpc_core void grpc_lb_policy_round_robin_init() { - grpc_register_lb_policy(round_robin_lb_factory_create()); + grpc_core::LoadBalancingPolicyRegistry::Builder:: + RegisterLoadBalancingPolicyFactory( + grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>( + grpc_core::New<grpc_core::RoundRobinFactory>())); } void grpc_lb_policy_round_robin_shutdown() {} |