aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc10
-rw-r--r--src/core/ext/filters/client_channel/client_channel.h5
-rw-r--r--src/core/ext/filters/client_channel/client_channel_channelz.cc41
-rw-r--r--src/core/ext/filters/client_channel/client_channel_channelz.h17
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h15
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc29
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc59
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc58
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h13
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc17
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h4
-rw-r--r--src/core/lib/channel/channel_trace.cc46
-rw-r--r--src/core/lib/channel/channel_trace.h2
-rw-r--r--src/core/lib/channel/channelz.cc126
-rw-r--r--src/core/lib/channel/channelz.h37
-rw-r--r--src/core/lib/channel/channelz_registry.cc92
-rw-r--r--src/core/lib/channel/channelz_registry.h95
-rw-r--r--src/core/lib/gpr/string.cc28
-rw-r--r--src/core/lib/gpr/string.h10
-rw-r--r--src/core/lib/gprpp/abstract.h7
-rw-r--r--src/core/lib/gprpp/inlined_vector.h58
-rw-r--r--src/core/lib/iomgr/lockfree_event.cc6
-rw-r--r--src/core/lib/json/json.cc13
-rw-r--r--src/core/lib/json/json.h5
-rw-r--r--src/core/lib/surface/channel.cc9
-rw-r--r--src/core/lib/surface/init.cc4
26 files changed, 628 insertions, 178 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 04f7a2c830..024c9d737e 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -3174,6 +3174,16 @@ static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
}
+void grpc_client_channel_populate_child_refs(
+ grpc_channel_element* elem, grpc_core::ChildRefsList* child_subchannels,
+ grpc_core::ChildRefsList* child_channels) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ if (chand->lb_policy != nullptr) {
+ chand->lb_policy->FillChildRefsForChannelz(child_subchannels,
+ child_channels);
+ }
+}
+
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_channel_element* elem, int try_to_connect) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h
index a21e5623a7..0b44a17562 100644
--- a/src/core/ext/filters/client_channel/client_channel.h
+++ b/src/core/ext/filters/client_channel/client_channel.h
@@ -21,6 +21,7 @@
#include <grpc/support/port_platform.h>
+#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/resolver.h"
#include "src/core/lib/channel/channel_stack.h"
@@ -39,6 +40,10 @@ extern grpc_core::TraceFlag grpc_client_channel_trace;
extern const grpc_channel_filter grpc_client_channel_filter;
+void grpc_client_channel_populate_child_refs(
+ grpc_channel_element* elem, grpc_core::ChildRefsList* child_subchannels,
+ grpc_core::ChildRefsList* child_channels);
+
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_channel_element* elem, int try_to_connect);
diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc
index 08ceb2dd05..4c9c9a6bd6 100644
--- a/src/core/ext/filters/client_channel/client_channel_channelz.cc
+++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc
@@ -41,8 +41,9 @@ static const grpc_arg_pointer_vtable client_channel_channelz_vtable = {
client_channel_channelz_cmp};
ClientChannelNode::ClientChannelNode(grpc_channel* channel,
- size_t channel_tracer_max_nodes)
- : ChannelNode(channel, channel_tracer_max_nodes) {
+ size_t channel_tracer_max_nodes,
+ bool is_top_level_channel)
+ : ChannelNode(channel, channel_tracer_max_nodes, is_top_level_channel) {
client_channel_ =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
GPR_ASSERT(client_channel_->filter == &grpc_client_channel_filter);
@@ -63,6 +64,37 @@ void ClientChannelNode::PopulateConnectivityState(grpc_json* json) {
false);
}
+void ClientChannelNode::PopulateChildRefs(grpc_json* json) {
+ ChildRefsList child_subchannels;
+ ChildRefsList child_channels;
+ grpc_json* json_iterator = nullptr;
+ grpc_client_channel_populate_child_refs(client_channel_, &child_subchannels,
+ &child_channels);
+ if (!child_subchannels.empty()) {
+ grpc_json* array_parent = grpc_json_create_child(
+ nullptr, json, "subchannelRef", nullptr, GRPC_JSON_ARRAY, false);
+ for (size_t i = 0; i < child_subchannels.size(); ++i) {
+ json_iterator =
+ grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr,
+ GRPC_JSON_OBJECT, false);
+ grpc_json_add_number_string_child(json_iterator, nullptr, "subchannelId",
+ child_subchannels[i]);
+ }
+ }
+ if (!child_channels.empty()) {
+ grpc_json* array_parent = grpc_json_create_child(
+ nullptr, json, "channelRef", nullptr, GRPC_JSON_ARRAY, false);
+ json_iterator = nullptr;
+ for (size_t i = 0; i < child_channels.size(); ++i) {
+ json_iterator =
+ grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr,
+ GRPC_JSON_OBJECT, false);
+ grpc_json_add_number_string_child(json_iterator, nullptr, "channelId",
+ child_channels[i]);
+ }
+ }
+}
+
grpc_arg ClientChannelNode::CreateChannelArg() {
return grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_NODE_CREATION_FUNC),
@@ -71,9 +103,10 @@ grpc_arg ClientChannelNode::CreateChannelArg() {
}
RefCountedPtr<ChannelNode> ClientChannelNode::MakeClientChannelNode(
- grpc_channel* channel, size_t channel_tracer_max_nodes) {
+ grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel) {
return MakePolymorphicRefCounted<ChannelNode, ClientChannelNode>(
- channel, channel_tracer_max_nodes);
+ channel, channel_tracer_max_nodes, is_top_level_channel);
}
} // namespace channelz
diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.h b/src/core/ext/filters/client_channel/client_channel_channelz.h
index cf3ef7b6f2..6f27b5c8b7 100644
--- a/src/core/ext/filters/client_channel/client_channel_channelz.h
+++ b/src/core/ext/filters/client_channel/client_channel_channelz.h
@@ -22,9 +22,17 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channelz.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
namespace grpc_core {
+
+// TODO(ncteisen), this only contains the uuids of the children for now,
+// since that is all that is strictly needed. In a future enhancement we will
+// add human readable names as in the channelz.proto
+typedef InlinedVector<intptr_t, 10> ChildRefsList;
+
namespace channelz {
// Subtype of ChannelNode that overrides and provides client_channel specific
@@ -32,12 +40,16 @@ namespace channelz {
class ClientChannelNode : public ChannelNode {
public:
static RefCountedPtr<ChannelNode> MakeClientChannelNode(
- grpc_channel* channel, size_t channel_tracer_max_nodes);
+ grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel);
// Override this functionality since client_channels have a notion of
// channel connectivity.
void PopulateConnectivityState(grpc_json* json) override;
+ // Override this functionality since client_channels have subchannels
+ void PopulateChildRefs(grpc_json* json) override;
+
// Helper to create a channel arg to ensure this type of ChannelNode is
// created.
static grpc_arg CreateChannelArg();
@@ -45,7 +57,8 @@ class ClientChannelNode : public ChannelNode {
protected:
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
- ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes);
+ ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel);
virtual ~ClientChannelNode() {}
private:
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index dab4466b21..3150df8847 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -21,6 +21,7 @@
#include <grpc/support/port_platform.h>
+#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/gprpp/abstract.h"
@@ -143,6 +144,14 @@ class LoadBalancingPolicy
/// consider whether this method is still needed.
virtual void ExitIdleLocked() GRPC_ABSTRACT;
+ /// populates child_subchannels and child_channels with the uuids of this
+ /// LB policy's referenced children. This is not invoked from the
+ /// client_channel's combiner. The implementation is responsible for
+ /// providing its own synchronization.
+ virtual void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
+ ChildRefsList* child_channels)
+ GRPC_ABSTRACT;
+
void Orphan() override {
// Invoke ShutdownAndUnrefLocked() inside of the combiner.
GRPC_CLOSURE_SCHED(
@@ -196,6 +205,12 @@ class LoadBalancingPolicy
grpc_pollset_set* interested_parties_;
/// Callback to force a re-resolution.
grpc_closure* request_reresolution_;
+
+ // Dummy classes needed for alignment issues.
+ // See https://github.com/grpc/grpc/issues/16032 for context.
+ // TODO(ncteisen): remove this as soon as the issue is resolved.
+ ChildRefsList dummy_list_foo;
+ ChildRefsList dummy_list_bar;
};
} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 263b51ae89..959c7441a3 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -135,6 +135,8 @@ class GrpcLb : public LoadBalancingPolicy {
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
+ void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
+ ChildRefsList* child_channels) override;
private:
/// Linked list of pending pick requests. It stores all information needed to
@@ -298,6 +300,9 @@ class GrpcLb : public LoadBalancingPolicy {
// The channel for communicating with the LB server.
grpc_channel* lb_channel_ = nullptr;
+ // Mutex to protect the channel to the LB server. This is used when
+ // processing a channelz request.
+ gpr_mu lb_channel_mu_;
grpc_connectivity_state lb_channel_connectivity_;
grpc_closure lb_channel_on_connectivity_changed_;
// Are we already watching the LB channel's connectivity?
@@ -1004,6 +1009,10 @@ grpc_channel_args* BuildBalancerChannelArgs(
// A channel arg indicating the target is a grpclb load balancer.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1),
+ // A channel arg indicating this is an internal channels, aka it is
+ // owned by components in Core, not by the user application.
+ grpc_channel_arg_integer_create(
+ const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL), 1),
};
// Construct channel args.
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
@@ -1033,6 +1042,7 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
.set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
1000)) {
// Initialization.
+ gpr_mu_init(&lb_channel_mu_);
grpc_subchannel_index_ref();
GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
@@ -1071,6 +1081,7 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
GrpcLb::~GrpcLb() {
GPR_ASSERT(pending_picks_ == nullptr);
GPR_ASSERT(pending_pings_ == nullptr);
+ gpr_mu_destroy(&lb_channel_mu_);
gpr_free((void*)server_name_);
grpc_channel_args_destroy(args_);
grpc_connectivity_state_destroy(&state_tracker_);
@@ -1100,8 +1111,10 @@ void GrpcLb::ShutdownLocked() {
// OnBalancerChannelConnectivityChangedLocked(), and we need to be
// alive when that callback is invoked.
if (lb_channel_ != nullptr) {
+ gpr_mu_lock(&lb_channel_mu_);
grpc_channel_destroy(lb_channel_);
lb_channel_ = nullptr;
+ gpr_mu_unlock(&lb_channel_mu_);
}
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "grpclb_shutdown");
@@ -1272,6 +1285,20 @@ void GrpcLb::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
}
}
+void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels,
+ ChildRefsList* child_channels) {
+ // delegate to the RoundRobin to fill the children subchannels.
+ rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
+ mu_guard guard(&lb_channel_mu_);
+ if (lb_channel_ != nullptr) {
+ grpc_core::channelz::ChannelNode* channel_node =
+ grpc_channel_get_channelz_node(lb_channel_);
+ if (channel_node != nullptr) {
+ child_channels->push_back(channel_node->channel_uuid());
+ }
+ }
+}
+
grpc_connectivity_state GrpcLb::CheckConnectivityLocked(
grpc_error** connectivity_error) {
return grpc_connectivity_state_get(&state_tracker_, connectivity_error);
@@ -1315,9 +1342,11 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
if (lb_channel_ == nullptr) {
char* uri_str;
gpr_asprintf(&uri_str, "fake:///%s", server_name_);
+ gpr_mu_lock(&lb_channel_mu_);
lb_channel_ = grpc_client_channel_factory_create_channel(
client_channel_factory(), uri_str,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args);
+ gpr_mu_unlock(&lb_channel_mu_);
GPR_ASSERT(lb_channel_ != nullptr);
gpr_free(uri_str);
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index ff2140e628..023281db97 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -58,6 +58,8 @@ class PickFirst : public LoadBalancingPolicy {
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
+ void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
+ ChildRefsList* ignored) override;
private:
~PickFirst();
@@ -103,10 +105,23 @@ class PickFirst : public LoadBalancingPolicy {
}
};
+ // Helper class to ensure that any function that modifies the child refs
+ // data structures will update the channelz snapshot data structures before
+ // returning.
+ class AutoChildRefsUpdater {
+ public:
+ explicit AutoChildRefsUpdater(PickFirst* pf) : pf_(pf) {}
+ ~AutoChildRefsUpdater() { pf_->UpdateChildRefsLocked(); }
+
+ private:
+ PickFirst* pf_;
+ };
+
void ShutdownLocked() override;
void StartPickingLocked();
void DestroyUnselectedSubchannelsLocked();
+ void UpdateChildRefsLocked();
// All our subchannels.
OrphanablePtr<PickFirstSubchannelList> subchannel_list_;
@@ -122,10 +137,17 @@ class PickFirst : public LoadBalancingPolicy {
PickState* pending_picks_ = nullptr;
// Our connectivity state tracker.
grpc_connectivity_state_tracker state_tracker_;
+
+ /// Lock and data used to capture snapshots of this channels child
+ /// channels and subchannels. This data is consumed by channelz.
+ gpr_mu child_refs_mu_;
+ ChildRefsList child_subchannels_;
+ ChildRefsList child_channels_;
};
PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) {
GPR_ASSERT(args.client_channel_factory != nullptr);
+ gpr_mu_init(&child_refs_mu_);
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
"pick_first");
if (grpc_lb_pick_first_trace.enabled()) {
@@ -139,6 +161,7 @@ PickFirst::~PickFirst() {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Destroying Pick First %p", this);
}
+ gpr_mu_destroy(&child_refs_mu_);
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
GPR_ASSERT(pending_picks_ == nullptr);
@@ -158,6 +181,7 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
}
void PickFirst::ShutdownLocked() {
+ AutoChildRefsUpdater guard(this);
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
@@ -280,7 +304,41 @@ void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
}
}
+void PickFirst::FillChildRefsForChannelz(
+ ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
+ mu_guard guard(&child_refs_mu_);
+ for (size_t i = 0; i < child_subchannels_.size(); ++i) {
+ // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
+ // have to implement lightweight set. For now, we don't care about
+ // performance when channelz requests are made.
+ bool found = false;
+ for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
+ if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ child_subchannels_to_fill->push_back(child_subchannels_[i]);
+ }
+ }
+}
+
+void PickFirst::UpdateChildRefsLocked() {
+ ChildRefsList cs;
+ if (subchannel_list_ != nullptr) {
+ subchannel_list_->PopulateChildRefsList(&cs);
+ }
+ if (latest_pending_subchannel_list_ != nullptr) {
+ latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
+ }
+ // atomically update the data that channelz will actually be looking at.
+ mu_guard guard(&child_refs_mu_);
+ child_subchannels_ = std::move(cs);
+}
+
void PickFirst::UpdateLocked(const grpc_channel_args& args) {
+ AutoChildRefsUpdater guard(this);
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
if (subchannel_list_ == nullptr) {
@@ -388,6 +446,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
+ AutoChildRefsUpdater guard(p);
// The notification must be for a subchannel in either the current or
// latest pending subchannel lists.
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 42e8e88ec9..fc56a4961f 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -69,6 +69,8 @@ class RoundRobin : public LoadBalancingPolicy {
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
+ void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
+ ChildRefsList* ignored) override;
private:
~RoundRobin();
@@ -180,11 +182,24 @@ class RoundRobin : public LoadBalancingPolicy {
size_t last_ready_index_ = -1; // Index into list of last pick.
};
+ // Helper class to ensure that any function that modifies the child refs
+ // data structures will update the channelz snapshot data structures before
+ // returning.
+ class AutoChildRefsUpdater {
+ public:
+ explicit AutoChildRefsUpdater(RoundRobin* rr) : rr_(rr) {}
+ ~AutoChildRefsUpdater() { rr_->UpdateChildRefsLocked(); }
+
+ private:
+ RoundRobin* rr_;
+ };
+
void ShutdownLocked() override;
void StartPickingLocked();
bool DoPickLocked(PickState* pick);
void DrainPendingPicksLocked();
+ void UpdateChildRefsLocked();
/** list of subchannels */
OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
@@ -202,10 +217,16 @@ class RoundRobin : public LoadBalancingPolicy {
PickState* pending_picks_ = nullptr;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker_;
+ /// Lock and data used to capture snapshots of this channel's child
+ /// channels and subchannels. This data is consumed by channelz.
+ gpr_mu child_refs_mu_;
+ ChildRefsList child_subchannels_;
+ ChildRefsList child_channels_;
};
RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
GPR_ASSERT(args.client_channel_factory != nullptr);
+ gpr_mu_init(&child_refs_mu_);
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
"round_robin");
UpdateLocked(*args.args);
@@ -220,6 +241,7 @@ RoundRobin::~RoundRobin() {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
}
+ gpr_mu_destroy(&child_refs_mu_);
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
GPR_ASSERT(pending_picks_ == nullptr);
@@ -239,6 +261,7 @@ void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
}
void RoundRobin::ShutdownLocked() {
+ AutoChildRefsUpdater guard(this);
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
@@ -362,6 +385,39 @@ bool RoundRobin::PickLocked(PickState* pick) {
return false;
}
+void RoundRobin::FillChildRefsForChannelz(
+ ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
+ mu_guard guard(&child_refs_mu_);
+ for (size_t i = 0; i < child_subchannels_.size(); ++i) {
+ // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
+ // have to implement lightweight set. For now, we don't care about
+ // performance when channelz requests are made.
+ bool found = false;
+ for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
+ if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ child_subchannels_to_fill->push_back(child_subchannels_[i]);
+ }
+ }
+}
+
+void RoundRobin::UpdateChildRefsLocked() {
+ ChildRefsList cs;
+ if (subchannel_list_ != nullptr) {
+ subchannel_list_->PopulateChildRefsList(&cs);
+ }
+ if (latest_pending_subchannel_list_ != nullptr) {
+ latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
+ }
+ // atomically update the data that channelz will actually be looking at.
+ mu_guard guard(&child_refs_mu_);
+ child_subchannels_ = std::move(cs);
+}
+
void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
if (num_subchannels() == 0) return;
// Check current state of each subchannel synchronously, since any
@@ -452,6 +508,7 @@ void RoundRobin::RoundRobinSubchannelList::
void RoundRobin::RoundRobinSubchannelList::
UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
RoundRobin* p = static_cast<RoundRobin*>(policy());
+ AutoChildRefsUpdater guard(p);
if (num_ready_ > 0) {
if (p->subchannel_list_.get() != this) {
// Promote this list to p->subchannel_list_.
@@ -608,6 +665,7 @@ void RoundRobin::PingOneLocked(grpc_closure* on_initiate,
void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
+ AutoChildRefsUpdater guard(this);
if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this);
// If we don't have a current subchannel list, go into TRANSIENT_FAILURE.
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 7e2046bcdc..018ac3bb86 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -189,6 +189,19 @@ class SubchannelList
// Returns true if the subchannel list is shutting down.
bool shutting_down() const { return shutting_down_; }
+ // Populates refs_list with the uuids of this SubchannelLists's subchannels.
+ void PopulateChildRefsList(ChildRefsList* refs_list) {
+ for (size_t i = 0; i < subchannels_.size(); ++i) {
+ if (subchannels_[i].subchannel() != nullptr) {
+ grpc_core::channelz::SubchannelNode* subchannel_node =
+ grpc_subchannel_get_channelz_node(subchannels_[i].subchannel());
+ if (subchannel_node != nullptr) {
+ refs_list->push_back(subchannel_node->subchannel_uuid());
+ }
+ }
+ }
+ }
+
// Accessors.
LoadBalancingPolicy* policy() const { return policy_; }
TraceFlag* tracer() const { return tracer_; }
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 8ab3fe40f5..93df2aff70 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -134,6 +134,9 @@ struct grpc_subchannel {
bool backoff_begun;
/** our alarm */
grpc_timer alarm;
+
+ grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode>
+ channelz_subchannel;
};
struct grpc_subchannel_call {
@@ -178,6 +181,7 @@ static void connection_destroy(void* arg, grpc_error* error) {
static void subchannel_destroy(void* arg, grpc_error* error) {
grpc_subchannel* c = static_cast<grpc_subchannel*>(arg);
+ c->channelz_subchannel.reset();
gpr_free((void*)c->filters);
grpc_channel_args_destroy(c->args);
grpc_connectivity_state_destroy(&c->state_tracker);
@@ -374,9 +378,22 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
c->backoff.Init(backoff_options);
gpr_mu_init(&c->mu);
+ const grpc_arg* arg =
+ grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ);
+ bool channelz_enabled = grpc_channel_arg_get_bool(arg, false);
+ if (channelz_enabled) {
+ c->channelz_subchannel =
+ grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>();
+ }
+
return grpc_subchannel_index_register(key, c);
}
+grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node(
+ grpc_subchannel* subchannel) {
+ return subchannel->channelz_subchannel.get();
+}
+
static void continue_connect_locked(grpc_subchannel* c) {
grpc_connect_in_args args;
args.interested_parties = c->pollset_set;
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index e23aec12df..9e53f7d542 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -21,6 +21,7 @@
#include <grpc/support/port_platform.h>
+#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/connector.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gpr/arena.h"
@@ -115,6 +116,9 @@ grpc_subchannel_call* grpc_subchannel_call_ref(
void grpc_subchannel_call_unref(
grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node(
+ grpc_subchannel* subchannel);
+
/** Returns a pointer to the parent data associated with \a subchannel_call.
The data will be of the size specified in \a parent_data_size
field of the args passed to \a grpc_connected_subchannel_create_call(). */
diff --git a/src/core/lib/channel/channel_trace.cc b/src/core/lib/channel/channel_trace.cc
index 0f655d8716..b3443310ac 100644
--- a/src/core/lib/channel/channel_trace.cc
+++ b/src/core/lib/channel/channel_trace.cc
@@ -131,38 +131,6 @@ void ChannelTrace::AddTraceEventReferencingSubchannel(
namespace {
-// returns an allocated string that represents tm according to RFC-3339, and,
-// more specifically, follows:
-// https://developers.google.com/protocol-buffers/docs/proto3#json
-//
-// "Uses RFC 3339, where generated output will always be Z-normalized and uses
-// 0, 3, 6 or 9 fractional digits."
-char* fmt_time(gpr_timespec tm) {
- char time_buffer[35];
- char ns_buffer[11]; // '.' + 9 digits of precision
- struct tm* tm_info = localtime((const time_t*)&tm.tv_sec);
- strftime(time_buffer, sizeof(time_buffer), "%Y-%m-%dT%H:%M:%S", tm_info);
- snprintf(ns_buffer, 11, ".%09d", tm.tv_nsec);
- // This loop trims off trailing zeros by inserting a null character that the
- // right point. We iterate in chunks of three because we want 0, 3, 6, or 9
- // fractional digits.
- for (int i = 7; i >= 1; i -= 3) {
- if (ns_buffer[i] == '0' && ns_buffer[i + 1] == '0' &&
- ns_buffer[i + 2] == '0') {
- ns_buffer[i] = '\0';
- // Edge case in which all fractional digits were 0.
- if (i == 1) {
- ns_buffer[0] = '\0';
- }
- } else {
- break;
- }
- }
- char* full_time_str;
- gpr_asprintf(&full_time_str, "%s%sZ", time_buffer, ns_buffer);
- return full_time_str;
-}
-
const char* severity_string(ChannelTrace::Severity severity) {
switch (severity) {
case ChannelTrace::Severity::Info:
@@ -186,9 +154,9 @@ void ChannelTrace::TraceEvent::RenderTraceEvent(grpc_json* json) const {
json_iterator = grpc_json_create_child(json_iterator, json, "severity",
severity_string(severity_),
GRPC_JSON_STRING, false);
- json_iterator =
- grpc_json_create_child(json_iterator, json, "timestamp",
- fmt_time(timestamp_), GRPC_JSON_STRING, true);
+ json_iterator = grpc_json_create_child(json_iterator, json, "timestamp",
+ gpr_format_timespec(timestamp_),
+ GRPC_JSON_STRING, true);
if (referenced_channel_ != nullptr) {
char* uuid_str;
gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_channel_->channel_uuid());
@@ -206,7 +174,7 @@ void ChannelTrace::TraceEvent::RenderTraceEvent(grpc_json* json) const {
}
}
-grpc_json* ChannelTrace::RenderJSON() const {
+grpc_json* ChannelTrace::RenderJson() const {
if (!max_list_size_)
return nullptr; // tracing is disabled if max_events == 0
grpc_json* json = grpc_json_create(GRPC_JSON_OBJECT);
@@ -216,9 +184,9 @@ grpc_json* ChannelTrace::RenderJSON() const {
json_iterator =
grpc_json_create_child(json_iterator, json, "numEventsLogged",
num_events_logged_str, GRPC_JSON_STRING, true);
- json_iterator =
- grpc_json_create_child(json_iterator, json, "creationTimestamp",
- fmt_time(time_created_), GRPC_JSON_STRING, true);
+ json_iterator = grpc_json_create_child(
+ json_iterator, json, "creationTimestamp",
+ gpr_format_timespec(time_created_), GRPC_JSON_STRING, true);
grpc_json* events = grpc_json_create_child(json_iterator, json, "events",
nullptr, GRPC_JSON_ARRAY, false);
json_iterator = nullptr;
diff --git a/src/core/lib/channel/channel_trace.h b/src/core/lib/channel/channel_trace.h
index 0dd162a777..596af7402f 100644
--- a/src/core/lib/channel/channel_trace.h
+++ b/src/core/lib/channel/channel_trace.h
@@ -71,7 +71,7 @@ class ChannelTrace {
// Creates and returns the raw grpc_json object, so a parent channelz
// object may incorporate the json before rendering.
- grpc_json* RenderJSON() const;
+ grpc_json* RenderJson() const;
private:
// Types of objects that can be references by trace events.
diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc
index 2074cb0cc5..9d6002ed8a 100644
--- a/src/core/lib/channel/channelz.cc
+++ b/src/core/lib/channel/channelz.cc
@@ -41,65 +41,22 @@
namespace grpc_core {
namespace channelz {
-namespace {
-
-// TODO(ncteisen): move this function to a common helper location.
-//
-// returns an allocated string that represents tm according to RFC-3339, and,
-// more specifically, follows:
-// https://developers.google.com/protocol-buffers/docs/proto3#json
-//
-// "Uses RFC 3339, where generated output will always be Z-normalized and uses
-// 0, 3, 6 or 9 fractional digits."
-char* fmt_time(gpr_timespec tm) {
- char time_buffer[35];
- char ns_buffer[11]; // '.' + 9 digits of precision
- struct tm* tm_info = localtime((const time_t*)&tm.tv_sec);
- strftime(time_buffer, sizeof(time_buffer), "%Y-%m-%dT%H:%M:%S", tm_info);
- snprintf(ns_buffer, 11, ".%09d", tm.tv_nsec);
- // This loop trims off trailing zeros by inserting a null character that the
- // right point. We iterate in chunks of three because we want 0, 3, 6, or 9
- // fractional digits.
- for (int i = 7; i >= 1; i -= 3) {
- if (ns_buffer[i] == '0' && ns_buffer[i + 1] == '0' &&
- ns_buffer[i + 2] == '0') {
- ns_buffer[i] = '\0';
- // Edge case in which all fractional digits were 0.
- if (i == 1) {
- ns_buffer[0] = '\0';
- }
- } else {
- break;
- }
- }
- char* full_time_str;
- gpr_asprintf(&full_time_str, "%s%sZ", time_buffer, ns_buffer);
- return full_time_str;
-}
-
-// TODO(ncteisen); move this to json library
-grpc_json* add_num_str(grpc_json* parent, grpc_json* it, const char* name,
- int64_t num) {
- char* num_str;
- gpr_asprintf(&num_str, "%" PRId64, num);
- return grpc_json_create_child(it, parent, name, num_str, GRPC_JSON_STRING,
- true);
-}
-
-} // namespace
-
-ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes)
- : channel_(channel), target_(nullptr), channel_uuid_(-1) {
+ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel)
+ : channel_(channel),
+ target_(nullptr),
+ channel_uuid_(-1),
+ is_top_level_channel_(is_top_level_channel) {
trace_.Init(channel_tracer_max_nodes);
target_ = UniquePtr<char>(grpc_channel_get_target(channel_));
- channel_uuid_ = ChannelzRegistry::Register(this);
+ channel_uuid_ = ChannelzRegistry::RegisterChannelNode(this);
gpr_atm_no_barrier_store(&last_call_started_millis_,
(gpr_atm)ExecCtx::Get()->Now());
}
ChannelNode::~ChannelNode() {
trace_.Destroy();
- ChannelzRegistry::Unregister(channel_uuid_);
+ ChannelzRegistry::UnregisterChannelNode(channel_uuid_);
}
void ChannelNode::RecordCallStarted() {
@@ -110,7 +67,9 @@ void ChannelNode::RecordCallStarted() {
void ChannelNode::PopulateConnectivityState(grpc_json* json) {}
-char* ChannelNode::RenderJSON() {
+void ChannelNode::PopulateChildRefs(grpc_json* json) {}
+
+grpc_json* ChannelNode::RenderJson() {
// We need to track these three json objects to build our object
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
grpc_json* json = top_level_json;
@@ -120,7 +79,8 @@ char* ChannelNode::RenderJSON() {
GRPC_JSON_OBJECT, false);
json = json_iterator;
json_iterator = nullptr;
- json_iterator = add_num_str(json, json_iterator, "channelId", channel_uuid_);
+ json_iterator = grpc_json_add_number_string_child(json, json_iterator,
+ "channelId", channel_uuid_);
// reset json iterators to top level object
json = top_level_json;
json_iterator = nullptr;
@@ -130,45 +90,63 @@ char* ChannelNode::RenderJSON() {
json = data;
json_iterator = nullptr;
PopulateConnectivityState(json);
+ GPR_ASSERT(target_.get() != nullptr);
json_iterator = grpc_json_create_child(
json_iterator, json, "target", target_.get(), GRPC_JSON_STRING, false);
// fill in the channel trace if applicable
- grpc_json* trace = trace_->RenderJSON();
+ grpc_json* trace = trace_->RenderJson();
if (trace != nullptr) {
- // we manuall link up and fill the child since it was created for us in
- // ChannelTrace::RenderJSON
+ // we manually link up and fill the child since it was created for us in
+ // ChannelTrace::RenderJson
+ trace->key = "trace"; // this object is named trace in channelz.proto
json_iterator = grpc_json_link_child(json, trace, json_iterator);
- trace->parent = json;
- trace->value = nullptr;
- trace->key = "trace";
- trace->owns_value = false;
}
// reset the parent to be the data object.
json = data;
json_iterator = nullptr;
- // We use -1 as sentinel values since proto default value for integers is
- // zero, and the confuses the parser into thinking the value weren't present
- json_iterator =
- add_num_str(json, json_iterator, "callsStarted", calls_started_);
- json_iterator =
- add_num_str(json, json_iterator, "callsSucceeded", calls_succeeded_);
- json_iterator =
- add_num_str(json, json_iterator, "callsFailed", calls_failed_);
+ if (calls_started_ != 0) {
+ json_iterator = grpc_json_add_number_string_child(
+ json, json_iterator, "callsStarted", calls_started_);
+ }
+ if (calls_succeeded_ != 0) {
+ json_iterator = grpc_json_add_number_string_child(
+ json, json_iterator, "callsSucceeded", calls_succeeded_);
+ }
+ if (calls_failed_) {
+ json_iterator = grpc_json_add_number_string_child(
+ json, json_iterator, "callsFailed", calls_failed_);
+ }
gpr_timespec ts =
grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME);
json_iterator =
grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp",
- fmt_time(ts), GRPC_JSON_STRING, true);
- // render and return the over json object
- char* json_str = grpc_json_dump_to_string(top_level_json, 0);
- grpc_json_destroy(top_level_json);
+ gpr_format_timespec(ts), GRPC_JSON_STRING, true);
+ json = top_level_json;
+ json_iterator = nullptr;
+ PopulateChildRefs(json);
+ return top_level_json;
+}
+
+char* ChannelNode::RenderJsonString() {
+ grpc_json* json = RenderJson();
+ char* json_str = grpc_json_dump_to_string(json, 0);
+ grpc_json_destroy(json);
return json_str;
}
RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode(
- grpc_channel* channel, size_t channel_tracer_max_nodes) {
+ grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel) {
return MakeRefCounted<grpc_core::channelz::ChannelNode>(
- channel, channel_tracer_max_nodes);
+ channel, channel_tracer_max_nodes, is_top_level_channel);
+}
+
+SubchannelNode::SubchannelNode() {
+ subchannel_uuid_ = ChannelzRegistry::RegisterSubchannelNode(this);
+}
+
+SubchannelNode::~SubchannelNode() {
+ ChannelzRegistry::UnregisterSubchannelNode(subchannel_uuid_);
}
} // namespace channelz
diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h
index 9bd01ece50..07eb73d626 100644
--- a/src/core/lib/channel/channelz.h
+++ b/src/core/lib/channel/channelz.h
@@ -35,6 +35,10 @@
#define GRPC_ARG_CHANNELZ_CHANNEL_NODE_CREATION_FUNC \
"grpc.channelz_channel_node_creation_func"
+// Channel arg key to signal that the channel is an internal channel.
+#define GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL \
+ "grpc.channelz_channel_is_internal_channel"
+
namespace grpc_core {
namespace channelz {
@@ -45,7 +49,8 @@ class ChannelNodePeer;
class ChannelNode : public RefCounted<ChannelNode> {
public:
static RefCountedPtr<ChannelNode> MakeChannelNode(
- grpc_channel* channel, size_t channel_tracer_max_nodes);
+ grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel);
void RecordCallStarted();
void RecordCallFailed() {
@@ -55,13 +60,16 @@ class ChannelNode : public RefCounted<ChannelNode> {
gpr_atm_no_barrier_fetch_add(&calls_succeeded_, (gpr_atm(1)));
}
- char* RenderJSON();
+ grpc_json* RenderJson();
+ char* RenderJsonString();
// helper for getting and populating connectivity state. It is virtual
// because it allows the client_channel specific code to live in ext/
// instead of lib/
virtual void PopulateConnectivityState(grpc_json* json);
+ virtual void PopulateChildRefs(grpc_json* json);
+
ChannelTrace* trace() { return trace_.get(); }
void MarkChannelDestroyed() {
@@ -72,11 +80,13 @@ class ChannelNode : public RefCounted<ChannelNode> {
bool ChannelIsDestroyed() { return channel_ == nullptr; }
intptr_t channel_uuid() { return channel_uuid_; }
+ bool is_top_level_channel() { return is_top_level_channel_; }
protected:
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
- ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes);
+ ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel);
virtual ~ChannelNode();
private:
@@ -90,13 +100,32 @@ class ChannelNode : public RefCounted<ChannelNode> {
gpr_atm calls_failed_ = 0;
gpr_atm last_call_started_millis_ = 0;
intptr_t channel_uuid_;
+ bool is_top_level_channel_ = true;
ManualConstructor<ChannelTrace> trace_;
};
+// Placeholds channelz class for subchannels. All this can do now is track its
+// uuid (this information is needed by the parent channelz class).
+// TODO(ncteisen): build this out to support the GetSubchannel channelz request.
+class SubchannelNode : public RefCounted<SubchannelNode> {
+ public:
+ SubchannelNode();
+ virtual ~SubchannelNode();
+
+ intptr_t subchannel_uuid() { return subchannel_uuid_; }
+
+ protected:
+ GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
+ GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
+
+ private:
+ intptr_t subchannel_uuid_;
+};
+
// Creation functions
typedef RefCountedPtr<ChannelNode> (*ChannelNodeCreationFunc)(grpc_channel*,
- size_t);
+ size_t, bool);
} // namespace channelz
} // namespace grpc_core
diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc
index 023ede552a..38496b3d78 100644
--- a/src/core/lib/channel/channelz_registry.cc
+++ b/src/core/lib/channel/channelz_registry.cc
@@ -19,16 +19,19 @@
#include <grpc/impl/codegen/port_platform.h>
#include "src/core/lib/channel/channel_trace.h"
+#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/channel/channelz_registry.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/memory.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
#include <cstring>
namespace grpc_core {
+namespace channelz {
namespace {
// singleton instance of the registry.
@@ -49,12 +52,93 @@ ChannelzRegistry::ChannelzRegistry() { gpr_mu_init(&mu_); }
ChannelzRegistry::~ChannelzRegistry() { gpr_mu_destroy(&mu_); }
-void ChannelzRegistry::InternalUnregister(intptr_t uuid) {
+intptr_t ChannelzRegistry::InternalRegisterEntry(const RegistryEntry& entry) {
+ mu_guard guard(&mu_);
+ entities_.push_back(entry);
+ intptr_t uuid = entities_.size();
+ return uuid;
+}
+
+void ChannelzRegistry::InternalUnregisterEntry(intptr_t uuid, EntityType type) {
GPR_ASSERT(uuid >= 1);
- gpr_mu_lock(&mu_);
+ mu_guard guard(&mu_);
GPR_ASSERT(static_cast<size_t>(uuid) <= entities_.size());
- entities_[uuid - 1] = nullptr;
- gpr_mu_unlock(&mu_);
+ GPR_ASSERT(entities_[uuid - 1].type == type);
+ entities_[uuid - 1].object = nullptr;
+ entities_[uuid - 1].type = EntityType::kUnset;
+}
+
+void* ChannelzRegistry::InternalGetEntry(intptr_t uuid, EntityType type) {
+ mu_guard guard(&mu_);
+ if (uuid < 1 || uuid > static_cast<intptr_t>(entities_.size())) {
+ return nullptr;
+ }
+ if (entities_[uuid - 1].type == type) {
+ return entities_[uuid - 1].object;
+ } else {
+ return nullptr;
+ }
+}
+
+char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) {
+ grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
+ grpc_json* json = top_level_json;
+ grpc_json* json_iterator = nullptr;
+ InlinedVector<ChannelNode*, 10> top_level_channels;
+ // uuids index into entities one-off (idx 0 is really uuid 1, since 0 is
+ // reserved). However, we want to support requests coming in with
+ // start_channel_id=0, which signifies "give me everything." Hence this
+ // funky looking line below.
+ size_t start_idx = start_channel_id == 0 ? 0 : start_channel_id - 1;
+ for (size_t i = start_idx; i < entities_.size(); ++i) {
+ if (entities_[i].type == EntityType::kChannelNode) {
+ ChannelNode* channel_node =
+ static_cast<ChannelNode*>(entities_[i].object);
+ if (channel_node->is_top_level_channel()) {
+ top_level_channels.push_back(channel_node);
+ }
+ }
+ }
+ if (top_level_channels.size() > 0) {
+ // create list of channels
+ grpc_json* array_parent = grpc_json_create_child(
+ nullptr, json, "channel", nullptr, GRPC_JSON_ARRAY, false);
+ for (size_t i = 0; i < top_level_channels.size(); ++i) {
+ grpc_json* channel_json = top_level_channels[i]->RenderJson();
+ json_iterator =
+ grpc_json_link_child(array_parent, channel_json, json_iterator);
+ }
+ }
+ // For now we do not have any pagination rules. In the future we could
+ // pick a constant for max_channels_sent for a GetTopChannels request.
+ // Tracking: https://github.com/grpc/grpc/issues/16019.
+ json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr,
+ GRPC_JSON_TRUE, false);
+ char* json_str = grpc_json_dump_to_string(top_level_json, 0);
+ grpc_json_destroy(top_level_json);
+ return json_str;
}
+} // namespace channelz
} // namespace grpc_core
+
+char* grpc_channelz_get_top_channels(intptr_t start_channel_id) {
+ return grpc_core::channelz::ChannelzRegistry::GetTopChannels(
+ start_channel_id);
+}
+
+char* grpc_channelz_get_channel(intptr_t channel_id) {
+ grpc_core::channelz::ChannelNode* channel_node =
+ grpc_core::channelz::ChannelzRegistry::GetChannelNode(channel_id);
+ if (channel_node == nullptr) {
+ return nullptr;
+ }
+ grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
+ grpc_json* json = top_level_json;
+ grpc_json* channel_json = channel_node->RenderJson();
+ channel_json->key = "channel";
+ grpc_json_link_child(json, channel_json, nullptr);
+ char* json_str = grpc_json_dump_to_string(top_level_json, 0);
+ grpc_json_destroy(top_level_json);
+ return json_str;
+}
diff --git a/src/core/lib/channel/channelz_registry.h b/src/core/lib/channel/channelz_registry.h
index a5a187a054..5d7c936726 100644
--- a/src/core/lib/channel/channelz_registry.h
+++ b/src/core/lib/channel/channelz_registry.h
@@ -22,11 +22,13 @@
#include <grpc/impl/codegen/port_platform.h>
#include "src/core/lib/channel/channel_trace.h"
+#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include <stdint.h>
namespace grpc_core {
+namespace channelz {
// singleton registry object to track all objects that are needed to support
// channelz bookkeeping. All objects share globally distributed uuids.
@@ -35,26 +37,56 @@ class ChannelzRegistry {
// To be called in grpc_init()
static void Init();
- // To be callen in grpc_shutdown();
+ // To be called in grpc_shutdown();
static void Shutdown();
- // globally registers a channelz Object. Returns its unique uuid
- template <typename Object>
- static intptr_t Register(Object* object) {
- return Default()->InternalRegister(object);
+ // Register/Unregister/Get for ChannelNode
+ static intptr_t RegisterChannelNode(ChannelNode* channel_node) {
+ RegistryEntry entry(channel_node, EntityType::kChannelNode);
+ return Default()->InternalRegisterEntry(entry);
+ }
+ static void UnregisterChannelNode(intptr_t uuid) {
+ Default()->InternalUnregisterEntry(uuid, EntityType::kChannelNode);
+ }
+ static ChannelNode* GetChannelNode(intptr_t uuid) {
+ void* gotten = Default()->InternalGetEntry(uuid, EntityType::kChannelNode);
+ return gotten == nullptr ? nullptr : static_cast<ChannelNode*>(gotten);
}
- // globally unregisters the object that is associated to uuid.
- static void Unregister(intptr_t uuid) { Default()->InternalUnregister(uuid); }
+ // Register/Unregister/Get for SubchannelNode
+ static intptr_t RegisterSubchannelNode(SubchannelNode* channel_node) {
+ RegistryEntry entry(channel_node, EntityType::kSubchannelNode);
+ return Default()->InternalRegisterEntry(entry);
+ }
+ static void UnregisterSubchannelNode(intptr_t uuid) {
+ Default()->InternalUnregisterEntry(uuid, EntityType::kSubchannelNode);
+ }
+ static SubchannelNode* GetSubchannelNode(intptr_t uuid) {
+ void* gotten =
+ Default()->InternalGetEntry(uuid, EntityType::kSubchannelNode);
+ return gotten == nullptr ? nullptr : static_cast<SubchannelNode*>(gotten);
+ }
- // if object with uuid has previously been registered, returns the
- // Object associated with that uuid. Else returns nullptr.
- template <typename Object>
- static Object* Get(intptr_t uuid) {
- return Default()->InternalGet<Object>(uuid);
+ // Returns the allocated JSON string that represents the proto
+ // GetTopChannelsResponse as per channelz.proto.
+ static char* GetTopChannels(intptr_t start_channel_id) {
+ return Default()->InternalGetTopChannels(start_channel_id);
}
private:
+ enum class EntityType {
+ kChannelNode,
+ kSubchannelNode,
+ kUnset,
+ };
+
+ struct RegistryEntry {
+ RegistryEntry(void* object_in, EntityType type_in)
+ : object(object_in), type(type_in) {}
+ void* object;
+ EntityType type;
+ };
+
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
@@ -64,40 +96,25 @@ class ChannelzRegistry {
// Returned the singleton instance of ChannelzRegistry;
static ChannelzRegistry* Default();
- // globally registers a channelz Object. Returns its unique uuid
- template <typename Object>
- intptr_t InternalRegister(Object* object) {
- gpr_mu_lock(&mu_);
- entities_.push_back(static_cast<void*>(object));
- intptr_t uuid = entities_.size();
- gpr_mu_unlock(&mu_);
- return uuid;
- }
+ // globally registers an Entry. Returns its unique uuid
+ intptr_t InternalRegisterEntry(const RegistryEntry& entry);
- // globally unregisters the object that is associated to uuid.
- void InternalUnregister(intptr_t uuid);
-
- // if object with uuid has previously been registered, returns the
- // Object associated with that uuid. Else returns nullptr.
- template <typename Object>
- Object* InternalGet(intptr_t uuid) {
- gpr_mu_lock(&mu_);
- if (uuid < 1 || uuid > static_cast<intptr_t>(entities_.size())) {
- gpr_mu_unlock(&mu_);
- return nullptr;
- }
- Object* ret = static_cast<Object*>(entities_[uuid - 1]);
- gpr_mu_unlock(&mu_);
- return ret;
- }
+ // globally unregisters the object that is associated to uuid. Also does
+ // sanity check that an object doesn't try to unregister the wrong type.
+ void InternalUnregisterEntry(intptr_t uuid, EntityType type);
+
+ // if object with uuid has previously been registered as the correct type,
+ // returns the void* associated with that uuid. Else returns nullptr.
+ void* InternalGetEntry(intptr_t uuid, EntityType type);
- // private members
+ char* InternalGetTopChannels(intptr_t start_channel_id);
// protects entities_ and uuid_
gpr_mu mu_;
- InlinedVector<void*, 20> entities_;
+ InlinedVector<RegistryEntry, 20> entities_;
};
+} // namespace channelz
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_CHANNEL_CHANNELZ_REGISTRY_H */
diff --git a/src/core/lib/gpr/string.cc b/src/core/lib/gpr/string.cc
index ef2a6900b4..0a76fc1f54 100644
--- a/src/core/lib/gpr/string.cc
+++ b/src/core/lib/gpr/string.cc
@@ -23,8 +23,10 @@
#include <ctype.h>
#include <limits.h>
#include <stddef.h>
+#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#include <time.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -54,6 +56,32 @@ typedef struct {
char* data;
} dump_out;
+char* gpr_format_timespec(gpr_timespec tm) {
+ char time_buffer[35];
+ char ns_buffer[11]; // '.' + 9 digits of precision
+ struct tm* tm_info = localtime((const time_t*)&tm.tv_sec);
+ strftime(time_buffer, sizeof(time_buffer), "%Y-%m-%dT%H:%M:%S", tm_info);
+ snprintf(ns_buffer, 11, ".%09d", tm.tv_nsec);
+ // This loop trims off trailing zeros by inserting a null character that the
+ // right point. We iterate in chunks of three because we want 0, 3, 6, or 9
+ // fractional digits.
+ for (int i = 7; i >= 1; i -= 3) {
+ if (ns_buffer[i] == '0' && ns_buffer[i + 1] == '0' &&
+ ns_buffer[i + 2] == '0') {
+ ns_buffer[i] = '\0';
+ // Edge case in which all fractional digits were 0.
+ if (i == 1) {
+ ns_buffer[0] = '\0';
+ }
+ } else {
+ break;
+ }
+ }
+ char* full_time_str;
+ gpr_asprintf(&full_time_str, "%s%sZ", time_buffer, ns_buffer);
+ return full_time_str;
+}
+
static dump_out dump_out_create(void) {
dump_out r = {0, 0, nullptr};
return r;
diff --git a/src/core/lib/gpr/string.h b/src/core/lib/gpr/string.h
index 2e8a4898d9..ce51fe4632 100644
--- a/src/core/lib/gpr/string.h
+++ b/src/core/lib/gpr/string.h
@@ -21,6 +21,8 @@
#include <grpc/support/port_platform.h>
+#include <grpc/impl/codegen/gpr_types.h>
+
#include <stdbool.h>
#include <stddef.h>
@@ -81,6 +83,14 @@ char* gpr_strjoin_sep(const char** strs, size_t nstrs, const char* sep,
void gpr_string_split(const char* input, const char* sep, char*** strs,
size_t* nstrs);
+/* Returns an allocated string that represents tm according to RFC-3339, and,
+ more specifically, follows:
+ https://developers.google.com/protocol-buffers/docs/proto3#json
+
+ Uses RFC 3339, where generated output will always be Z-normalized and uses
+ 0, 3, 6 or 9 fractional digits. */
+char* gpr_format_timespec(gpr_timespec);
+
/* A vector of strings... for building up a final string one piece at a time */
typedef struct {
char** strs;
diff --git a/src/core/lib/gprpp/abstract.h b/src/core/lib/gprpp/abstract.h
index cc96edc49b..5b7018e07e 100644
--- a/src/core/lib/gprpp/abstract.h
+++ b/src/core/lib/gprpp/abstract.h
@@ -28,7 +28,10 @@
// gRPC currently can't depend on libstdc++, so we can't use "= 0" for
// pure virtual methods. Instead, we use this macro.
-#define GRPC_ABSTRACT \
- { GPR_ASSERT(false); }
+#define GRPC_ABSTRACT \
+ { \
+ gpr_log(GPR_ERROR, "Function marked GRPC_ABSTRACT was not implemented"); \
+ GPR_ASSERT(false); \
+ }
#endif /* GRPC_CORE_LIB_GPRPP_ABSTRACT_H */
diff --git a/src/core/lib/gprpp/inlined_vector.h b/src/core/lib/gprpp/inlined_vector.h
index 0d2586e507..76e2f0a785 100644
--- a/src/core/lib/gprpp/inlined_vector.h
+++ b/src/core/lib/gprpp/inlined_vector.h
@@ -22,6 +22,7 @@
#include <grpc/support/port_platform.h>
#include <cassert>
+#include <cstring>
#include "src/core/lib/gprpp/memory.h"
@@ -50,9 +51,33 @@ class InlinedVector {
InlinedVector() { init_data(); }
~InlinedVector() { destroy_elements(); }
- // For now, we do not support copying.
- InlinedVector(const InlinedVector&) = delete;
- InlinedVector& operator=(const InlinedVector&) = delete;
+ // copy constructor
+ InlinedVector(const InlinedVector& v) {
+ init_data();
+ copy_from(v);
+ }
+
+ InlinedVector& operator=(const InlinedVector& v) {
+ if (this != &v) {
+ clear();
+ copy_from(v);
+ }
+ return *this;
+ }
+
+ // move constructor
+ InlinedVector(InlinedVector&& v) {
+ init_data();
+ move_from(v);
+ }
+
+ InlinedVector& operator=(InlinedVector&& v) {
+ if (this != &v) {
+ clear();
+ move_from(v);
+ }
+ return *this;
+ }
T* data() {
return dynamic_ != nullptr ? dynamic_ : reinterpret_cast<T*>(inline_);
@@ -98,6 +123,33 @@ class InlinedVector {
void push_back(T&& value) { emplace_back(std::move(value)); }
+ void copy_from(const InlinedVector& v) {
+ // if v is allocated, copy over the buffer.
+ if (v.dynamic_ != nullptr) {
+ reserve(v.capacity_);
+ memcpy(dynamic_, v.dynamic_, v.size_ * sizeof(T));
+ } else {
+ memcpy(inline_, v.inline_, v.size_ * sizeof(T));
+ }
+ // copy over metadata
+ size_ = v.size_;
+ capacity_ = v.capacity_;
+ }
+
+ void move_from(InlinedVector& v) {
+ // if v is allocated, then we steal its buffer, else we copy it.
+ if (v.dynamic_ != nullptr) {
+ dynamic_ = v.dynamic_;
+ } else {
+ memcpy(inline_, v.inline_, v.size_ * sizeof(T));
+ }
+ // copy over metadata
+ size_ = v.size_;
+ capacity_ = v.capacity_;
+ // null out the original
+ v.init_data();
+ }
+
size_t size() const { return size_; }
bool empty() const { return size_ == 0; }
diff --git a/src/core/lib/iomgr/lockfree_event.cc b/src/core/lib/iomgr/lockfree_event.cc
index 5b6b79fa91..085fea40a4 100644
--- a/src/core/lib/iomgr/lockfree_event.cc
+++ b/src/core/lib/iomgr/lockfree_event.cc
@@ -89,7 +89,11 @@ void LockfreeEvent::DestroyEvent() {
void LockfreeEvent::NotifyOn(grpc_closure* closure) {
while (true) {
- gpr_atm curr = gpr_atm_no_barrier_load(&state_);
+ /* This load needs to be an acquire load because this can be a shutdown
+ * error that we might need to reference. Adding acquire semantics makes
+ * sure that the shutdown error has been initialized properly before us
+ * referencing it. */
+ gpr_atm curr = gpr_atm_acq_load(&state_);
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, "LockfreeEvent::NotifyOn: %p curr=%p closure=%p", this,
(void*)curr, closure);
diff --git a/src/core/lib/json/json.cc b/src/core/lib/json/json.cc
index 816241bbf0..e78b73cefd 100644
--- a/src/core/lib/json/json.cc
+++ b/src/core/lib/json/json.cc
@@ -18,10 +18,12 @@
#include <grpc/support/port_platform.h>
+#include <inttypes.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include "src/core/lib/json/json.h"
@@ -56,6 +58,8 @@ void grpc_json_destroy(grpc_json* json) {
grpc_json* grpc_json_link_child(grpc_json* parent, grpc_json* child,
grpc_json* sibling) {
+ // link child up to parent
+ child->parent = parent;
// first child case.
if (parent->child == nullptr) {
GPR_ASSERT(sibling == nullptr);
@@ -79,8 +83,15 @@ grpc_json* grpc_json_create_child(grpc_json* sibling, grpc_json* parent,
grpc_json* child = grpc_json_create(type);
grpc_json_link_child(parent, child, sibling);
child->owns_value = owns_value;
- child->parent = parent;
child->value = value;
child->key = key;
return child;
}
+
+grpc_json* grpc_json_add_number_string_child(grpc_json* parent, grpc_json* it,
+ const char* name, int64_t num) {
+ char* num_str;
+ gpr_asprintf(&num_str, "%" PRId64, num);
+ return grpc_json_create_child(it, parent, name, num_str, GRPC_JSON_STRING,
+ true);
+}
diff --git a/src/core/lib/json/json.h b/src/core/lib/json/json.h
index f93b43048b..8173845c72 100644
--- a/src/core/lib/json/json.h
+++ b/src/core/lib/json/json.h
@@ -91,4 +91,9 @@ grpc_json* grpc_json_create_child(grpc_json* sibling, grpc_json* parent,
const char* key, const char* value,
grpc_json_type type, bool owns_value);
+/* Creates a child json string object from the integer num, then links the
+ json object into the parent's json tree */
+grpc_json* grpc_json_add_number_string_child(grpc_json* parent, grpc_json* it,
+ const char* name, int64_t num);
+
#endif /* GRPC_CORE_LIB_JSON_JSON_H */
diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc
index 8f3ad6c191..7cbd61adef 100644
--- a/src/core/lib/surface/channel.cc
+++ b/src/core/lib/surface/channel.cc
@@ -105,6 +105,7 @@ grpc_channel* grpc_channel_create_with_builder(
channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type);
size_t channel_tracer_max_nodes = 0; // default to off
bool channelz_enabled = false;
+ bool internal_channel = false;
// this creates the default ChannelNode. Different types of channels may
// override this to ensure a correct ChannelNode is created.
grpc_core::channelz::ChannelNodeCreationFunc channel_node_create_func =
@@ -158,13 +159,17 @@ grpc_channel* grpc_channel_create_with_builder(
channel_node_create_func =
reinterpret_cast<grpc_core::channelz::ChannelNodeCreationFunc>(
args->args[i].value.pointer.p);
+ } else if (0 == strcmp(args->args[i].key,
+ GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL)) {
+ internal_channel = grpc_channel_arg_get_bool(&args->args[i], false);
}
}
grpc_channel_args_destroy(args);
if (channelz_enabled) {
- channel->channelz_channel =
- channel_node_create_func(channel, channel_tracer_max_nodes);
+ bool is_top_level_channel = channel->is_client && !internal_channel;
+ channel->channelz_channel = channel_node_create_func(
+ channel, channel_tracer_max_nodes, is_top_level_channel);
channel->channelz_channel->trace()->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel created"));
diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc
index 16be81e9c2..0ad82fed99 100644
--- a/src/core/lib/surface/init.cc
+++ b/src/core/lib/surface/init.cc
@@ -127,7 +127,7 @@ void grpc_init(void) {
grpc_slice_intern_init();
grpc_mdctx_global_init();
grpc_channel_init_init();
- grpc_core::ChannelzRegistry::Init();
+ grpc_core::channelz::ChannelzRegistry::Init();
grpc_security_pre_init();
grpc_core::ExecCtx::GlobalInit();
grpc_iomgr_init();
@@ -176,7 +176,7 @@ void grpc_shutdown(void) {
grpc_mdctx_global_shutdown();
grpc_handshaker_factory_registry_shutdown();
grpc_slice_intern_shutdown();
- grpc_core::ChannelzRegistry::Shutdown();
+ grpc_core::channelz::ChannelzRegistry::Shutdown();
grpc_stats_shutdown();
grpc_core::Fork::GlobalShutdown();
}