aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc4
-rw-r--r--src/core/ext/filters/client_channel/client_channel_channelz.cc88
-rw-r--r--src/core/ext/filters/client_channel/client_channel_channelz.h47
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc16
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h2
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc69
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h12
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc15
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h13
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_lists.cc17
-rw-r--r--src/core/lib/channel/channel_stack.h2
-rw-r--r--src/core/lib/channel/channel_trace.cc91
-rw-r--r--src/core/lib/channel/channel_trace.h28
-rw-r--r--src/core/lib/channel/channelz.cc126
-rw-r--r--src/core/lib/channel/channelz.h133
-rw-r--r--src/core/lib/channel/channelz_registry.cc60
-rw-r--r--src/core/lib/channel/channelz_registry.h50
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc2
-rw-r--r--src/core/lib/surface/call.cc10
-rw-r--r--src/core/lib/surface/channel.cc5
21 files changed, 510 insertions, 282 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 024c9d737e..683cb0e01d 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -892,6 +892,7 @@ typedef struct client_channel_call_data {
grpc_millis deadline;
gpr_arena* arena;
grpc_call_stack* owning_call;
+ grpc_call* call;
grpc_call_combiner* call_combiner;
grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
@@ -2561,7 +2562,8 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
calld->arena, // arena
calld->pick.subchannel_call_context, // context
calld->call_combiner, // call_combiner
- parent_data_size // parent_data_size
+ parent_data_size, // parent_data_size
+ calld->call // call
};
grpc_error* new_error = calld->pick.connected_subchannel->CreateCall(
call_args, &calld->subchannel_call);
diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc
index 86c765df52..f797ef1b09 100644
--- a/src/core/ext/filters/client_channel/client_channel_channelz.cc
+++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc
@@ -20,10 +20,13 @@
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
+#include "src/core/lib/channel/channelz_registry.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
+#include <grpc/support/string_util.h>
+
namespace grpc_core {
namespace channelz {
namespace {
@@ -95,6 +98,40 @@ void ClientChannelNode::PopulateChildRefs(grpc_json* json) {
}
}
+grpc_json* ClientChannelNode::RenderJson() {
+ // We need to track these three json objects to build our object
+ grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
+ grpc_json* json = top_level_json;
+ grpc_json* json_iterator = nullptr;
+ // create and fill the ref child
+ json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr,
+ GRPC_JSON_OBJECT, false);
+ json = json_iterator;
+ json_iterator = nullptr;
+ json_iterator = grpc_json_add_number_string_child(json, json_iterator,
+ "channelId", uuid());
+ // reset json iterators to top level object
+ json = top_level_json;
+ json_iterator = nullptr;
+ // create and fill the data child.
+ grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr,
+ GRPC_JSON_OBJECT, false);
+ json = data;
+ json_iterator = nullptr;
+ PopulateConnectivityState(json);
+ // populate the target.
+ GPR_ASSERT(target_view() != nullptr);
+ grpc_json_create_child(nullptr, json, "target", target_view(),
+ GRPC_JSON_STRING, false);
+ // as CallCountingAndTracingNode to populate trace and call count data.
+ counter_and_tracer()->PopulateTrace(json);
+ counter_and_tracer()->PopulateCallData(json);
+ // reset to the top level
+ json = top_level_json;
+ PopulateChildRefs(json);
+ return top_level_json;
+}
+
grpc_arg ClientChannelNode::CreateChannelArg() {
return grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_NODE_CREATION_FUNC),
@@ -109,5 +146,56 @@ RefCountedPtr<ChannelNode> ClientChannelNode::MakeClientChannelNode(
is_top_level_channel);
}
+SubchannelNode::SubchannelNode(grpc_subchannel* subchannel,
+ size_t channel_tracer_max_nodes)
+ : BaseNode(EntityType::kSubchannel),
+ subchannel_(subchannel),
+ target_(
+ UniquePtr<char>(gpr_strdup(grpc_subchannel_get_target(subchannel_)))),
+ counter_and_tracer_(channel_tracer_max_nodes) {}
+
+SubchannelNode::~SubchannelNode() {}
+
+void SubchannelNode::PopulateConnectivityState(grpc_json* json) {
+ grpc_connectivity_state state;
+ if (subchannel_ == nullptr) {
+ state = GRPC_CHANNEL_SHUTDOWN;
+ } else {
+ state = grpc_subchannel_check_connectivity(subchannel_, nullptr);
+ }
+ json = grpc_json_create_child(nullptr, json, "state", nullptr,
+ GRPC_JSON_OBJECT, false);
+ grpc_json_create_child(nullptr, json, "state",
+ grpc_connectivity_state_name(state), GRPC_JSON_STRING,
+ false);
+}
+
+grpc_json* SubchannelNode::RenderJson() {
+ grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
+ grpc_json* json = top_level_json;
+ grpc_json* json_iterator = nullptr;
+ json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr,
+ GRPC_JSON_OBJECT, false);
+ json = json_iterator;
+ json_iterator = nullptr;
+ json_iterator = grpc_json_add_number_string_child(json, json_iterator,
+ "subchannelId", uuid());
+ // reset json iterators to top level object
+ json = top_level_json;
+ json_iterator = nullptr;
+ // create and fill the data child.
+ grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr,
+ GRPC_JSON_OBJECT, false);
+ json = data;
+ json_iterator = nullptr;
+ PopulateConnectivityState(json);
+ GPR_ASSERT(target_.get() != nullptr);
+ grpc_json_create_child(nullptr, json, "target", target_.get(),
+ GRPC_JSON_STRING, false);
+ counter_and_tracer_.PopulateTrace(json);
+ counter_and_tracer_.PopulateCallData(json);
+ return top_level_json;
+}
+
} // namespace channelz
} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.h b/src/core/ext/filters/client_channel/client_channel_channelz.h
index 6f27b5c8b7..f5344c049e 100644
--- a/src/core/ext/filters/client_channel/client_channel_channelz.h
+++ b/src/core/ext/filters/client_channel/client_channel_channelz.h
@@ -26,6 +26,8 @@
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/inlined_vector.h"
+typedef struct grpc_subchannel grpc_subchannel;
+
namespace grpc_core {
// TODO(ncteisen), this only contains the uuids of the children for now,
@@ -43,26 +45,47 @@ class ClientChannelNode : public ChannelNode {
grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel);
- // Override this functionality since client_channels have a notion of
- // channel connectivity.
- void PopulateConnectivityState(grpc_json* json) override;
+ ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel);
+ virtual ~ClientChannelNode() {}
- // Override this functionality since client_channels have subchannels
- void PopulateChildRefs(grpc_json* json) override;
+ grpc_json* RenderJson() override;
// Helper to create a channel arg to ensure this type of ChannelNode is
// created.
static grpc_arg CreateChannelArg();
- protected:
- GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
- GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
- ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
- bool is_top_level_channel);
- virtual ~ClientChannelNode() {}
-
private:
grpc_channel_element* client_channel_;
+
+ // helpers
+ void PopulateConnectivityState(grpc_json* json);
+ void PopulateChildRefs(grpc_json* json);
+};
+
+// Handles channelz bookkeeping for sockets
+class SubchannelNode : public BaseNode {
+ public:
+ SubchannelNode(grpc_subchannel* subchannel, size_t channel_tracer_max_nodes);
+ ~SubchannelNode() override;
+
+ void MarkSubchannelDestroyed() {
+ GPR_ASSERT(subchannel_ != nullptr);
+ subchannel_ = nullptr;
+ }
+
+ grpc_json* RenderJson() override;
+
+ CallCountingAndTracingNode* counter_and_tracer() {
+ return &counter_and_tracer_;
+ }
+
+ private:
+ grpc_subchannel* subchannel_;
+ UniquePtr<char> target_;
+ CallCountingAndTracingNode counter_and_tracer_;
+
+ void PopulateConnectivityState(grpc_json* json);
};
} // namespace channelz
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 959c7441a3..dc475d3d68 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -1294,7 +1294,7 @@ void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels,
grpc_core::channelz::ChannelNode* channel_node =
grpc_channel_get_channelz_node(lb_channel_);
if (channel_node != nullptr) {
- child_channels->push_back(channel_node->channel_uuid());
+ child_channels->push_back(channel_node->uuid());
}
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 023281db97..d217dc0e63 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -451,6 +451,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// latest pending subchannel lists.
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
subchannel_list() == p->latest_pending_subchannel_list_.get());
+ GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
// Handle updates for the currently selected subchannel.
if (p->selected_ == this) {
if (grpc_lb_pick_first_trace.enabled()) {
@@ -480,14 +481,12 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
"update"),
"selected_not_ready+switch_to_update");
} else {
- // TODO(juanlishen): we re-resolve when the selected subchannel goes to
- // TRANSIENT_FAILURE because we used to shut down in this case before
- // re-resolution is introduced. But we need to investigate whether we
- // really want to take any action instead of waiting for the selected
- // subchannel reconnecting.
- GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- // If the selected channel goes bad, request a re-resolution.
+ // If the selected subchannel goes bad, request a re-resolution. We also
+ // set the channel state to IDLE and reset started_picking_. The reason
+ // is that if the new state is TRANSIENT_FAILURE due to a GOAWAY
+ // reception we don't want to connect to the re-resolved backends until
+ // we leave the IDLE state.
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
"selected_changed+reresolve");
@@ -568,9 +567,10 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) {
+ p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
grpc_connectivity_state_set(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "connecting_transient_failure");
+ GRPC_ERROR_REF(error), "exhausted_subchannels");
}
sd->StartConnectivityWatchLocked();
break;
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 018ac3bb86..199b9a3c13 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -196,7 +196,7 @@ class SubchannelList
grpc_core::channelz::SubchannelNode* subchannel_node =
grpc_subchannel_get_channelz_node(subchannels_[i].subchannel());
if (subchannel_node != nullptr) {
- refs_list->push_back(subchannel_node->subchannel_uuid());
+ refs_list->push_back(subchannel_node->uuid());
}
}
}
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 93df2aff70..d7b64a900f 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -46,6 +46,7 @@
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -181,7 +182,13 @@ static void connection_destroy(void* arg, grpc_error* error) {
static void subchannel_destroy(void* arg, grpc_error* error) {
grpc_subchannel* c = static_cast<grpc_subchannel*>(arg);
- c->channelz_subchannel.reset();
+ if (c->channelz_subchannel != nullptr) {
+ c->channelz_subchannel->counter_and_tracer()->trace()->AddTraceEvent(
+ grpc_core::channelz::ChannelTrace::Severity::Info,
+ grpc_slice_from_static_string("Subchannel destroyed"));
+ c->channelz_subchannel->MarkSubchannelDestroyed();
+ c->channelz_subchannel.reset();
+ }
gpr_free((void*)c->filters);
grpc_channel_args_destroy(c->args);
grpc_connectivity_state_destroy(&c->state_tracker);
@@ -381,9 +388,18 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
const grpc_arg* arg =
grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ);
bool channelz_enabled = grpc_channel_arg_get_bool(arg, false);
+ arg = grpc_channel_args_find(c->args,
+ GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE);
+ const grpc_integer_options options = {0, 0, INT_MAX};
+ size_t channel_tracer_max_nodes =
+ (size_t)grpc_channel_arg_get_integer(arg, options);
if (channelz_enabled) {
c->channelz_subchannel =
- grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>();
+ grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>(
+ c, channel_tracer_max_nodes);
+ c->channelz_subchannel->counter_and_tracer()->trace()->AddTraceEvent(
+ grpc_core::channelz::ChannelTrace::Severity::Info,
+ grpc_slice_from_static_string("Subchannel created"));
}
return grpc_subchannel_index_register(key, c);
@@ -402,8 +418,6 @@ static void continue_connect_locked(grpc_subchannel* c) {
c->next_attempt_deadline = c->backoff->NextAttemptTime();
args.deadline = std::max(c->next_attempt_deadline, min_deadline);
args.channel_args = c->args;
- grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
- GRPC_ERROR_NONE, "state_change");
grpc_connector_connect(c->connector, &args, &c->connecting_result,
&c->on_connected);
}
@@ -459,27 +473,24 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
/* Don't try to connect if we're already disconnected */
return;
}
-
if (c->connecting) {
/* Already connecting: don't restart */
return;
}
-
if (c->connected_subchannel != nullptr) {
/* Already connected: don't restart */
return;
}
-
if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) {
/* Nobody is interested in connecting: so don't just yet */
return;
}
-
c->connecting = true;
GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
-
if (!c->backoff_begun) {
c->backoff_begun = true;
+ grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
+ GRPC_ERROR_NONE, "connecting");
continue_connect_locked(c);
} else {
GPR_ASSERT(!c->have_alarm);
@@ -494,6 +505,11 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
}
GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm);
+ // During backoff, we prefer the connectivity state of CONNECTING instead of
+ // TRANSIENT_FAILURE in order to prevent triggering re-resolution
+ // continuously in pick_first.
+ grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
+ GRPC_ERROR_NONE, "backoff");
}
}
@@ -625,8 +641,8 @@ static bool publish_transport_locked(grpc_subchannel* c) {
}
/* publish */
- c->connected_subchannel.reset(
- grpc_core::New<grpc_core::ConnectedSubchannel>(stk));
+ c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>(
+ stk, c->channelz_subchannel.get()));
gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
c->connected_subchannel.get(), c);
@@ -757,6 +773,14 @@ void grpc_get_subchannel_address_arg(const grpc_channel_args* args,
}
}
+const char* grpc_subchannel_get_target(grpc_subchannel* subchannel) {
+ const grpc_arg* addr_arg =
+ grpc_channel_args_find(subchannel->args, GRPC_ARG_SUBCHANNEL_ADDRESS);
+ const char* addr_str = grpc_channel_arg_get_string(addr_arg);
+ GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy.
+ return addr_str;
+}
+
const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) {
const grpc_arg* addr_arg =
grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS);
@@ -773,9 +797,12 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) {
namespace grpc_core {
-ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack)
+ConnectedSubchannel::ConnectedSubchannel(
+ grpc_channel_stack* channel_stack,
+ channelz::SubchannelNode* channelz_subchannel)
: RefCountedWithTracing<ConnectedSubchannel>(&grpc_trace_stream_refcount),
- channel_stack_(channel_stack) {}
+ channel_stack_(channel_stack),
+ channelz_subchannel_(channelz_subchannel) {}
ConnectedSubchannel::~ConnectedSubchannel() {
GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
@@ -822,14 +849,14 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
connection.release(); // Ref is passed to the grpc_subchannel_call object.
(*call)->connection = this;
const grpc_call_element_args call_args = {
- callstk, /* call_stack */
- nullptr, /* server_transport_data */
- args.context, /* context */
- args.path, /* path */
- args.start_time, /* start_time */
- args.deadline, /* deadline */
- args.arena, /* arena */
- args.call_combiner /* call_combiner */
+ callstk, /* call_stack */
+ nullptr, /* server_transport_data */
+ args.context, /* context */
+ args.path, /* path */
+ args.start_time, /* start_time */
+ args.deadline, /* deadline */
+ args.arena, /* arena */
+ args.call_combiner, /* call_combiner */
};
grpc_error* error = grpc_call_stack_init(
channel_stack_, 1, subchannel_call_destroy, *call, &call_args);
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index 9e53f7d542..d62348488e 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -83,9 +83,11 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> {
grpc_call_context_element* context;
grpc_call_combiner* call_combiner;
size_t parent_data_size;
+ grpc_call* call;
};
- explicit ConnectedSubchannel(grpc_channel_stack* channel_stack);
+ explicit ConnectedSubchannel(grpc_channel_stack* channel_stack,
+ channelz::SubchannelNode* channelz_subchannel);
~ConnectedSubchannel();
grpc_channel_stack* channel_stack() { return channel_stack_; }
@@ -94,9 +96,15 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> {
grpc_closure* closure);
void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call);
+ channelz::SubchannelNode* channelz_subchannel() {
+ return channelz_subchannel_;
+ }
private:
grpc_channel_stack* channel_stack_;
+ // backpointer to the channelz node in this connected subchannel's
+ // owning subchannel.
+ channelz::SubchannelNode* channelz_subchannel_;
};
} // namespace grpc_core
@@ -177,6 +185,8 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
void grpc_get_subchannel_address_arg(const grpc_channel_args* args,
grpc_resolved_address* addr);
+const char* grpc_subchannel_get_target(grpc_subchannel* subchannel);
+
/// Returns the URI string for the address to connect to.
const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index bc6fa0d0eb..9ad271753c 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -813,7 +813,11 @@ static void set_write_state(grpc_chttp2_transport* t,
write_state_name(st), reason));
t->write_state = st;
if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
- GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
+ grpc_chttp2_stream* s;
+ while (grpc_chttp2_list_pop_waiting_for_write_stream(t, &s)) {
+ GRPC_CLOSURE_LIST_SCHED(&s->run_after_write);
+ GRPC_CHTTP2_STREAM_UNREF(s, "chttp2:write_closure_sched");
+ }
if (t->close_transport_on_writes_finished != nullptr) {
grpc_error* err = t->close_transport_on_writes_finished;
t->close_transport_on_writes_finished = nullptr;
@@ -1208,7 +1212,10 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
!(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
GRPC_CLOSURE_RUN(closure, closure->error_data.error);
} else {
- grpc_closure_list_append(&t->run_after_write, closure,
+ if (grpc_chttp2_list_add_waiting_for_write_stream(t, s)) {
+ GRPC_CHTTP2_STREAM_REF(s, "chttp2:pending_write_closure");
+ }
+ grpc_closure_list_append(&s->run_after_write, closure,
closure->error_data.error);
}
}
@@ -2009,6 +2016,10 @@ static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error* due_to_error) {
+ GRPC_CLOSURE_LIST_SCHED(&s->run_after_write);
+ if (grpc_chttp2_list_remove_waiting_for_write_stream(t, s)) {
+ GRPC_CHTTP2_STREAM_UNREF(s, "chttp2:pending_write_closure");
+ }
if (!t->is_client && !s->sent_trailing_metadata &&
grpc_error_has_clear_grpc_status(due_to_error)) {
close_from_api(t, s, due_to_error);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index ca6e715978..4f1a08d98b 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -54,6 +54,8 @@ typedef enum {
/** streams that are waiting to start because there are too many concurrent
streams on the connection */
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
+ /** streams with closures waiting to be run on a write **/
+ GRPC_CHTTP2_LIST_WAITING_FOR_WRITE,
STREAM_LIST_COUNT /* must be last */
} grpc_chttp2_stream_list_id;
@@ -431,9 +433,6 @@ struct grpc_chttp2_transport {
*/
grpc_error* close_transport_on_writes_finished;
- /* a list of closures to run after writes are finished */
- grpc_closure_list run_after_write;
-
/* buffer pool state */
/** have we scheduled a benign cleanup? */
bool benign_reclaimer_registered;
@@ -584,6 +583,7 @@ struct grpc_chttp2_stream {
grpc_slice_buffer flow_controlled_buffer;
+ grpc_closure_list run_after_write;
grpc_chttp2_write_cb* on_flow_controlled_cbs;
grpc_chttp2_write_cb* on_write_finished_cbs;
grpc_chttp2_write_cb* finish_after_write;
@@ -686,6 +686,13 @@ bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport* t,
bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t,
grpc_chttp2_stream* s);
+bool grpc_chttp2_list_add_waiting_for_write_stream(grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s);
+bool grpc_chttp2_list_pop_waiting_for_write_stream(grpc_chttp2_transport* t,
+ grpc_chttp2_stream** s);
+bool grpc_chttp2_list_remove_waiting_for_write_stream(grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s);
+
/********* Flow Control ***************/
// Takes in a flow control action and performs all the needed operations.
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.cc b/src/core/ext/transport/chttp2/transport/stream_lists.cc
index 6626170a7e..50bfe36a86 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.cc
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.cc
@@ -35,6 +35,8 @@ static const char* stream_list_id_string(grpc_chttp2_stream_list_id id) {
return "stalled_by_stream";
case GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY:
return "waiting_for_concurrency";
+ case GRPC_CHTTP2_LIST_WAITING_FOR_WRITE:
+ return "waiting_for_write";
case STREAM_LIST_COUNT:
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -214,3 +216,18 @@ bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t,
grpc_chttp2_stream* s) {
return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
}
+
+bool grpc_chttp2_list_add_waiting_for_write_stream(grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s) {
+ return stream_list_add(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_WRITE);
+}
+
+bool grpc_chttp2_list_pop_waiting_for_write_stream(grpc_chttp2_transport* t,
+ grpc_chttp2_stream** s) {
+ return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_WRITE);
+}
+
+bool grpc_chttp2_list_remove_waiting_for_write_stream(grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s) {
+ return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_WRITE);
+}
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 7581f937b6..35c3fb01ea 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -82,7 +82,7 @@ typedef struct {
typedef struct {
grpc_call_stats stats;
grpc_status_code final_status;
- const char** error_string;
+ const char* error_string;
} grpc_call_final_info;
/* Channel filters specify:
diff --git a/src/core/lib/channel/channel_trace.cc b/src/core/lib/channel/channel_trace.cc
index b3443310ac..7d8bb391f8 100644
--- a/src/core/lib/channel/channel_trace.cc
+++ b/src/core/lib/channel/channel_trace.cc
@@ -43,14 +43,23 @@ namespace channelz {
ChannelTrace::TraceEvent::TraceEvent(
Severity severity, grpc_slice data,
- RefCountedPtr<ChannelNode> referenced_channel, ReferencedType type)
+ RefCountedPtr<ChannelNode> referenced_channel)
: severity_(severity),
data_(data),
timestamp_(grpc_millis_to_timespec(grpc_core::ExecCtx::Get()->Now(),
GPR_CLOCK_REALTIME)),
next_(nullptr),
- referenced_channel_(std::move(referenced_channel)),
- referenced_type_(type) {}
+ referenced_channel_(std::move(referenced_channel)) {}
+
+// ChannelTrace::TraceEvent::TraceEvent(Severity severity, grpc_slice data,
+// RefCountedPtr<SubchannelNode>
+// referenced_subchannel)
+// : severity_(severity),
+// data_(data),
+// timestamp_(grpc_millis_to_timespec(grpc_core::ExecCtx::Get()->Now(),
+// GPR_CLOCK_REALTIME)),
+// next_(nullptr),
+// referenced_subchannel_(std::move(referenced_subchannel)) {}
ChannelTrace::TraceEvent::TraceEvent(Severity severity, grpc_slice data)
: severity_(severity),
@@ -115,19 +124,18 @@ void ChannelTrace::AddTraceEventReferencingChannel(
RefCountedPtr<ChannelNode> referenced_channel) {
if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0
// create and fill up the new event
- AddTraceEventHelper(New<TraceEvent>(
- severity, data, std::move(referenced_channel), ReferencedType::Channel));
+ AddTraceEventHelper(
+ New<TraceEvent>(severity, data, std::move(referenced_channel)));
}
-void ChannelTrace::AddTraceEventReferencingSubchannel(
- Severity severity, grpc_slice data,
- RefCountedPtr<ChannelNode> referenced_subchannel) {
- if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0
- // create and fill up the new event
- AddTraceEventHelper(New<TraceEvent>(severity, data,
- std::move(referenced_subchannel),
- ReferencedType::Subchannel));
-}
+// void ChannelTrace::AddTraceEventReferencingSubchannel(
+// Severity severity, grpc_slice data,
+// RefCountedPtr<SubchannelNode> referenced_subchannel) {
+// if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0
+// // create and fill up the new event
+// AddTraceEventHelper(
+// New<TraceEvent>(severity, data, std::move(referenced_subchannel)));
+// }
namespace {
@@ -159,43 +167,50 @@ void ChannelTrace::TraceEvent::RenderTraceEvent(grpc_json* json) const {
GRPC_JSON_STRING, true);
if (referenced_channel_ != nullptr) {
char* uuid_str;
- gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_channel_->channel_uuid());
+ gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_channel_->uuid());
grpc_json* child_ref = grpc_json_create_child(
- json_iterator, json,
- (referenced_type_ == ReferencedType::Channel) ? "channelRef"
- : "subchannelRef",
- nullptr, GRPC_JSON_OBJECT, false);
- json_iterator = grpc_json_create_child(
- nullptr, child_ref,
- (referenced_type_ == ReferencedType::Channel) ? "channelId"
- : "subchannelId",
- uuid_str, GRPC_JSON_STRING, true);
+ json_iterator, json, "channelRef", nullptr, GRPC_JSON_OBJECT, false);
+ json_iterator = grpc_json_create_child(nullptr, child_ref, "channelId",
+ uuid_str, GRPC_JSON_STRING, true);
json_iterator = child_ref;
}
+ // else {
+ // char* uuid_str;
+ // gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_subchannel_->uuid());
+ // grpc_json* child_ref = grpc_json_create_child(
+ // json_iterator, json, "subchannelRef",
+ // nullptr, GRPC_JSON_OBJECT, false);
+ // json_iterator = grpc_json_create_child(
+ // nullptr, child_ref, "subchannelId",
+ // uuid_str, GRPC_JSON_STRING, true);
+ // json_iterator = child_ref;
+ // }
}
grpc_json* ChannelTrace::RenderJson() const {
if (!max_list_size_)
return nullptr; // tracing is disabled if max_events == 0
grpc_json* json = grpc_json_create(GRPC_JSON_OBJECT);
- char* num_events_logged_str;
- gpr_asprintf(&num_events_logged_str, "%" PRId64, num_events_logged_);
grpc_json* json_iterator = nullptr;
- json_iterator =
- grpc_json_create_child(json_iterator, json, "numEventsLogged",
- num_events_logged_str, GRPC_JSON_STRING, true);
+ if (num_events_logged_ > 0) {
+ json_iterator = grpc_json_add_number_string_child(
+ json, json_iterator, "numEventsLogged", num_events_logged_);
+ }
json_iterator = grpc_json_create_child(
json_iterator, json, "creationTimestamp",
gpr_format_timespec(time_created_), GRPC_JSON_STRING, true);
- grpc_json* events = grpc_json_create_child(json_iterator, json, "events",
- nullptr, GRPC_JSON_ARRAY, false);
- json_iterator = nullptr;
- TraceEvent* it = head_trace_;
- while (it != nullptr) {
- json_iterator = grpc_json_create_child(json_iterator, events, nullptr,
- nullptr, GRPC_JSON_OBJECT, false);
- it->RenderTraceEvent(json_iterator);
- it = it->next();
+ // only add in the event list if it is non-empty.
+ if (num_events_logged_ > 0) {
+ grpc_json* events = grpc_json_create_child(json_iterator, json, "events",
+ nullptr, GRPC_JSON_ARRAY, false);
+ json_iterator = nullptr;
+ TraceEvent* it = head_trace_;
+ while (it != nullptr) {
+ json_iterator = grpc_json_create_child(json_iterator, events, nullptr,
+ nullptr, GRPC_JSON_OBJECT, false);
+ it->RenderTraceEvent(json_iterator);
+ it = it->next();
+ }
}
return json;
}
diff --git a/src/core/lib/channel/channel_trace.h b/src/core/lib/channel/channel_trace.h
index 596af7402f..543eabf13a 100644
--- a/src/core/lib/channel/channel_trace.h
+++ b/src/core/lib/channel/channel_trace.h
@@ -31,6 +31,7 @@ namespace grpc_core {
namespace channelz {
class ChannelNode;
+class SubchannelNode;
// Object used to hold live data for a channel. This data is exposed via the
// channelz service:
@@ -55,9 +56,9 @@ class ChannelTrace {
void AddTraceEvent(Severity severity, grpc_slice data);
// Adds a new trace event to the tracing object. This trace event refers to a
- // an event on a child of the channel. For example, if this channel has
- // created a new subchannel, then it would record that with a TraceEvent
- // referencing the new subchannel.
+ // an event that concerns a different channelz entity. For example, if this
+ // channel has created a new subchannel, then it would record that with
+ // a TraceEvent referencing the new subchannel.
//
// TODO(ncteisen): as this call is used more and more throughout the gRPC
// stack, determine if it makes more sense to accept a char* instead of a
@@ -65,25 +66,26 @@ class ChannelTrace {
void AddTraceEventReferencingChannel(
Severity severity, grpc_slice data,
RefCountedPtr<ChannelNode> referenced_channel);
- void AddTraceEventReferencingSubchannel(
- Severity severity, grpc_slice data,
- RefCountedPtr<ChannelNode> referenced_subchannel);
+ // void AddTraceEventReferencingSubchannel(
+ // Severity severity, grpc_slice data,
+ // RefCountedPtr<SubchannelNode> referenced_subchannel);
// Creates and returns the raw grpc_json object, so a parent channelz
// object may incorporate the json before rendering.
grpc_json* RenderJson() const;
private:
- // Types of objects that can be references by trace events.
- enum class ReferencedType { Channel, Subchannel };
// Private class to encapsulate all the data and bookkeeping needed for a
// a trace event.
class TraceEvent {
public:
- // Constructor for a TraceEvent that references a different channel.
+ // Constructor for a TraceEvent that references a channel.
+ TraceEvent(Severity severity, grpc_slice data,
+ RefCountedPtr<ChannelNode> referenced_channel);
+
+ // Constructor for a TraceEvent that references a subchannel.
TraceEvent(Severity severity, grpc_slice data,
- RefCountedPtr<ChannelNode> referenced_channel,
- ReferencedType type);
+ RefCountedPtr<SubchannelNode> referenced_subchannel);
// Constructor for a TraceEvent that does not reverence a different
// channel.
@@ -106,9 +108,7 @@ class ChannelTrace {
TraceEvent* next_;
// the tracer object for the (sub)channel that this trace event refers to.
RefCountedPtr<ChannelNode> referenced_channel_;
- // the type that the referenced tracer points to. Unused if this trace
- // does not point to any channel or subchannel
- ReferencedType referenced_type_;
+ // RefCountedPtr<SubchannelNode> referenced_subchannel_;
}; // TraceEvent
// Internal helper to add and link in a trace event
diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc
index 9d6002ed8a..ee1717ce9f 100644
--- a/src/core/lib/channel/channelz.cc
+++ b/src/core/lib/channel/channelz.cc
@@ -41,69 +41,46 @@
namespace grpc_core {
namespace channelz {
-ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
- bool is_top_level_channel)
- : channel_(channel),
- target_(nullptr),
- channel_uuid_(-1),
- is_top_level_channel_(is_top_level_channel) {
+BaseNode::BaseNode(EntityType type)
+ : type_(type), uuid_(ChannelzRegistry::Register(this)) {}
+
+BaseNode::~BaseNode() { ChannelzRegistry::Unregister(uuid_); }
+
+char* BaseNode::RenderJsonString() {
+ grpc_json* json = RenderJson();
+ char* json_str = grpc_json_dump_to_string(json, 0);
+ grpc_json_destroy(json);
+ return json_str;
+}
+
+CallCountingAndTracingNode::CallCountingAndTracingNode(
+ size_t channel_tracer_max_nodes) {
trace_.Init(channel_tracer_max_nodes);
- target_ = UniquePtr<char>(grpc_channel_get_target(channel_));
- channel_uuid_ = ChannelzRegistry::RegisterChannelNode(this);
gpr_atm_no_barrier_store(&last_call_started_millis_,
(gpr_atm)ExecCtx::Get()->Now());
}
-ChannelNode::~ChannelNode() {
- trace_.Destroy();
- ChannelzRegistry::UnregisterChannelNode(channel_uuid_);
-}
+CallCountingAndTracingNode::~CallCountingAndTracingNode() { trace_.Destroy(); }
-void ChannelNode::RecordCallStarted() {
+void CallCountingAndTracingNode::RecordCallStarted() {
gpr_atm_no_barrier_fetch_add(&calls_started_, (gpr_atm)1);
gpr_atm_no_barrier_store(&last_call_started_millis_,
(gpr_atm)ExecCtx::Get()->Now());
}
-void ChannelNode::PopulateConnectivityState(grpc_json* json) {}
-
-void ChannelNode::PopulateChildRefs(grpc_json* json) {}
-
-grpc_json* ChannelNode::RenderJson() {
- // We need to track these three json objects to build our object
- grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
- grpc_json* json = top_level_json;
- grpc_json* json_iterator = nullptr;
- // create and fill the ref child
- json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr,
- GRPC_JSON_OBJECT, false);
- json = json_iterator;
- json_iterator = nullptr;
- json_iterator = grpc_json_add_number_string_child(json, json_iterator,
- "channelId", channel_uuid_);
- // reset json iterators to top level object
- json = top_level_json;
- json_iterator = nullptr;
- // create and fill the data child.
- grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr,
- GRPC_JSON_OBJECT, false);
- json = data;
- json_iterator = nullptr;
- PopulateConnectivityState(json);
- GPR_ASSERT(target_.get() != nullptr);
- json_iterator = grpc_json_create_child(
- json_iterator, json, "target", target_.get(), GRPC_JSON_STRING, false);
+void CallCountingAndTracingNode::PopulateTrace(grpc_json* json) {
// fill in the channel trace if applicable
- grpc_json* trace = trace_->RenderJson();
- if (trace != nullptr) {
+ grpc_json* trace_json = trace_->RenderJson();
+ if (trace_json != nullptr) {
// we manually link up and fill the child since it was created for us in
// ChannelTrace::RenderJson
- trace->key = "trace"; // this object is named trace in channelz.proto
- json_iterator = grpc_json_link_child(json, trace, json_iterator);
+ trace_json->key = "trace"; // this object is named trace in channelz.proto
+ grpc_json_link_child(json, trace_json, nullptr);
}
- // reset the parent to be the data object.
- json = data;
- json_iterator = nullptr;
+}
+
+void CallCountingAndTracingNode::PopulateCallData(grpc_json* json) {
+ grpc_json* json_iterator = nullptr;
if (calls_started_ != 0) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "callsStarted", calls_started_);
@@ -121,19 +98,48 @@ grpc_json* ChannelNode::RenderJson() {
json_iterator =
grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp",
gpr_format_timespec(ts), GRPC_JSON_STRING, true);
+}
+
+ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel)
+ : BaseNode(is_top_level_channel ? EntityType::kTopLevelChannel
+ : EntityType::kInternalChannel),
+ channel_(channel),
+ target_(UniquePtr<char>(grpc_channel_get_target(channel_))),
+ counter_and_tracer_(channel_tracer_max_nodes) {}
+
+ChannelNode::~ChannelNode() {}
+
+grpc_json* ChannelNode::RenderJson() {
+ // We need to track these three json objects to build our object
+ grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
+ grpc_json* json = top_level_json;
+ grpc_json* json_iterator = nullptr;
+ // create and fill the ref child
+ json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr,
+ GRPC_JSON_OBJECT, false);
+ json = json_iterator;
+ json_iterator = nullptr;
+ json_iterator = grpc_json_add_number_string_child(json, json_iterator,
+ "channelId", uuid());
+ // reset json iterators to top level object
json = top_level_json;
json_iterator = nullptr;
- PopulateChildRefs(json);
+ // create and fill the data child.
+ grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr,
+ GRPC_JSON_OBJECT, false);
+ json = data;
+ json_iterator = nullptr;
+ // populate the target.
+ GPR_ASSERT(target_.get() != nullptr);
+ grpc_json_create_child(nullptr, json, "target", target_.get(),
+ GRPC_JSON_STRING, false);
+ // as CallCountingAndTracingNode to populate trace and call count data.
+ counter_and_tracer_.PopulateTrace(json);
+ counter_and_tracer_.PopulateCallData(json);
return top_level_json;
}
-char* ChannelNode::RenderJsonString() {
- grpc_json* json = RenderJson();
- char* json_str = grpc_json_dump_to_string(json, 0);
- grpc_json_destroy(json);
- return json_str;
-}
-
RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode(
grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel) {
@@ -141,13 +147,5 @@ RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode(
channel, channel_tracer_max_nodes, is_top_level_channel);
}
-SubchannelNode::SubchannelNode() {
- subchannel_uuid_ = ChannelzRegistry::RegisterSubchannelNode(this);
-}
-
-SubchannelNode::~SubchannelNode() {
- ChannelzRegistry::UnregisterSubchannelNode(subchannel_uuid_);
-}
-
} // namespace channelz
} // namespace grpc_core
diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h
index 07eb73d626..7bc4567ad2 100644
--- a/src/core/lib/channel/channelz.h
+++ b/src/core/lib/channel/channelz.h
@@ -43,14 +43,53 @@ namespace grpc_core {
namespace channelz {
namespace testing {
-class ChannelNodePeer;
+class CallCountingAndTracingNodePeer;
}
-class ChannelNode : public RefCounted<ChannelNode> {
+// base class for all channelz entities
+class BaseNode : public RefCounted<BaseNode> {
public:
- static RefCountedPtr<ChannelNode> MakeChannelNode(
- grpc_channel* channel, size_t channel_tracer_max_nodes,
- bool is_top_level_channel);
+ // There are only four high level channelz entities. However, to support
+ // GetTopChannelsRequest, we split the Channel entity into two different
+ // types. All children of BaseNode must be one of these types.
+ enum class EntityType {
+ kTopLevelChannel,
+ kInternalChannel,
+ kSubchannel,
+ kServer,
+ kSocket,
+ };
+
+ BaseNode(EntityType type);
+ virtual ~BaseNode();
+
+ // All children must implement this function.
+ virtual grpc_json* RenderJson() GRPC_ABSTRACT;
+
+ // Renders the json and returns allocated string that must be freed by the
+ // caller.
+ char* RenderJsonString();
+
+ EntityType type() const { return type_; }
+ intptr_t uuid() const { return uuid_; }
+
+ private:
+ friend class ChannelTrace;
+ EntityType type_;
+ const intptr_t uuid_;
+};
+
+// This class is the parent for the channelz entities that deal with Channels
+// Subchannels, and Servers, since those have similar proto definitions.
+// This class has the ability to:
+// - track calls_{started,succeeded,failed}
+// - track last_call_started_timestamp
+// - hold the channel trace.
+// - perform common rendering.
+class CallCountingAndTracingNode {
+ public:
+ CallCountingAndTracingNode(size_t channel_tracer_max_nodes);
+ ~CallCountingAndTracingNode();
void RecordCallStarted();
void RecordCallFailed() {
@@ -59,18 +98,37 @@ class ChannelNode : public RefCounted<ChannelNode> {
void RecordCallSucceeded() {
gpr_atm_no_barrier_fetch_add(&calls_succeeded_, (gpr_atm(1)));
}
+ ChannelTrace* trace() { return trace_.get(); }
- grpc_json* RenderJson();
- char* RenderJsonString();
+ // Common rendering of the channel trace.
+ void PopulateTrace(grpc_json* json);
- // helper for getting and populating connectivity state. It is virtual
- // because it allows the client_channel specific code to live in ext/
- // instead of lib/
- virtual void PopulateConnectivityState(grpc_json* json);
+ // Common rendering of the call count data and last_call_started_timestamp.
+ void PopulateCallData(grpc_json* json);
- virtual void PopulateChildRefs(grpc_json* json);
+ private:
+ // testing peer friend.
+ friend class testing::CallCountingAndTracingNodePeer;
- ChannelTrace* trace() { return trace_.get(); }
+ gpr_atm calls_started_ = 0;
+ gpr_atm calls_succeeded_ = 0;
+ gpr_atm calls_failed_ = 0;
+ gpr_atm last_call_started_millis_ = 0;
+ ManualConstructor<ChannelTrace> trace_;
+};
+
+// Handles channelz bookkeeping for channels
+class ChannelNode : public BaseNode {
+ public:
+ static RefCountedPtr<ChannelNode> MakeChannelNode(
+ grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel);
+
+ ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel);
+ ~ChannelNode() override;
+
+ grpc_json* RenderJson() override;
void MarkChannelDestroyed() {
GPR_ASSERT(channel_ != nullptr);
@@ -79,47 +137,34 @@ class ChannelNode : public RefCounted<ChannelNode> {
bool ChannelIsDestroyed() { return channel_ == nullptr; }
- intptr_t channel_uuid() { return channel_uuid_; }
- bool is_top_level_channel() { return is_top_level_channel_; }
+ CallCountingAndTracingNode* counter_and_tracer() {
+ return &counter_and_tracer_;
+ }
protected:
- GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
- GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
- ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
- bool is_top_level_channel);
- virtual ~ChannelNode();
+ // provides view of target for child.
+ char* target_view() { return target_.get(); }
private:
- // testing peer friend.
- friend class testing::ChannelNodePeer;
-
grpc_channel* channel_ = nullptr;
UniquePtr<char> target_;
- gpr_atm calls_started_ = 0;
- gpr_atm calls_succeeded_ = 0;
- gpr_atm calls_failed_ = 0;
- gpr_atm last_call_started_millis_ = 0;
- intptr_t channel_uuid_;
- bool is_top_level_channel_ = true;
- ManualConstructor<ChannelTrace> trace_;
+ CallCountingAndTracingNode counter_and_tracer_;
};
-// Placeholds channelz class for subchannels. All this can do now is track its
-// uuid (this information is needed by the parent channelz class).
-// TODO(ncteisen): build this out to support the GetSubchannel channelz request.
-class SubchannelNode : public RefCounted<SubchannelNode> {
+// Handles channelz bookkeeping for servers
+// TODO(ncteisen): implement in subsequent PR.
+class ServerNode : public BaseNode {
public:
- SubchannelNode();
- virtual ~SubchannelNode();
-
- intptr_t subchannel_uuid() { return subchannel_uuid_; }
-
- protected:
- GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
- GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
+ ServerNode(size_t channel_tracer_max_nodes) : BaseNode(EntityType::kServer) {}
+ ~ServerNode() override {}
+};
- private:
- intptr_t subchannel_uuid_;
+// Handles channelz bookkeeping for sockets
+// TODO(ncteisen): implement in subsequent PR.
+class SocketNode : public BaseNode {
+ public:
+ SocketNode() : BaseNode(EntityType::kSocket) {}
+ ~SocketNode() override {}
};
// Creation functions
diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc
index 38496b3d78..d2c403cc1b 100644
--- a/src/core/lib/channel/channelz_registry.cc
+++ b/src/core/lib/channel/channelz_registry.cc
@@ -52,54 +52,46 @@ ChannelzRegistry::ChannelzRegistry() { gpr_mu_init(&mu_); }
ChannelzRegistry::~ChannelzRegistry() { gpr_mu_destroy(&mu_); }
-intptr_t ChannelzRegistry::InternalRegisterEntry(const RegistryEntry& entry) {
+intptr_t ChannelzRegistry::InternalRegister(BaseNode* node) {
mu_guard guard(&mu_);
- entities_.push_back(entry);
+ entities_.push_back(node);
intptr_t uuid = entities_.size();
return uuid;
}
-void ChannelzRegistry::InternalUnregisterEntry(intptr_t uuid, EntityType type) {
+void ChannelzRegistry::InternalUnregister(intptr_t uuid) {
GPR_ASSERT(uuid >= 1);
mu_guard guard(&mu_);
GPR_ASSERT(static_cast<size_t>(uuid) <= entities_.size());
- GPR_ASSERT(entities_[uuid - 1].type == type);
- entities_[uuid - 1].object = nullptr;
- entities_[uuid - 1].type = EntityType::kUnset;
+ entities_[uuid - 1] = nullptr;
}
-void* ChannelzRegistry::InternalGetEntry(intptr_t uuid, EntityType type) {
+BaseNode* ChannelzRegistry::InternalGet(intptr_t uuid) {
mu_guard guard(&mu_);
if (uuid < 1 || uuid > static_cast<intptr_t>(entities_.size())) {
return nullptr;
}
- if (entities_[uuid - 1].type == type) {
- return entities_[uuid - 1].object;
- } else {
- return nullptr;
- }
+ return entities_[uuid - 1];
}
char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) {
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
grpc_json* json = top_level_json;
grpc_json* json_iterator = nullptr;
- InlinedVector<ChannelNode*, 10> top_level_channels;
+ InlinedVector<BaseNode*, 10> top_level_channels;
// uuids index into entities one-off (idx 0 is really uuid 1, since 0 is
// reserved). However, we want to support requests coming in with
// start_channel_id=0, which signifies "give me everything." Hence this
// funky looking line below.
size_t start_idx = start_channel_id == 0 ? 0 : start_channel_id - 1;
for (size_t i = start_idx; i < entities_.size(); ++i) {
- if (entities_[i].type == EntityType::kChannelNode) {
- ChannelNode* channel_node =
- static_cast<ChannelNode*>(entities_[i].object);
- if (channel_node->is_top_level_channel()) {
- top_level_channels.push_back(channel_node);
- }
+ if (entities_[i] != nullptr &&
+ entities_[i]->type() ==
+ grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel) {
+ top_level_channels.push_back(entities_[i]);
}
}
- if (top_level_channels.size() > 0) {
+ if (!top_level_channels.empty()) {
// create list of channels
grpc_json* array_parent = grpc_json_create_child(
nullptr, json, "channel", nullptr, GRPC_JSON_ARRAY, false);
@@ -128,9 +120,13 @@ char* grpc_channelz_get_top_channels(intptr_t start_channel_id) {
}
char* grpc_channelz_get_channel(intptr_t channel_id) {
- grpc_core::channelz::ChannelNode* channel_node =
- grpc_core::channelz::ChannelzRegistry::GetChannelNode(channel_id);
- if (channel_node == nullptr) {
+ grpc_core::channelz::BaseNode* channel_node =
+ grpc_core::channelz::ChannelzRegistry::Get(channel_id);
+ if (channel_node == nullptr ||
+ (channel_node->type() !=
+ grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel &&
+ channel_node->type() !=
+ grpc_core::channelz::BaseNode::EntityType::kInternalChannel)) {
return nullptr;
}
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
@@ -142,3 +138,21 @@ char* grpc_channelz_get_channel(intptr_t channel_id) {
grpc_json_destroy(top_level_json);
return json_str;
}
+
+char* grpc_channelz_get_subchannel(intptr_t subchannel_id) {
+ grpc_core::channelz::BaseNode* subchannel_node =
+ grpc_core::channelz::ChannelzRegistry::Get(subchannel_id);
+ if (subchannel_node == nullptr ||
+ subchannel_node->type() !=
+ grpc_core::channelz::BaseNode::EntityType::kSubchannel) {
+ return nullptr;
+ }
+ grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
+ grpc_json* json = top_level_json;
+ grpc_json* subchannel_json = subchannel_node->RenderJson();
+ subchannel_json->key = "subchannel";
+ grpc_json_link_child(json, subchannel_json, nullptr);
+ char* json_str = grpc_json_dump_to_string(top_level_json, 0);
+ grpc_json_destroy(top_level_json);
+ return json_str;
+}
diff --git a/src/core/lib/channel/channelz_registry.h b/src/core/lib/channel/channelz_registry.h
index 5d7c936726..142c039220 100644
--- a/src/core/lib/channel/channelz_registry.h
+++ b/src/core/lib/channel/channelz_registry.h
@@ -40,32 +40,11 @@ class ChannelzRegistry {
// To be called in grpc_shutdown();
static void Shutdown();
- // Register/Unregister/Get for ChannelNode
- static intptr_t RegisterChannelNode(ChannelNode* channel_node) {
- RegistryEntry entry(channel_node, EntityType::kChannelNode);
- return Default()->InternalRegisterEntry(entry);
- }
- static void UnregisterChannelNode(intptr_t uuid) {
- Default()->InternalUnregisterEntry(uuid, EntityType::kChannelNode);
- }
- static ChannelNode* GetChannelNode(intptr_t uuid) {
- void* gotten = Default()->InternalGetEntry(uuid, EntityType::kChannelNode);
- return gotten == nullptr ? nullptr : static_cast<ChannelNode*>(gotten);
- }
-
- // Register/Unregister/Get for SubchannelNode
- static intptr_t RegisterSubchannelNode(SubchannelNode* channel_node) {
- RegistryEntry entry(channel_node, EntityType::kSubchannelNode);
- return Default()->InternalRegisterEntry(entry);
- }
- static void UnregisterSubchannelNode(intptr_t uuid) {
- Default()->InternalUnregisterEntry(uuid, EntityType::kSubchannelNode);
- }
- static SubchannelNode* GetSubchannelNode(intptr_t uuid) {
- void* gotten =
- Default()->InternalGetEntry(uuid, EntityType::kSubchannelNode);
- return gotten == nullptr ? nullptr : static_cast<SubchannelNode*>(gotten);
+ static intptr_t Register(BaseNode* node) {
+ return Default()->InternalRegister(node);
}
+ static void Unregister(intptr_t uuid) { Default()->InternalUnregister(uuid); }
+ static BaseNode* Get(intptr_t uuid) { return Default()->InternalGet(uuid); }
// Returns the allocated JSON string that represents the proto
// GetTopChannelsResponse as per channelz.proto.
@@ -74,19 +53,6 @@ class ChannelzRegistry {
}
private:
- enum class EntityType {
- kChannelNode,
- kSubchannelNode,
- kUnset,
- };
-
- struct RegistryEntry {
- RegistryEntry(void* object_in, EntityType type_in)
- : object(object_in), type(type_in) {}
- void* object;
- EntityType type;
- };
-
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
@@ -97,21 +63,21 @@ class ChannelzRegistry {
static ChannelzRegistry* Default();
// globally registers an Entry. Returns its unique uuid
- intptr_t InternalRegisterEntry(const RegistryEntry& entry);
+ intptr_t InternalRegister(BaseNode* node);
// globally unregisters the object that is associated to uuid. Also does
// sanity check that an object doesn't try to unregister the wrong type.
- void InternalUnregisterEntry(intptr_t uuid, EntityType type);
+ void InternalUnregister(intptr_t uuid);
// if object with uuid has previously been registered as the correct type,
// returns the void* associated with that uuid. Else returns nullptr.
- void* InternalGetEntry(intptr_t uuid, EntityType type);
+ BaseNode* InternalGet(intptr_t uuid);
char* InternalGetTopChannels(intptr_t start_channel_id);
// protects entities_ and uuid_
gpr_mu mu_;
- InlinedVector<RegistryEntry, 20> entities_;
+ InlinedVector<BaseNode*, 20> entities_;
};
} // namespace channelz
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 7b368410cf..e1f3e43af7 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -135,7 +135,7 @@ struct pollable {
// underlying epoll set (i.e whenever fd_orphan() is called).
//
// Implementing (2) above (i.e removing fds from cache on fd_orphan) adds a
- // lot of complexity since an fd can be present in multiple pollalbles. So our
+ // lot of complexity since an fd can be present in multiple pollables. So our
// implementation ONLY DOES (1) and NOT (2).
//
// The cache_fd.salt variable helps here to maintain correctness (it serves as
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 88e015ce22..03b7cedcdb 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -489,7 +489,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
grpc_core::channelz::ChannelNode* channelz_channel =
grpc_channel_get_channelz_node(call->channel);
if (channelz_channel != nullptr) {
- channelz_channel->RecordCallStarted();
+ channelz_channel->counter_and_tracer()->RecordCallStarted();
}
grpc_slice_unref_internal(path);
@@ -561,7 +561,7 @@ static void destroy_call(void* call, grpc_error* error) {
}
get_final_status(c, set_status_value_directly, &c->final_info.final_status,
- nullptr, c->final_info.error_string);
+ nullptr, &(c->final_info.error_string));
c->final_info.stats.latency =
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
@@ -573,6 +573,7 @@ static void destroy_call(void* call, grpc_error* error) {
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
grpc_schedule_on_exec_ctx));
+ gpr_free(static_cast<void*>(const_cast<char*>(c->final_info.error_string)));
}
void grpc_call_ref(grpc_call* c) { gpr_ref(&c->ext_ref); }
@@ -1263,13 +1264,14 @@ static void post_batch_completion(batch_control* bctl) {
get_final_status(call, set_cancelled_value,
call->final_op.server.cancelled, nullptr, nullptr);
}
+ // Record channelz data for the channel.
grpc_core::channelz::ChannelNode* channelz_channel =
grpc_channel_get_channelz_node(call->channel);
if (channelz_channel != nullptr) {
if (*call->final_op.client.status != GRPC_STATUS_OK) {
- channelz_channel->RecordCallFailed();
+ channelz_channel->counter_and_tracer()->RecordCallFailed();
} else {
- channelz_channel->RecordCallSucceeded();
+ channelz_channel->counter_and_tracer()->RecordCallSucceeded();
}
}
GRPC_ERROR_UNREF(error);
diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc
index 7cbd61adef..16d3322a9d 100644
--- a/src/core/lib/surface/channel.cc
+++ b/src/core/lib/surface/channel.cc
@@ -170,7 +170,7 @@ grpc_channel* grpc_channel_create_with_builder(
bool is_top_level_channel = channel->is_client && !internal_channel;
channel->channelz_channel = channel_node_create_func(
channel, channel_tracer_max_nodes, is_top_level_channel);
- channel->channelz_channel->trace()->AddTraceEvent(
+ channel->channelz_channel->counter_and_tracer()->trace()->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel created"));
}
@@ -417,6 +417,9 @@ void grpc_channel_internal_unref(grpc_channel* c REF_ARG) {
static void destroy_channel(void* arg, grpc_error* error) {
grpc_channel* channel = static_cast<grpc_channel*>(arg);
if (channel->channelz_channel != nullptr) {
+ channel->channelz_channel->counter_and_tracer()->trace()->AddTraceEvent(
+ grpc_core::channelz::ChannelTrace::Severity::Info,
+ grpc_slice_from_static_string("Channel destroyed"));
channel->channelz_channel->MarkChannelDestroyed();
channel->channelz_channel.reset();
}