aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel
diff options
context:
space:
mode:
authorGravatar Noah Eisen <ncteisen@gmail.com>2018-07-18 07:34:24 -0700
committerGravatar GitHub <noreply@github.com>2018-07-18 07:34:24 -0700
commita9f3d78c6ef4897816f696366814967fd5db2ad6 (patch)
tree5d56c5b7490b1b609f64fc01da46662362720526 /src/core/ext/filters/client_channel
parent3904de99b8539acd0957d5e5f65e1d5531fefdaf (diff)
parent2968bf687af0f5e0db591d20276b79a7fd627c31 (diff)
Merge pull request #15980 from ncteisen/channelz-subchannel-refs
Channelz Part 3: Subchannel Refs Support for PickFirst
Diffstat (limited to 'src/core/ext/filters/client_channel')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc10
-rw-r--r--src/core/ext/filters/client_channel/client_channel.h5
-rw-r--r--src/core/ext/filters/client_channel/client_channel_channelz.cc31
-rw-r--r--src/core/ext/filters/client_channel/client_channel_channelz.h11
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h15
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc3
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc79
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc3
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc17
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h4
10 files changed, 178 insertions, 0 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 04f7a2c830..024c9d737e 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -3174,6 +3174,16 @@ static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
}
+void grpc_client_channel_populate_child_refs(
+ grpc_channel_element* elem, grpc_core::ChildRefsList* child_subchannels,
+ grpc_core::ChildRefsList* child_channels) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ if (chand->lb_policy != nullptr) {
+ chand->lb_policy->FillChildRefsForChannelz(child_subchannels,
+ child_channels);
+ }
+}
+
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_channel_element* elem, int try_to_connect) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h
index a21e5623a7..0b44a17562 100644
--- a/src/core/ext/filters/client_channel/client_channel.h
+++ b/src/core/ext/filters/client_channel/client_channel.h
@@ -21,6 +21,7 @@
#include <grpc/support/port_platform.h>
+#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/resolver.h"
#include "src/core/lib/channel/channel_stack.h"
@@ -39,6 +40,10 @@ extern grpc_core::TraceFlag grpc_client_channel_trace;
extern const grpc_channel_filter grpc_client_channel_filter;
+void grpc_client_channel_populate_child_refs(
+ grpc_channel_element* elem, grpc_core::ChildRefsList* child_subchannels,
+ grpc_core::ChildRefsList* child_channels);
+
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_channel_element* elem, int try_to_connect);
diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc
index 08ceb2dd05..235b8f3207 100644
--- a/src/core/ext/filters/client_channel/client_channel_channelz.cc
+++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc
@@ -63,6 +63,37 @@ void ClientChannelNode::PopulateConnectivityState(grpc_json* json) {
false);
}
+void ClientChannelNode::PopulateChildRefs(grpc_json* json) {
+ ChildRefsList child_subchannels;
+ ChildRefsList child_channels;
+ grpc_json* json_iterator = nullptr;
+ grpc_client_channel_populate_child_refs(client_channel_, &child_subchannels,
+ &child_channels);
+ if (child_subchannels.size() > 0) {
+ grpc_json* array_parent = grpc_json_create_child(
+ nullptr, json, "subchannelRef", nullptr, GRPC_JSON_ARRAY, false);
+ for (size_t i = 0; i < child_subchannels.size(); ++i) {
+ json_iterator =
+ grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr,
+ GRPC_JSON_OBJECT, false);
+ grpc_json_add_number_string_child(json_iterator, nullptr, "subchannelId",
+ child_subchannels[i]);
+ }
+ }
+ if (child_channels.size() > 0) {
+ grpc_json* array_parent = grpc_json_create_child(
+ nullptr, json, "channelRef", nullptr, GRPC_JSON_ARRAY, false);
+ json_iterator = nullptr;
+ for (size_t i = 0; i < child_subchannels.size(); ++i) {
+ json_iterator =
+ grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr,
+ GRPC_JSON_OBJECT, false);
+ grpc_json_add_number_string_child(json_iterator, nullptr, "channelId",
+ child_subchannels[i]);
+ }
+ }
+}
+
grpc_arg ClientChannelNode::CreateChannelArg() {
return grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_NODE_CREATION_FUNC),
diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.h b/src/core/ext/filters/client_channel/client_channel_channelz.h
index cf3ef7b6f2..0547109d36 100644
--- a/src/core/ext/filters/client_channel/client_channel_channelz.h
+++ b/src/core/ext/filters/client_channel/client_channel_channelz.h
@@ -22,9 +22,17 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channelz.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
namespace grpc_core {
+
+// TODO(ncteisen), this only contains the uuids of the children for now,
+// since that is all that is strictly needed. In a future enhancement we will
+// add human readable names as in the channelz.proto
+typedef InlinedVector<intptr_t, 10> ChildRefsList;
+
namespace channelz {
// Subtype of ChannelNode that overrides and provides client_channel specific
@@ -38,6 +46,9 @@ class ClientChannelNode : public ChannelNode {
// channel connectivity.
void PopulateConnectivityState(grpc_json* json) override;
+ // Override this functionality since client_channels have subchannels
+ void PopulateChildRefs(grpc_json* json) override;
+
// Helper to create a channel arg to ensure this type of ChannelNode is
// created.
static grpc_arg CreateChannelArg();
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index dab4466b21..3150df8847 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -21,6 +21,7 @@
#include <grpc/support/port_platform.h>
+#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/gprpp/abstract.h"
@@ -143,6 +144,14 @@ class LoadBalancingPolicy
/// consider whether this method is still needed.
virtual void ExitIdleLocked() GRPC_ABSTRACT;
+ /// populates child_subchannels and child_channels with the uuids of this
+ /// LB policy's referenced children. This is not invoked from the
+ /// client_channel's combiner. The implementation is responsible for
+ /// providing its own synchronization.
+ virtual void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
+ ChildRefsList* child_channels)
+ GRPC_ABSTRACT;
+
void Orphan() override {
// Invoke ShutdownAndUnrefLocked() inside of the combiner.
GRPC_CLOSURE_SCHED(
@@ -196,6 +205,12 @@ class LoadBalancingPolicy
grpc_pollset_set* interested_parties_;
/// Callback to force a re-resolution.
grpc_closure* request_reresolution_;
+
+ // Dummy classes needed for alignment issues.
+ // See https://github.com/grpc/grpc/issues/16032 for context.
+ // TODO(ncteisen): remove this as soon as the issue is resolved.
+ ChildRefsList dummy_list_foo;
+ ChildRefsList dummy_list_bar;
};
} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 263b51ae89..f757d6057c 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -135,6 +135,9 @@ class GrpcLb : public LoadBalancingPolicy {
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
+ // TODO(ncteisen): implement this in a follow up PR
+ void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
+ ChildRefsList* child_channels) override {}
private:
/// Linked list of pending pick requests. It stores all information needed to
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index ff2140e628..c50deb9679 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -58,6 +58,8 @@ class PickFirst : public LoadBalancingPolicy {
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
+ void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
+ ChildRefsList* child_channels) override;
private:
~PickFirst();
@@ -103,10 +105,23 @@ class PickFirst : public LoadBalancingPolicy {
}
};
+ // Helper class to ensure that any function that modifies the child refs
+ // data structures will update the channelz snapshot data structures before
+ // returning.
+ class AutoChildRefsUpdater {
+ public:
+ explicit AutoChildRefsUpdater(PickFirst* pf) : pf_(pf) {}
+ ~AutoChildRefsUpdater() { pf_->UpdateChildRefsLocked(); }
+
+ private:
+ PickFirst* pf_;
+ };
+
void ShutdownLocked() override;
void StartPickingLocked();
void DestroyUnselectedSubchannelsLocked();
+ void UpdateChildRefsLocked();
// All our subchannels.
OrphanablePtr<PickFirstSubchannelList> subchannel_list_;
@@ -122,10 +137,17 @@ class PickFirst : public LoadBalancingPolicy {
PickState* pending_picks_ = nullptr;
// Our connectivity state tracker.
grpc_connectivity_state_tracker state_tracker_;
+
+ /// Lock and data used to capture snapshots of this channels child
+ /// channels and subchannels. This data is consumed by channelz.
+ gpr_mu child_refs_mu_;
+ ChildRefsList child_subchannels_;
+ ChildRefsList child_channels_;
};
PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) {
GPR_ASSERT(args.client_channel_factory != nullptr);
+ gpr_mu_init(&child_refs_mu_);
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
"pick_first");
if (grpc_lb_pick_first_trace.enabled()) {
@@ -139,6 +161,7 @@ PickFirst::~PickFirst() {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Destroying Pick First %p", this);
}
+ gpr_mu_destroy(&child_refs_mu_);
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
GPR_ASSERT(pending_picks_ == nullptr);
@@ -158,6 +181,7 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
}
void PickFirst::ShutdownLocked() {
+ AutoChildRefsUpdater(this);
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
@@ -280,7 +304,61 @@ void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
}
}
+void PickFirst::FillChildRefsForChannelz(
+ ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
+ mu_guard guard(&child_refs_mu_);
+ for (size_t i = 0; i < child_subchannels_.size(); ++i) {
+ // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
+ // have to implement lightweight set. For now, we don't care about
+ // performance when channelz requests are made.
+ bool found = false;
+ for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
+ if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ child_subchannels_to_fill->push_back(child_subchannels_[i]);
+ }
+ }
+}
+
+void PickFirst::UpdateChildRefsLocked() {
+ ChildRefsList cs;
+ if (subchannel_list_ != nullptr) {
+ for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
+ if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
+ grpc_core::channelz::SubchannelNode* subchannel_node =
+ grpc_subchannel_get_channelz_node(
+ subchannel_list_->subchannel(i)->subchannel());
+ if (subchannel_node != nullptr) {
+ cs.push_back(subchannel_node->subchannel_uuid());
+ }
+ }
+ }
+ }
+ if (latest_pending_subchannel_list_ != nullptr) {
+ for (size_t i = 0; i < latest_pending_subchannel_list_->num_subchannels();
+ ++i) {
+ if (latest_pending_subchannel_list_->subchannel(i)->subchannel() !=
+ nullptr) {
+ grpc_core::channelz::SubchannelNode* subchannel_node =
+ grpc_subchannel_get_channelz_node(
+ latest_pending_subchannel_list_->subchannel(i)->subchannel());
+ if (subchannel_node != nullptr) {
+ cs.push_back(subchannel_node->subchannel_uuid());
+ }
+ }
+ }
+ }
+ // atomically update the data that channelz will actually be looking at.
+ mu_guard guard(&child_refs_mu_);
+ child_subchannels_ = std::move(cs);
+}
+
void PickFirst::UpdateLocked(const grpc_channel_args& args) {
+ AutoChildRefsUpdater guard(this);
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
if (subchannel_list_ == nullptr) {
@@ -388,6 +466,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
+ AutoChildRefsUpdater guard(p);
// The notification must be for a subchannel in either the current or
// latest pending subchannel lists.
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 42e8e88ec9..09634a2ad4 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -69,6 +69,9 @@ class RoundRobin : public LoadBalancingPolicy {
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
+ // TODO(ncteisen): implement this in a follow up PR
+ void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
+ ChildRefsList* child_channels) override {}
private:
~RoundRobin();
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 8ab3fe40f5..9d608c3c55 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -134,6 +134,9 @@ struct grpc_subchannel {
bool backoff_begun;
/** our alarm */
grpc_timer alarm;
+
+ grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode>
+ channelz_subchannel;
};
struct grpc_subchannel_call {
@@ -178,6 +181,7 @@ static void connection_destroy(void* arg, grpc_error* error) {
static void subchannel_destroy(void* arg, grpc_error* error) {
grpc_subchannel* c = static_cast<grpc_subchannel*>(arg);
+ c->channelz_subchannel.reset();
gpr_free((void*)c->filters);
grpc_channel_args_destroy(c->args);
grpc_connectivity_state_destroy(&c->state_tracker);
@@ -374,9 +378,22 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
c->backoff.Init(backoff_options);
gpr_mu_init(&c->mu);
+ const grpc_arg* arg =
+ grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ);
+ bool channelz_enabled = grpc_channel_arg_get_bool(arg, false);
+ if (channelz_enabled) {
+ c->channelz_subchannel =
+ grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>();
+ }
+
return grpc_subchannel_index_register(key, c);
}
+grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node(
+ grpc_subchannel* s) {
+ return s->channelz_subchannel.get();
+}
+
static void continue_connect_locked(grpc_subchannel* c) {
grpc_connect_in_args args;
args.interested_parties = c->pollset_set;
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index e23aec12df..9e53f7d542 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -21,6 +21,7 @@
#include <grpc/support/port_platform.h>
+#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/connector.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gpr/arena.h"
@@ -115,6 +116,9 @@ grpc_subchannel_call* grpc_subchannel_call_ref(
void grpc_subchannel_call_unref(
grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node(
+ grpc_subchannel* subchannel);
+
/** Returns a pointer to the parent data associated with \a subchannel_call.
The data will be of the size specified in \a parent_data_size
field of the args passed to \a grpc_connected_subchannel_create_call(). */