diff options
6 files changed, 82 insertions, 29 deletions
diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h index 9713333cf5..5819e068ba 100644 --- a/include/grpcpp/impl/codegen/completion_queue.h +++ b/include/grpcpp/impl/codegen/completion_queue.h @@ -367,7 +367,7 @@ class ServerCompletionQueue : public CompletionQueue { protected: /// Default constructor - ServerCompletionQueue() {} + ServerCompletionQueue() : polling_type_(GRPC_CQ_DEFAULT_POLLING) {} private: /// \param is_frequently_polled Informs the GRPC library about whether the diff --git a/src/android/test/interop/app/src/main/cpp/grpc-interop.cc b/src/android/test/interop/app/src/main/cpp/grpc-interop.cc index bbdc84abdd..07834250d2 100644 --- a/src/android/test/interop/app/src/main/cpp/grpc-interop.cc +++ b/src/android/test/interop/app/src/main/cpp/grpc-interop.cc @@ -45,9 +45,10 @@ std::shared_ptr<grpc::testing::InteropClient> GetClient(const char* host, credentials = grpc::InsecureChannelCredentials(); } + grpc::testing::ChannelCreationFunc channel_creation_func = + std::bind(grpc::CreateChannel, host_port, credentials); return std::shared_ptr<grpc::testing::InteropClient>( - new grpc::testing::InteropClient( - grpc::CreateChannel(host_port, credentials), true, false)); + new grpc::testing::InteropClient(channel_creation_func, true, false)); } extern "C" JNIEXPORT jboolean JNICALL diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 18e983d6f7..023281db97 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -181,7 +181,7 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { } void PickFirst::ShutdownLocked() { - AutoChildRefsUpdater gaurd(this); + AutoChildRefsUpdater guard(this); grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p Shutting down", this); @@ -327,30 +327,10 @@ void PickFirst::FillChildRefsForChannelz( void PickFirst::UpdateChildRefsLocked() { ChildRefsList cs; if (subchannel_list_ != nullptr) { - for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) { - if (subchannel_list_->subchannel(i)->subchannel() != nullptr) { - grpc_core::channelz::SubchannelNode* subchannel_node = - grpc_subchannel_get_channelz_node( - subchannel_list_->subchannel(i)->subchannel()); - if (subchannel_node != nullptr) { - cs.push_back(subchannel_node->subchannel_uuid()); - } - } - } + subchannel_list_->PopulateChildRefsList(&cs); } if (latest_pending_subchannel_list_ != nullptr) { - for (size_t i = 0; i < latest_pending_subchannel_list_->num_subchannels(); - ++i) { - if (latest_pending_subchannel_list_->subchannel(i)->subchannel() != - nullptr) { - grpc_core::channelz::SubchannelNode* subchannel_node = - grpc_subchannel_get_channelz_node( - latest_pending_subchannel_list_->subchannel(i)->subchannel()); - if (subchannel_node != nullptr) { - cs.push_back(subchannel_node->subchannel_uuid()); - } - } - } + latest_pending_subchannel_list_->PopulateChildRefsList(&cs); } // atomically update the data that channelz will actually be looking at. mu_guard guard(&child_refs_mu_); diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 09634a2ad4..fc56a4961f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -69,9 +69,8 @@ class RoundRobin : public LoadBalancingPolicy { void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override; void ExitIdleLocked() override; - // TODO(ncteisen): implement this in a follow up PR void FillChildRefsForChannelz(ChildRefsList* child_subchannels, - ChildRefsList* child_channels) override {} + ChildRefsList* ignored) override; private: ~RoundRobin(); @@ -183,11 +182,24 @@ class RoundRobin : public LoadBalancingPolicy { size_t last_ready_index_ = -1; // Index into list of last pick. }; + // Helper class to ensure that any function that modifies the child refs + // data structures will update the channelz snapshot data structures before + // returning. + class AutoChildRefsUpdater { + public: + explicit AutoChildRefsUpdater(RoundRobin* rr) : rr_(rr) {} + ~AutoChildRefsUpdater() { rr_->UpdateChildRefsLocked(); } + + private: + RoundRobin* rr_; + }; + void ShutdownLocked() override; void StartPickingLocked(); bool DoPickLocked(PickState* pick); void DrainPendingPicksLocked(); + void UpdateChildRefsLocked(); /** list of subchannels */ OrphanablePtr<RoundRobinSubchannelList> subchannel_list_; @@ -205,10 +217,16 @@ class RoundRobin : public LoadBalancingPolicy { PickState* pending_picks_ = nullptr; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker_; + /// Lock and data used to capture snapshots of this channel's child + /// channels and subchannels. This data is consumed by channelz. + gpr_mu child_refs_mu_; + ChildRefsList child_subchannels_; + ChildRefsList child_channels_; }; RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) { GPR_ASSERT(args.client_channel_factory != nullptr); + gpr_mu_init(&child_refs_mu_); grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "round_robin"); UpdateLocked(*args.args); @@ -223,6 +241,7 @@ RoundRobin::~RoundRobin() { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this); } + gpr_mu_destroy(&child_refs_mu_); GPR_ASSERT(subchannel_list_ == nullptr); GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); GPR_ASSERT(pending_picks_ == nullptr); @@ -242,6 +261,7 @@ void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { } void RoundRobin::ShutdownLocked() { + AutoChildRefsUpdater guard(this); grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] Shutting down", this); @@ -365,6 +385,39 @@ bool RoundRobin::PickLocked(PickState* pick) { return false; } +void RoundRobin::FillChildRefsForChannelz( + ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) { + mu_guard guard(&child_refs_mu_); + for (size_t i = 0; i < child_subchannels_.size(); ++i) { + // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might + // have to implement lightweight set. For now, we don't care about + // performance when channelz requests are made. + bool found = false; + for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) { + if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) { + found = true; + break; + } + } + if (!found) { + child_subchannels_to_fill->push_back(child_subchannels_[i]); + } + } +} + +void RoundRobin::UpdateChildRefsLocked() { + ChildRefsList cs; + if (subchannel_list_ != nullptr) { + subchannel_list_->PopulateChildRefsList(&cs); + } + if (latest_pending_subchannel_list_ != nullptr) { + latest_pending_subchannel_list_->PopulateChildRefsList(&cs); + } + // atomically update the data that channelz will actually be looking at. + mu_guard guard(&child_refs_mu_); + child_subchannels_ = std::move(cs); +} + void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() { if (num_subchannels() == 0) return; // Check current state of each subchannel synchronously, since any @@ -455,6 +508,7 @@ void RoundRobin::RoundRobinSubchannelList:: void RoundRobin::RoundRobinSubchannelList:: UpdateRoundRobinStateFromSubchannelStateCountsLocked() { RoundRobin* p = static_cast<RoundRobin*>(policy()); + AutoChildRefsUpdater guard(p); if (num_ready_ > 0) { if (p->subchannel_list_.get() != this) { // Promote this list to p->subchannel_list_. @@ -611,6 +665,7 @@ void RoundRobin::PingOneLocked(grpc_closure* on_initiate, void RoundRobin::UpdateLocked(const grpc_channel_args& args) { const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); + AutoChildRefsUpdater guard(this); if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) { gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this); // If we don't have a current subchannel list, go into TRANSIENT_FAILURE. diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 7e2046bcdc..018ac3bb86 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -189,6 +189,19 @@ class SubchannelList // Returns true if the subchannel list is shutting down. bool shutting_down() const { return shutting_down_; } + // Populates refs_list with the uuids of this SubchannelLists's subchannels. + void PopulateChildRefsList(ChildRefsList* refs_list) { + for (size_t i = 0; i < subchannels_.size(); ++i) { + if (subchannels_[i].subchannel() != nullptr) { + grpc_core::channelz::SubchannelNode* subchannel_node = + grpc_subchannel_get_channelz_node(subchannels_[i].subchannel()); + if (subchannel_node != nullptr) { + refs_list->push_back(subchannel_node->subchannel_uuid()); + } + } + } + } + // Accessors. LoadBalancingPolicy* policy() const { return policy_; } TraceFlag* tracer() const { return tracer_; } diff --git a/src/core/lib/iomgr/lockfree_event.cc b/src/core/lib/iomgr/lockfree_event.cc index 5b6b79fa91..085fea40a4 100644 --- a/src/core/lib/iomgr/lockfree_event.cc +++ b/src/core/lib/iomgr/lockfree_event.cc @@ -89,7 +89,11 @@ void LockfreeEvent::DestroyEvent() { void LockfreeEvent::NotifyOn(grpc_closure* closure) { while (true) { - gpr_atm curr = gpr_atm_no_barrier_load(&state_); + /* This load needs to be an acquire load because this can be a shutdown + * error that we might need to reference. Adding acquire semantics makes + * sure that the shutdown error has been initialized properly before us + * referencing it. */ + gpr_atm curr = gpr_atm_acq_load(&state_); if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, "LockfreeEvent::NotifyOn: %p curr=%p closure=%p", this, (void*)curr, closure); |