diff options
author | Noah Eisen <ncteisen@gmail.com> | 2018-07-18 07:34:24 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-07-18 07:34:24 -0700 |
commit | a9f3d78c6ef4897816f696366814967fd5db2ad6 (patch) | |
tree | 5d56c5b7490b1b609f64fc01da46662362720526 /src/core/ext/filters/client_channel | |
parent | 3904de99b8539acd0957d5e5f65e1d5531fefdaf (diff) | |
parent | 2968bf687af0f5e0db591d20276b79a7fd627c31 (diff) |
Merge pull request #15980 from ncteisen/channelz-subchannel-refs
Channelz Part 3: Subchannel Refs Support for PickFirst
Diffstat (limited to 'src/core/ext/filters/client_channel')
10 files changed, 178 insertions, 0 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 04f7a2c830..024c9d737e 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -3174,6 +3174,16 @@ static void try_to_connect_locked(void* arg, grpc_error* error_ignored) { GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect"); } +void grpc_client_channel_populate_child_refs( + grpc_channel_element* elem, grpc_core::ChildRefsList* child_subchannels, + grpc_core::ChildRefsList* child_channels) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + if (chand->lb_policy != nullptr) { + chand->lb_policy->FillChildRefsForChannelz(child_subchannels, + child_channels); + } +} + grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_channel_element* elem, int try_to_connect) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index a21e5623a7..0b44a17562 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -21,6 +21,7 @@ #include <grpc/support/port_platform.h> +#include "src/core/ext/filters/client_channel/client_channel_channelz.h" #include "src/core/ext/filters/client_channel/client_channel_factory.h" #include "src/core/ext/filters/client_channel/resolver.h" #include "src/core/lib/channel/channel_stack.h" @@ -39,6 +40,10 @@ extern grpc_core::TraceFlag grpc_client_channel_trace; extern const grpc_channel_filter grpc_client_channel_filter; +void grpc_client_channel_populate_child_refs( + grpc_channel_element* elem, grpc_core::ChildRefsList* child_subchannels, + grpc_core::ChildRefsList* child_channels); + grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_channel_element* elem, int try_to_connect); diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc index 08ceb2dd05..235b8f3207 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.cc +++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc @@ -63,6 +63,37 @@ void ClientChannelNode::PopulateConnectivityState(grpc_json* json) { false); } +void ClientChannelNode::PopulateChildRefs(grpc_json* json) { + ChildRefsList child_subchannels; + ChildRefsList child_channels; + grpc_json* json_iterator = nullptr; + grpc_client_channel_populate_child_refs(client_channel_, &child_subchannels, + &child_channels); + if (child_subchannels.size() > 0) { + grpc_json* array_parent = grpc_json_create_child( + nullptr, json, "subchannelRef", nullptr, GRPC_JSON_ARRAY, false); + for (size_t i = 0; i < child_subchannels.size(); ++i) { + json_iterator = + grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr, + GRPC_JSON_OBJECT, false); + grpc_json_add_number_string_child(json_iterator, nullptr, "subchannelId", + child_subchannels[i]); + } + } + if (child_channels.size() > 0) { + grpc_json* array_parent = grpc_json_create_child( + nullptr, json, "channelRef", nullptr, GRPC_JSON_ARRAY, false); + json_iterator = nullptr; + for (size_t i = 0; i < child_subchannels.size(); ++i) { + json_iterator = + grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr, + GRPC_JSON_OBJECT, false); + grpc_json_add_number_string_child(json_iterator, nullptr, "channelId", + child_subchannels[i]); + } + } +} + grpc_arg ClientChannelNode::CreateChannelArg() { return grpc_channel_arg_pointer_create( const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_NODE_CREATION_FUNC), diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.h b/src/core/ext/filters/client_channel/client_channel_channelz.h index cf3ef7b6f2..0547109d36 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.h +++ b/src/core/ext/filters/client_channel/client_channel_channelz.h @@ -22,9 +22,17 @@ #include <grpc/support/port_platform.h> #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channelz.h" +#include "src/core/lib/gprpp/inlined_vector.h" namespace grpc_core { + +// TODO(ncteisen), this only contains the uuids of the children for now, +// since that is all that is strictly needed. In a future enhancement we will +// add human readable names as in the channelz.proto +typedef InlinedVector<intptr_t, 10> ChildRefsList; + namespace channelz { // Subtype of ChannelNode that overrides and provides client_channel specific @@ -38,6 +46,9 @@ class ClientChannelNode : public ChannelNode { // channel connectivity. void PopulateConnectivityState(grpc_json* json) override; + // Override this functionality since client_channels have subchannels + void PopulateChildRefs(grpc_json* json) override; + // Helper to create a channel arg to ensure this type of ChannelNode is // created. static grpc_arg CreateChannelArg(); diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index dab4466b21..3150df8847 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -21,6 +21,7 @@ #include <grpc/support/port_platform.h> +#include "src/core/ext/filters/client_channel/client_channel_channelz.h" #include "src/core/ext/filters/client_channel/client_channel_factory.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/lib/gprpp/abstract.h" @@ -143,6 +144,14 @@ class LoadBalancingPolicy /// consider whether this method is still needed. virtual void ExitIdleLocked() GRPC_ABSTRACT; + /// populates child_subchannels and child_channels with the uuids of this + /// LB policy's referenced children. This is not invoked from the + /// client_channel's combiner. The implementation is responsible for + /// providing its own synchronization. + virtual void FillChildRefsForChannelz(ChildRefsList* child_subchannels, + ChildRefsList* child_channels) + GRPC_ABSTRACT; + void Orphan() override { // Invoke ShutdownAndUnrefLocked() inside of the combiner. GRPC_CLOSURE_SCHED( @@ -196,6 +205,12 @@ class LoadBalancingPolicy grpc_pollset_set* interested_parties_; /// Callback to force a re-resolution. grpc_closure* request_reresolution_; + + // Dummy classes needed for alignment issues. + // See https://github.com/grpc/grpc/issues/16032 for context. + // TODO(ncteisen): remove this as soon as the issue is resolved. + ChildRefsList dummy_list_foo; + ChildRefsList dummy_list_bar; }; } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 263b51ae89..f757d6057c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -135,6 +135,9 @@ class GrpcLb : public LoadBalancingPolicy { void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override; void ExitIdleLocked() override; + // TODO(ncteisen): implement this in a follow up PR + void FillChildRefsForChannelz(ChildRefsList* child_subchannels, + ChildRefsList* child_channels) override {} private: /// Linked list of pending pick requests. It stores all information needed to diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index ff2140e628..c50deb9679 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -58,6 +58,8 @@ class PickFirst : public LoadBalancingPolicy { void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override; void ExitIdleLocked() override; + void FillChildRefsForChannelz(ChildRefsList* child_subchannels, + ChildRefsList* child_channels) override; private: ~PickFirst(); @@ -103,10 +105,23 @@ class PickFirst : public LoadBalancingPolicy { } }; + // Helper class to ensure that any function that modifies the child refs + // data structures will update the channelz snapshot data structures before + // returning. + class AutoChildRefsUpdater { + public: + explicit AutoChildRefsUpdater(PickFirst* pf) : pf_(pf) {} + ~AutoChildRefsUpdater() { pf_->UpdateChildRefsLocked(); } + + private: + PickFirst* pf_; + }; + void ShutdownLocked() override; void StartPickingLocked(); void DestroyUnselectedSubchannelsLocked(); + void UpdateChildRefsLocked(); // All our subchannels. OrphanablePtr<PickFirstSubchannelList> subchannel_list_; @@ -122,10 +137,17 @@ class PickFirst : public LoadBalancingPolicy { PickState* pending_picks_ = nullptr; // Our connectivity state tracker. grpc_connectivity_state_tracker state_tracker_; + + /// Lock and data used to capture snapshots of this channels child + /// channels and subchannels. This data is consumed by channelz. + gpr_mu child_refs_mu_; + ChildRefsList child_subchannels_; + ChildRefsList child_channels_; }; PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) { GPR_ASSERT(args.client_channel_factory != nullptr); + gpr_mu_init(&child_refs_mu_); grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "pick_first"); if (grpc_lb_pick_first_trace.enabled()) { @@ -139,6 +161,7 @@ PickFirst::~PickFirst() { if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Destroying Pick First %p", this); } + gpr_mu_destroy(&child_refs_mu_); GPR_ASSERT(subchannel_list_ == nullptr); GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); GPR_ASSERT(pending_picks_ == nullptr); @@ -158,6 +181,7 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { } void PickFirst::ShutdownLocked() { + AutoChildRefsUpdater(this); grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p Shutting down", this); @@ -280,7 +304,61 @@ void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { } } +void PickFirst::FillChildRefsForChannelz( + ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) { + mu_guard guard(&child_refs_mu_); + for (size_t i = 0; i < child_subchannels_.size(); ++i) { + // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might + // have to implement lightweight set. For now, we don't care about + // performance when channelz requests are made. + bool found = false; + for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) { + if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) { + found = true; + break; + } + } + if (!found) { + child_subchannels_to_fill->push_back(child_subchannels_[i]); + } + } +} + +void PickFirst::UpdateChildRefsLocked() { + ChildRefsList cs; + if (subchannel_list_ != nullptr) { + for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) { + if (subchannel_list_->subchannel(i)->subchannel() != nullptr) { + grpc_core::channelz::SubchannelNode* subchannel_node = + grpc_subchannel_get_channelz_node( + subchannel_list_->subchannel(i)->subchannel()); + if (subchannel_node != nullptr) { + cs.push_back(subchannel_node->subchannel_uuid()); + } + } + } + } + if (latest_pending_subchannel_list_ != nullptr) { + for (size_t i = 0; i < latest_pending_subchannel_list_->num_subchannels(); + ++i) { + if (latest_pending_subchannel_list_->subchannel(i)->subchannel() != + nullptr) { + grpc_core::channelz::SubchannelNode* subchannel_node = + grpc_subchannel_get_channelz_node( + latest_pending_subchannel_list_->subchannel(i)->subchannel()); + if (subchannel_node != nullptr) { + cs.push_back(subchannel_node->subchannel_uuid()); + } + } + } + } + // atomically update the data that channelz will actually be looking at. + mu_guard guard(&child_refs_mu_); + child_subchannels_ = std::move(cs); +} + void PickFirst::UpdateLocked(const grpc_channel_args& args) { + AutoChildRefsUpdater guard(this); const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { if (subchannel_list_ == nullptr) { @@ -388,6 +466,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state, grpc_error* error) { PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy()); + AutoChildRefsUpdater guard(p); // The notification must be for a subchannel in either the current or // latest pending subchannel lists. GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() || diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 42e8e88ec9..09634a2ad4 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -69,6 +69,9 @@ class RoundRobin : public LoadBalancingPolicy { void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override; void ExitIdleLocked() override; + // TODO(ncteisen): implement this in a follow up PR + void FillChildRefsForChannelz(ChildRefsList* child_subchannels, + ChildRefsList* child_channels) override {} private: ~RoundRobin(); diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 8ab3fe40f5..9d608c3c55 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -134,6 +134,9 @@ struct grpc_subchannel { bool backoff_begun; /** our alarm */ grpc_timer alarm; + + grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode> + channelz_subchannel; }; struct grpc_subchannel_call { @@ -178,6 +181,7 @@ static void connection_destroy(void* arg, grpc_error* error) { static void subchannel_destroy(void* arg, grpc_error* error) { grpc_subchannel* c = static_cast<grpc_subchannel*>(arg); + c->channelz_subchannel.reset(); gpr_free((void*)c->filters); grpc_channel_args_destroy(c->args); grpc_connectivity_state_destroy(&c->state_tracker); @@ -374,9 +378,22 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, c->backoff.Init(backoff_options); gpr_mu_init(&c->mu); + const grpc_arg* arg = + grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ); + bool channelz_enabled = grpc_channel_arg_get_bool(arg, false); + if (channelz_enabled) { + c->channelz_subchannel = + grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>(); + } + return grpc_subchannel_index_register(key, c); } +grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node( + grpc_subchannel* s) { + return s->channelz_subchannel.get(); +} + static void continue_connect_locked(grpc_subchannel* c) { grpc_connect_in_args args; args.interested_parties = c->pollset_set; diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index e23aec12df..9e53f7d542 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -21,6 +21,7 @@ #include <grpc/support/port_platform.h> +#include "src/core/ext/filters/client_channel/client_channel_channelz.h" #include "src/core/ext/filters/client_channel/connector.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/gpr/arena.h" @@ -115,6 +116,9 @@ grpc_subchannel_call* grpc_subchannel_call_ref( void grpc_subchannel_call_unref( grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node( + grpc_subchannel* subchannel); + /** Returns a pointer to the parent data associated with \a subchannel_call. The data will be of the size specified in \a parent_data_size field of the args passed to \a grpc_connected_subchannel_create_call(). */ |