aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpcpp/impl/codegen/completion_queue.h2
-rw-r--r--src/android/test/interop/app/src/main/cpp/grpc-interop.cc5
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc26
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc59
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h13
-rw-r--r--src/core/lib/iomgr/lockfree_event.cc6
6 files changed, 82 insertions, 29 deletions
diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h
index 9713333cf5..5819e068ba 100644
--- a/include/grpcpp/impl/codegen/completion_queue.h
+++ b/include/grpcpp/impl/codegen/completion_queue.h
@@ -367,7 +367,7 @@ class ServerCompletionQueue : public CompletionQueue {
protected:
/// Default constructor
- ServerCompletionQueue() {}
+ ServerCompletionQueue() : polling_type_(GRPC_CQ_DEFAULT_POLLING) {}
private:
/// \param is_frequently_polled Informs the GRPC library about whether the
diff --git a/src/android/test/interop/app/src/main/cpp/grpc-interop.cc b/src/android/test/interop/app/src/main/cpp/grpc-interop.cc
index bbdc84abdd..07834250d2 100644
--- a/src/android/test/interop/app/src/main/cpp/grpc-interop.cc
+++ b/src/android/test/interop/app/src/main/cpp/grpc-interop.cc
@@ -45,9 +45,10 @@ std::shared_ptr<grpc::testing::InteropClient> GetClient(const char* host,
credentials = grpc::InsecureChannelCredentials();
}
+ grpc::testing::ChannelCreationFunc channel_creation_func =
+ std::bind(grpc::CreateChannel, host_port, credentials);
return std::shared_ptr<grpc::testing::InteropClient>(
- new grpc::testing::InteropClient(
- grpc::CreateChannel(host_port, credentials), true, false));
+ new grpc::testing::InteropClient(channel_creation_func, true, false));
}
extern "C" JNIEXPORT jboolean JNICALL
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 18e983d6f7..023281db97 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -181,7 +181,7 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
}
void PickFirst::ShutdownLocked() {
- AutoChildRefsUpdater gaurd(this);
+ AutoChildRefsUpdater guard(this);
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
@@ -327,30 +327,10 @@ void PickFirst::FillChildRefsForChannelz(
void PickFirst::UpdateChildRefsLocked() {
ChildRefsList cs;
if (subchannel_list_ != nullptr) {
- for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
- if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
- grpc_core::channelz::SubchannelNode* subchannel_node =
- grpc_subchannel_get_channelz_node(
- subchannel_list_->subchannel(i)->subchannel());
- if (subchannel_node != nullptr) {
- cs.push_back(subchannel_node->subchannel_uuid());
- }
- }
- }
+ subchannel_list_->PopulateChildRefsList(&cs);
}
if (latest_pending_subchannel_list_ != nullptr) {
- for (size_t i = 0; i < latest_pending_subchannel_list_->num_subchannels();
- ++i) {
- if (latest_pending_subchannel_list_->subchannel(i)->subchannel() !=
- nullptr) {
- grpc_core::channelz::SubchannelNode* subchannel_node =
- grpc_subchannel_get_channelz_node(
- latest_pending_subchannel_list_->subchannel(i)->subchannel());
- if (subchannel_node != nullptr) {
- cs.push_back(subchannel_node->subchannel_uuid());
- }
- }
- }
+ latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
}
// atomically update the data that channelz will actually be looking at.
mu_guard guard(&child_refs_mu_);
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 09634a2ad4..fc56a4961f 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -69,9 +69,8 @@ class RoundRobin : public LoadBalancingPolicy {
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
- // TODO(ncteisen): implement this in a follow up PR
void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
- ChildRefsList* child_channels) override {}
+ ChildRefsList* ignored) override;
private:
~RoundRobin();
@@ -183,11 +182,24 @@ class RoundRobin : public LoadBalancingPolicy {
size_t last_ready_index_ = -1; // Index into list of last pick.
};
+ // Helper class to ensure that any function that modifies the child refs
+ // data structures will update the channelz snapshot data structures before
+ // returning.
+ class AutoChildRefsUpdater {
+ public:
+ explicit AutoChildRefsUpdater(RoundRobin* rr) : rr_(rr) {}
+ ~AutoChildRefsUpdater() { rr_->UpdateChildRefsLocked(); }
+
+ private:
+ RoundRobin* rr_;
+ };
+
void ShutdownLocked() override;
void StartPickingLocked();
bool DoPickLocked(PickState* pick);
void DrainPendingPicksLocked();
+ void UpdateChildRefsLocked();
/** list of subchannels */
OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
@@ -205,10 +217,16 @@ class RoundRobin : public LoadBalancingPolicy {
PickState* pending_picks_ = nullptr;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker_;
+ /// Lock and data used to capture snapshots of this channel's child
+ /// channels and subchannels. This data is consumed by channelz.
+ gpr_mu child_refs_mu_;
+ ChildRefsList child_subchannels_;
+ ChildRefsList child_channels_;
};
RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
GPR_ASSERT(args.client_channel_factory != nullptr);
+ gpr_mu_init(&child_refs_mu_);
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
"round_robin");
UpdateLocked(*args.args);
@@ -223,6 +241,7 @@ RoundRobin::~RoundRobin() {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
}
+ gpr_mu_destroy(&child_refs_mu_);
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
GPR_ASSERT(pending_picks_ == nullptr);
@@ -242,6 +261,7 @@ void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
}
void RoundRobin::ShutdownLocked() {
+ AutoChildRefsUpdater guard(this);
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
@@ -365,6 +385,39 @@ bool RoundRobin::PickLocked(PickState* pick) {
return false;
}
+void RoundRobin::FillChildRefsForChannelz(
+ ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
+ mu_guard guard(&child_refs_mu_);
+ for (size_t i = 0; i < child_subchannels_.size(); ++i) {
+ // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
+ // have to implement lightweight set. For now, we don't care about
+ // performance when channelz requests are made.
+ bool found = false;
+ for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
+ if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ child_subchannels_to_fill->push_back(child_subchannels_[i]);
+ }
+ }
+}
+
+void RoundRobin::UpdateChildRefsLocked() {
+ ChildRefsList cs;
+ if (subchannel_list_ != nullptr) {
+ subchannel_list_->PopulateChildRefsList(&cs);
+ }
+ if (latest_pending_subchannel_list_ != nullptr) {
+ latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
+ }
+ // atomically update the data that channelz will actually be looking at.
+ mu_guard guard(&child_refs_mu_);
+ child_subchannels_ = std::move(cs);
+}
+
void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
if (num_subchannels() == 0) return;
// Check current state of each subchannel synchronously, since any
@@ -455,6 +508,7 @@ void RoundRobin::RoundRobinSubchannelList::
void RoundRobin::RoundRobinSubchannelList::
UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
RoundRobin* p = static_cast<RoundRobin*>(policy());
+ AutoChildRefsUpdater guard(p);
if (num_ready_ > 0) {
if (p->subchannel_list_.get() != this) {
// Promote this list to p->subchannel_list_.
@@ -611,6 +665,7 @@ void RoundRobin::PingOneLocked(grpc_closure* on_initiate,
void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
+ AutoChildRefsUpdater guard(this);
if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this);
// If we don't have a current subchannel list, go into TRANSIENT_FAILURE.
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 7e2046bcdc..018ac3bb86 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -189,6 +189,19 @@ class SubchannelList
// Returns true if the subchannel list is shutting down.
bool shutting_down() const { return shutting_down_; }
+ // Populates refs_list with the uuids of this SubchannelLists's subchannels.
+ void PopulateChildRefsList(ChildRefsList* refs_list) {
+ for (size_t i = 0; i < subchannels_.size(); ++i) {
+ if (subchannels_[i].subchannel() != nullptr) {
+ grpc_core::channelz::SubchannelNode* subchannel_node =
+ grpc_subchannel_get_channelz_node(subchannels_[i].subchannel());
+ if (subchannel_node != nullptr) {
+ refs_list->push_back(subchannel_node->subchannel_uuid());
+ }
+ }
+ }
+ }
+
// Accessors.
LoadBalancingPolicy* policy() const { return policy_; }
TraceFlag* tracer() const { return tracer_; }
diff --git a/src/core/lib/iomgr/lockfree_event.cc b/src/core/lib/iomgr/lockfree_event.cc
index 5b6b79fa91..085fea40a4 100644
--- a/src/core/lib/iomgr/lockfree_event.cc
+++ b/src/core/lib/iomgr/lockfree_event.cc
@@ -89,7 +89,11 @@ void LockfreeEvent::DestroyEvent() {
void LockfreeEvent::NotifyOn(grpc_closure* closure) {
while (true) {
- gpr_atm curr = gpr_atm_no_barrier_load(&state_);
+ /* This load needs to be an acquire load because this can be a shutdown
+ * error that we might need to reference. Adding acquire semantics makes
+ * sure that the shutdown error has been initialized properly before us
+ * referencing it. */
+ gpr_atm curr = gpr_atm_acq_load(&state_);
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, "LockfreeEvent::NotifyOn: %p curr=%p closure=%p", this,
(void*)curr, closure);