aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc97
1 files changed, 76 insertions, 21 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 42e8e88ec9..fea84331d8 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -57,7 +57,7 @@ class RoundRobin : public LoadBalancingPolicy {
explicit RoundRobin(const Args& args);
void UpdateLocked(const grpc_channel_args& args) override;
- bool PickLocked(PickState* pick) override;
+ bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
@@ -67,8 +67,10 @@ class RoundRobin : public LoadBalancingPolicy {
grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) override;
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
- void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
+ void ResetBackoffLocked() override;
+ void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
+ ChildRefsList* ignored) override;
private:
~RoundRobin();
@@ -180,11 +182,24 @@ class RoundRobin : public LoadBalancingPolicy {
size_t last_ready_index_ = -1; // Index into list of last pick.
};
+ // Helper class to ensure that any function that modifies the child refs
+ // data structures will update the channelz snapshot data structures before
+ // returning.
+ class AutoChildRefsUpdater {
+ public:
+ explicit AutoChildRefsUpdater(RoundRobin* rr) : rr_(rr) {}
+ ~AutoChildRefsUpdater() { rr_->UpdateChildRefsLocked(); }
+
+ private:
+ RoundRobin* rr_;
+ };
+
void ShutdownLocked() override;
void StartPickingLocked();
bool DoPickLocked(PickState* pick);
void DrainPendingPicksLocked();
+ void UpdateChildRefsLocked();
/** list of subchannels */
OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
@@ -202,10 +217,16 @@ class RoundRobin : public LoadBalancingPolicy {
PickState* pending_picks_ = nullptr;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker_;
+ /// Lock and data used to capture snapshots of this channel's child
+ /// channels and subchannels. This data is consumed by channelz.
+ gpr_mu child_refs_mu_;
+ ChildRefsList child_subchannels_;
+ ChildRefsList child_channels_;
};
RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
GPR_ASSERT(args.client_channel_factory != nullptr);
+ gpr_mu_init(&child_refs_mu_);
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
"round_robin");
UpdateLocked(*args.args);
@@ -220,6 +241,7 @@ RoundRobin::~RoundRobin() {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
}
+ gpr_mu_destroy(&child_refs_mu_);
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
GPR_ASSERT(pending_picks_ == nullptr);
@@ -231,14 +253,16 @@ void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
PickState* pick;
while ((pick = pending_picks_) != nullptr) {
pending_picks_ = pick->next;
- if (new_policy->PickLocked(pick)) {
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (new_policy->PickLocked(pick, &error)) {
// Synchronous return, schedule closure.
- GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(pick->on_complete, error);
}
}
}
void RoundRobin::ShutdownLocked() {
+ AutoChildRefsUpdater guard(this);
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
@@ -310,6 +334,13 @@ void RoundRobin::ExitIdleLocked() {
}
}
+void RoundRobin::ResetBackoffLocked() {
+ subchannel_list_->ResetBackoffLocked();
+ if (latest_pending_subchannel_list_ != nullptr) {
+ latest_pending_subchannel_list_->ResetBackoffLocked();
+ }
+}
+
bool RoundRobin::DoPickLocked(PickState* pick) {
const size_t next_ready_index =
subchannel_list_->GetNextReadySubchannelIndexLocked();
@@ -345,7 +376,7 @@ void RoundRobin::DrainPendingPicksLocked() {
}
}
-bool RoundRobin::PickLocked(PickState* pick) {
+bool RoundRobin::PickLocked(PickState* pick, grpc_error** error) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", this, shutdown_);
}
@@ -353,6 +384,11 @@ bool RoundRobin::PickLocked(PickState* pick) {
if (subchannel_list_ != nullptr) {
if (DoPickLocked(pick)) return true;
}
+ if (pick->on_complete == nullptr) {
+ *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "No pick result available but synchronous result required.");
+ return true;
+ }
/* no pick currently available. Save for later in list of pending picks */
pick->next = pending_picks_;
pending_picks_ = pick;
@@ -362,6 +398,39 @@ bool RoundRobin::PickLocked(PickState* pick) {
return false;
}
+void RoundRobin::FillChildRefsForChannelz(
+ ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
+ mu_guard guard(&child_refs_mu_);
+ for (size_t i = 0; i < child_subchannels_.size(); ++i) {
+ // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
+ // have to implement lightweight set. For now, we don't care about
+ // performance when channelz requests are made.
+ bool found = false;
+ for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
+ if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ child_subchannels_to_fill->push_back(child_subchannels_[i]);
+ }
+ }
+}
+
+void RoundRobin::UpdateChildRefsLocked() {
+ ChildRefsList cs;
+ if (subchannel_list_ != nullptr) {
+ subchannel_list_->PopulateChildRefsList(&cs);
+ }
+ if (latest_pending_subchannel_list_ != nullptr) {
+ latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
+ }
+ // atomically update the data that channelz will actually be looking at.
+ mu_guard guard(&child_refs_mu_);
+ child_subchannels_ = std::move(cs);
+}
+
void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
if (num_subchannels() == 0) return;
// Check current state of each subchannel synchronously, since any
@@ -452,6 +521,7 @@ void RoundRobin::RoundRobinSubchannelList::
void RoundRobin::RoundRobinSubchannelList::
UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
RoundRobin* p = static_cast<RoundRobin*>(policy());
+ AutoChildRefsUpdater guard(p);
if (num_ready_ > 0) {
if (p->subchannel_list_.get() != this) {
// Promote this list to p->subchannel_list_.
@@ -590,24 +660,9 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
notify);
}
-void RoundRobin::PingOneLocked(grpc_closure* on_initiate,
- grpc_closure* on_ack) {
- const size_t next_ready_index =
- subchannel_list_->GetNextReadySubchannelIndexLocked();
- if (next_ready_index < subchannel_list_->num_subchannels()) {
- RoundRobinSubchannelData* selected =
- subchannel_list_->subchannel(next_ready_index);
- selected->connected_subchannel()->Ping(on_initiate, on_ack);
- } else {
- GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Round Robin not connected"));
- GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Round Robin not connected"));
- }
-}
-
void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
+ AutoChildRefsUpdater guard(this);
if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this);
// If we don't have a current subchannel list, go into TRANSIENT_FAILURE.