From 4377cbadaf6465660058304bca0f502f92d06a1e Mon Sep 17 00:00:00 2001 From: ncteisen Date: Fri, 12 Oct 2018 12:25:57 -0400 Subject: Support channelz pagination --- src/core/lib/channel/channelz_registry.cc | 38 +++++++++++++++++++++-------- test/core/channel/channelz_test.cc | 40 ++++++++++++++++++++++++++++++- 2 files changed, 67 insertions(+), 11 deletions(-) diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc index 67e56ed791..c6944afb68 100644 --- a/src/core/lib/channel/channelz_registry.cc +++ b/src/core/lib/channel/channelz_registry.cc @@ -38,6 +38,8 @@ namespace { // singleton instance of the registry. ChannelzRegistry* g_channelz_registry = nullptr; +const int kPaginationLimit = 100; + } // anonymous namespace void ChannelzRegistry::Init() { g_channelz_registry = New(); } @@ -124,6 +126,7 @@ BaseNode* ChannelzRegistry::InternalGet(intptr_t uuid) { } char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) { + MutexLock lock(&mu_); grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); grpc_json* json = top_level_json; grpc_json* json_iterator = nullptr; @@ -133,10 +136,18 @@ char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) { // start_channel_id=0, which signifies "give me everything." Hence this // funky looking line below. size_t start_idx = start_channel_id == 0 ? 0 : start_channel_id - 1; + bool more_to_come = false; for (size_t i = start_idx; i < entities_.size(); ++i) { if (entities_[i] != nullptr && entities_[i]->type() == grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel) { + // check if we are over pagination limit to determine if we need to set + // the "end" element. If we don't go through this block, we know that + // when the loop terminates, we have <= to kPaginationLimit. + if (top_level_channels.size() == kPaginationLimit) { + more_to_come = true; + break; + } top_level_channels.push_back(entities_[i]); } } @@ -150,17 +161,17 @@ char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) { grpc_json_link_child(array_parent, channel_json, json_iterator); } } - // For now we do not have any pagination rules. In the future we could - // pick a constant for max_channels_sent for a GetTopChannels request. - // Tracking: https://github.com/grpc/grpc/issues/16019. - json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr, - GRPC_JSON_TRUE, false); + if (!more_to_come) { + grpc_json_create_child(nullptr, json, "end", nullptr, GRPC_JSON_TRUE, + false); + } char* json_str = grpc_json_dump_to_string(top_level_json, 0); grpc_json_destroy(top_level_json); return json_str; } char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) { + MutexLock lock(&mu_); grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); grpc_json* json = top_level_json; grpc_json* json_iterator = nullptr; @@ -169,10 +180,18 @@ char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) { // reserved). However, we want to support requests coming in with // start_server_id=0, which signifies "give me everything." size_t start_idx = start_server_id == 0 ? 0 : start_server_id - 1; + bool more_to_come = false; for (size_t i = start_idx; i < entities_.size(); ++i) { if (entities_[i] != nullptr && entities_[i]->type() == grpc_core::channelz::BaseNode::EntityType::kServer) { + // check if we are over pagination limit to determine if we need to set + // the "end" element. If we don't go through this block, we know that + // when the loop terminates, we have <= to kPaginationLimit. + if (servers.size() == kPaginationLimit) { + more_to_come = true; + break; + } servers.push_back(entities_[i]); } } @@ -186,11 +205,10 @@ char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) { grpc_json_link_child(array_parent, server_json, json_iterator); } } - // For now we do not have any pagination rules. In the future we could - // pick a constant for max_channels_sent for a GetServers request. - // Tracking: https://github.com/grpc/grpc/issues/16019. - json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr, - GRPC_JSON_TRUE, false); + if (!more_to_come) { + grpc_json_create_child(nullptr, json, "end", nullptr, GRPC_JSON_TRUE, + false); + } char* json_str = grpc_json_dump_to_string(top_level_json, 0); grpc_json_destroy(top_level_json); return json_str; diff --git a/test/core/channel/channelz_test.cc b/test/core/channel/channelz_test.cc index aed1fba47c..c9abc4465c 100644 --- a/test/core/channel/channelz_test.cc +++ b/test/core/channel/channelz_test.cc @@ -226,7 +226,19 @@ void ChannelzSleep(int64_t sleep_us) { } // anonymous namespace -class ChannelzChannelTest : public ::testing::TestWithParam {}; +class ChannelzChannelTest : public ::testing::TestWithParam { + protected: + // ensure we always have a fresh registry for tests. + void SetUp() override { + ChannelzRegistry::Shutdown(); + ChannelzRegistry::Init(); + } + + void TearDown() override { + ChannelzRegistry::Shutdown(); + ChannelzRegistry::Init(); + } +}; TEST_P(ChannelzChannelTest, BasicChannel) { grpc_core::ExecCtx exec_ctx; @@ -305,6 +317,32 @@ TEST(ChannelzGetTopChannelsTest, ManyChannelsTest) { ValidateGetTopChannels(10); } +TEST(ChannelzGetTopChannelsTest, GetTopChannelsPagination) { + grpc_core::ExecCtx exec_ctx; + // this is over the pagination limit. + ChannelFixture channels[150]; + (void)channels; // suppress unused variable error + char* json_str = ChannelzRegistry::GetTopChannels(0); + grpc::testing::ValidateGetTopChannelsResponseProtoJsonTranslation(json_str); + grpc_json* parsed_json = grpc_json_parse_string(json_str); + // 100 is the pagination limit. + ValidateJsonArraySize(parsed_json, "channel", 100); + grpc_json* end = GetJsonChild(parsed_json, "end"); + EXPECT_EQ(end, nullptr); + grpc_json_destroy(parsed_json); + gpr_free(json_str); + // Now we get the rest + json_str = ChannelzRegistry::GetTopChannels(101); + grpc::testing::ValidateGetTopChannelsResponseProtoJsonTranslation(json_str); + parsed_json = grpc_json_parse_string(json_str); + ValidateJsonArraySize(parsed_json, "channel", 50); + end = GetJsonChild(parsed_json, "end"); + ASSERT_NE(end, nullptr); + EXPECT_EQ(end->type, GRPC_JSON_TRUE); + grpc_json_destroy(parsed_json); + gpr_free(json_str); +} + TEST(ChannelzGetTopChannelsTest, InternalChannelTest) { grpc_core::ExecCtx exec_ctx; ChannelFixture channels[10]; -- cgit v1.2.3