diff options
27 files changed, 594 insertions, 323 deletions
@@ -75,6 +75,7 @@ EXPORTS grpc_resource_quota_arg_vtable grpc_channelz_get_top_channels grpc_channelz_get_channel + grpc_channelz_get_subchannel grpc_insecure_channel_create_from_fd grpc_server_add_insecure_channel_from_fd grpc_use_signal diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 4e50cd0bac..a8b71ffec2 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -503,6 +503,10 @@ GRPCAPI char* grpc_channelz_get_top_channels(intptr_t start_channel_id); is allocated and must be freed by the application. */ GRPCAPI char* grpc_channelz_get_channel(intptr_t channel_id); +/* Returns a single Subchannel, or else a NOT_FOUND code. The returned string + is allocated and must be freed by the application. */ +GRPCAPI char* grpc_channelz_get_subchannel(intptr_t subchannel_id); + #ifdef __cplusplus } #endif diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index d2bf4f388d..d015ceb335 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -934,6 +934,11 @@ typedef struct client_channel_call_data { grpc_closure pick_closure; grpc_closure pick_cancel_closure; + // state needed to support channelz interception of recv trailing metadata. + grpc_closure recv_trailing_metadata_ready_channelz; + grpc_closure* original_recv_trailing_metadata; + grpc_transport_stream_op_batch* recv_trailing_metadata_batch; + grpc_polling_entity* pollent; bool pollent_added_to_interested_parties; @@ -995,6 +1000,14 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem); static void on_complete(void* arg, grpc_error* error); static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored); static void start_pick_locked(void* arg, grpc_error* ignored); +template <typename Predicate> +static pending_batch* pending_batch_find(grpc_call_element* elem, + const char* log_message, + Predicate predicate); +static void get_call_status(grpc_call_element* elem, + grpc_metadata_batch* md_batch, grpc_error* error, + grpc_status_code* status, + grpc_mdelem** server_pushback_md); // // send op data caching @@ -1269,6 +1282,66 @@ static void resume_pending_batch_in_call_combiner(void* arg, grpc_subchannel_call_process_op(subchannel_call, batch); } +static void recv_trailing_metadata_ready_channelz(void* arg, + grpc_error* error) { + grpc_call_element* elem = static_cast<grpc_call_element*>(arg); + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, " + "error=%s", + chand, calld, grpc_error_string(error)); + } + GPR_ASSERT(calld->recv_trailing_metadata_batch != nullptr); + grpc_status_code status = GRPC_STATUS_OK; + grpc_metadata_batch* md_batch = + calld->recv_trailing_metadata_batch->payload->recv_trailing_metadata + .recv_trailing_metadata; + get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr); + grpc_core::channelz::SubchannelNode* channelz_subchannel = + calld->pick.connected_subchannel->channelz_subchannel(); + GPR_ASSERT(channelz_subchannel != nullptr); + if (status == GRPC_STATUS_OK) { + channelz_subchannel->RecordCallSucceeded(); + } else { + channelz_subchannel->RecordCallFailed(); + } + calld->recv_trailing_metadata_batch = nullptr; + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error); +} + +// If channelz is enabled, intercept recv_trailing so that we may check the +// status and associate it to a subchannel. +// Returns true if callback was intercepted, false otherwise. +static void maybe_intercept_recv_trailing_for_channelz( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + call_data* calld = static_cast<call_data*>(elem->call_data); + // only intercept payloads with recv trailing. + if (!batch->recv_trailing_metadata) { + return; + } + // only add interceptor is channelz is enabled. + if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) { + return; + } + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "calld=%p batch=%p: intercepting recv trailing for channelz", calld, + batch); + } + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz, + recv_trailing_metadata_ready_channelz, elem, + grpc_schedule_on_exec_ctx); + // save some state needed for the interception callback. + GPR_ASSERT(calld->recv_trailing_metadata_batch == nullptr); + calld->recv_trailing_metadata_batch = batch; + calld->original_recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready_channelz; +} + // This is called via the call combiner, so access to calld is synchronized. static void pending_batches_resume(grpc_call_element* elem) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); @@ -1293,6 +1366,7 @@ static void pending_batches_resume(grpc_call_element* elem) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { + maybe_intercept_recv_trailing_for_channelz(elem, batch); batch->handler_private.extra_arg = calld->subchannel_call; GRPC_CLOSURE_INIT(&batch->handler_private.closure, resume_pending_batch_in_call_combiner, batch, @@ -1778,23 +1852,22 @@ static void recv_message_ready(void* arg, grpc_error* error) { // recv_trailing_metadata handling // -// Sets *status and *server_pushback_md based on batch_data and error. -static void get_call_status(subchannel_batch_data* batch_data, - grpc_error* error, grpc_status_code* status, +// Sets *status and *server_pushback_md based on md_batch and error. +// Only sets *server_pushback_md if server_pushback_md != nullptr. +static void get_call_status(grpc_call_element* elem, + grpc_metadata_batch* md_batch, grpc_error* error, + grpc_status_code* status, grpc_mdelem** server_pushback_md) { - grpc_call_element* elem = batch_data->elem; call_data* calld = static_cast<call_data*>(elem->call_data); if (error != GRPC_ERROR_NONE) { grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr, nullptr); } else { - grpc_metadata_batch* md_batch = - batch_data->batch.payload->recv_trailing_metadata - .recv_trailing_metadata; GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); *status = grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md); - if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { + if (server_pushback_md != nullptr && + md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; } } @@ -1967,8 +2040,19 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { // Get the call's status and check for server pushback metadata. grpc_status_code status = GRPC_STATUS_OK; grpc_mdelem* server_pushback_md = nullptr; - get_call_status(batch_data, GRPC_ERROR_REF(error), &status, + grpc_metadata_batch* md_batch = + batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata; + get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, &server_pushback_md); + grpc_core::channelz::SubchannelNode* channelz_subchannel = + calld->pick.connected_subchannel->channelz_subchannel(); + if (channelz_subchannel != nullptr) { + if (status == GRPC_STATUS_OK) { + channelz_subchannel->RecordCallSucceeded(); + } else { + channelz_subchannel->RecordCallFailed(); + } + } if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand, calld, grpc_status_code_to_string(status)); @@ -2601,6 +2685,11 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { new_error = grpc_error_add_child(new_error, error); pending_batches_fail(elem, new_error, true /* yield_call_combiner */); } else { + grpc_core::channelz::SubchannelNode* channelz_subchannel = + calld->pick.connected_subchannel->channelz_subchannel(); + if (channelz_subchannel != nullptr) { + channelz_subchannel->RecordCallStarted(); + } if (parent_data_size > 0) { subchannel_call_retry_state* retry_state = static_cast<subchannel_call_retry_state*>( diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc index 86c765df52..7e8f59bcd3 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.cc +++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc @@ -20,10 +20,13 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/client_channel_channelz.h" +#include "src/core/lib/channel/channelz_registry.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/connectivity_state.h" +#include <grpc/support/string_util.h> + namespace grpc_core { namespace channelz { namespace { @@ -109,5 +112,62 @@ RefCountedPtr<ChannelNode> ClientChannelNode::MakeClientChannelNode( is_top_level_channel); } +SubchannelNode::SubchannelNode(grpc_subchannel* subchannel, + size_t channel_tracer_max_nodes) + : BaseNode(EntityType::kSubchannel), + subchannel_(subchannel), + target_( + UniquePtr<char>(gpr_strdup(grpc_subchannel_get_target(subchannel_)))), + trace_(channel_tracer_max_nodes) {} + +SubchannelNode::~SubchannelNode() {} + +void SubchannelNode::PopulateConnectivityState(grpc_json* json) { + grpc_connectivity_state state; + if (subchannel_ == nullptr) { + state = GRPC_CHANNEL_SHUTDOWN; + } else { + state = grpc_subchannel_check_connectivity(subchannel_, nullptr); + } + json = grpc_json_create_child(nullptr, json, "state", nullptr, + GRPC_JSON_OBJECT, false); + grpc_json_create_child(nullptr, json, "state", + grpc_connectivity_state_name(state), GRPC_JSON_STRING, + false); +} + +grpc_json* SubchannelNode::RenderJson() { + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* json_iterator = nullptr; + json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr, + GRPC_JSON_OBJECT, false); + json = json_iterator; + json_iterator = nullptr; + json_iterator = grpc_json_add_number_string_child(json, json_iterator, + "subchannelId", uuid()); + // reset json iterators to top level object + json = top_level_json; + json_iterator = nullptr; + // create and fill the data child. + grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr, + GRPC_JSON_OBJECT, false); + json = data; + json_iterator = nullptr; + PopulateConnectivityState(json); + GPR_ASSERT(target_.get() != nullptr); + grpc_json_create_child(nullptr, json, "target", target_.get(), + GRPC_JSON_STRING, false); + // fill in the channel trace if applicable + grpc_json* trace_json = trace_.RenderJson(); + if (trace_json != nullptr) { + trace_json->key = "trace"; // this object is named trace in channelz.proto + grpc_json_link_child(json, trace_json, nullptr); + } + // ask CallCountingHelper to populate trace and call count data. + call_counter_.PopulateCallCounts(json); + return top_level_json; +} + } // namespace channelz } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.h b/src/core/ext/filters/client_channel/client_channel_channelz.h index 6f27b5c8b7..8ce331e529 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.h +++ b/src/core/ext/filters/client_channel/client_channel_channelz.h @@ -23,9 +23,12 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channel_trace.h" #include "src/core/lib/channel/channelz.h" #include "src/core/lib/gprpp/inlined_vector.h" +typedef struct grpc_subchannel grpc_subchannel; + namespace grpc_core { // TODO(ncteisen), this only contains the uuids of the children for now, @@ -43,28 +46,59 @@ class ClientChannelNode : public ChannelNode { grpc_channel* channel, size_t channel_tracer_max_nodes, bool is_top_level_channel); - // Override this functionality since client_channels have a notion of - // channel connectivity. - void PopulateConnectivityState(grpc_json* json) override; + ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel); + virtual ~ClientChannelNode() {} - // Override this functionality since client_channels have subchannels + // Overriding template methods from ChannelNode to render information that + // only ClientChannelNode knows about. + void PopulateConnectivityState(grpc_json* json) override; void PopulateChildRefs(grpc_json* json) override; // Helper to create a channel arg to ensure this type of ChannelNode is // created. static grpc_arg CreateChannelArg(); - protected: - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW - ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, - bool is_top_level_channel); - virtual ~ClientChannelNode() {} - private: grpc_channel_element* client_channel_; }; +// Handles channelz bookkeeping for sockets +class SubchannelNode : public BaseNode { + public: + SubchannelNode(grpc_subchannel* subchannel, size_t channel_tracer_max_nodes); + ~SubchannelNode() override; + + void MarkSubchannelDestroyed() { + GPR_ASSERT(subchannel_ != nullptr); + subchannel_ = nullptr; + } + + grpc_json* RenderJson() override; + + // proxy methods to composed classes. + void AddTraceEvent(ChannelTrace::Severity severity, grpc_slice data) { + trace_.AddTraceEvent(severity, data); + } + void AddTraceEventWithReference(ChannelTrace::Severity severity, + grpc_slice data, + RefCountedPtr<BaseNode> referenced_channel) { + trace_.AddTraceEventWithReference(severity, data, + std::move(referenced_channel)); + } + void RecordCallStarted() { call_counter_.RecordCallStarted(); } + void RecordCallFailed() { call_counter_.RecordCallFailed(); } + void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); } + + private: + grpc_subchannel* subchannel_; + UniquePtr<char> target_; + CallCountingHelper call_counter_; + ChannelTrace trace_; + + void PopulateConnectivityState(grpc_json* json); +}; + } // namespace channelz } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 25b0149393..1ee1925a25 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1265,7 +1265,7 @@ void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels, grpc_core::channelz::ChannelNode* channel_node = grpc_channel_get_channelz_node(lb_channel_); if (channel_node != nullptr) { - child_channels->push_back(channel_node->channel_uuid()); + child_channels->push_back(channel_node->uuid()); } } } diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 91ddaec8b8..d87de51082 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -200,7 +200,7 @@ class SubchannelList grpc_core::channelz::SubchannelNode* subchannel_node = grpc_subchannel_get_channelz_node(subchannels_[i].subchannel()); if (subchannel_node != nullptr) { - refs_list->push_back(subchannel_node->subchannel_uuid()); + refs_list->push_back(subchannel_node->uuid()); } } } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 0e40f42e18..57d0b3759f 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -183,7 +183,13 @@ static void connection_destroy(void* arg, grpc_error* error) { static void subchannel_destroy(void* arg, grpc_error* error) { grpc_subchannel* c = static_cast<grpc_subchannel*>(arg); - c->channelz_subchannel.reset(); + if (c->channelz_subchannel != nullptr) { + c->channelz_subchannel->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Subchannel destroyed")); + c->channelz_subchannel->MarkSubchannelDestroyed(); + c->channelz_subchannel.reset(); + } gpr_free((void*)c->filters); grpc_channel_args_destroy(c->args); grpc_connectivity_state_destroy(&c->state_tracker); @@ -383,9 +389,18 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, const grpc_arg* arg = grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ); bool channelz_enabled = grpc_channel_arg_get_bool(arg, false); + arg = grpc_channel_args_find(c->args, + GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE); + const grpc_integer_options options = {0, 0, INT_MAX}; + size_t channel_tracer_max_nodes = + (size_t)grpc_channel_arg_get_integer(arg, options); if (channelz_enabled) { c->channelz_subchannel = - grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>(); + grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>( + c, channel_tracer_max_nodes); + c->channelz_subchannel->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Subchannel created")); } return grpc_subchannel_index_register(key, c); @@ -625,8 +640,8 @@ static bool publish_transport_locked(grpc_subchannel* c) { } /* publish */ - c->connected_subchannel.reset( - grpc_core::New<grpc_core::ConnectedSubchannel>(stk)); + c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>( + stk, c->channelz_subchannel.get())); gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", c->connected_subchannel.get(), c); @@ -770,6 +785,14 @@ void grpc_get_subchannel_address_arg(const grpc_channel_args* args, } } +const char* grpc_subchannel_get_target(grpc_subchannel* subchannel) { + const grpc_arg* addr_arg = + grpc_channel_args_find(subchannel->args, GRPC_ARG_SUBCHANNEL_ADDRESS); + const char* addr_str = grpc_channel_arg_get_string(addr_arg); + GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. + return addr_str; +} + const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) { const grpc_arg* addr_arg = grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS); @@ -786,9 +809,12 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { namespace grpc_core { -ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack) +ConnectedSubchannel::ConnectedSubchannel( + grpc_channel_stack* channel_stack, + channelz::SubchannelNode* channelz_subchannel) : RefCountedWithTracing<ConnectedSubchannel>(&grpc_trace_stream_refcount), - channel_stack_(channel_stack) {} + channel_stack_(channel_stack), + channelz_subchannel_(channelz_subchannel) {} ConnectedSubchannel::~ConnectedSubchannel() { GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index a135035d62..84febb5204 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -85,7 +85,8 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> { size_t parent_data_size; }; - explicit ConnectedSubchannel(grpc_channel_stack* channel_stack); + explicit ConnectedSubchannel(grpc_channel_stack* channel_stack, + channelz::SubchannelNode* channelz_subchannel); ~ConnectedSubchannel(); grpc_channel_stack* channel_stack() { return channel_stack_; } @@ -94,9 +95,15 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> { grpc_closure* closure); void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call); + channelz::SubchannelNode* channelz_subchannel() { + return channelz_subchannel_; + } private: grpc_channel_stack* channel_stack_; + // backpointer to the channelz node in this connected subchannel's + // owning subchannel. + channelz::SubchannelNode* channelz_subchannel_; }; } // namespace grpc_core @@ -184,6 +191,8 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, void grpc_get_subchannel_address_arg(const grpc_channel_args* args, grpc_resolved_address* addr); +const char* grpc_subchannel_get_target(grpc_subchannel* subchannel); + /// Returns the URI string for the address to connect to. const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args); diff --git a/src/core/lib/channel/channel_trace.cc b/src/core/lib/channel/channel_trace.cc index b3443310ac..cfb2faba51 100644 --- a/src/core/lib/channel/channel_trace.cc +++ b/src/core/lib/channel/channel_trace.cc @@ -41,16 +41,14 @@ namespace grpc_core { namespace channelz { -ChannelTrace::TraceEvent::TraceEvent( - Severity severity, grpc_slice data, - RefCountedPtr<ChannelNode> referenced_channel, ReferencedType type) +ChannelTrace::TraceEvent::TraceEvent(Severity severity, grpc_slice data, + RefCountedPtr<BaseNode> referenced_entity) : severity_(severity), data_(data), timestamp_(grpc_millis_to_timespec(grpc_core::ExecCtx::Get()->Now(), GPR_CLOCK_REALTIME)), next_(nullptr), - referenced_channel_(std::move(referenced_channel)), - referenced_type_(type) {} + referenced_entity_(std::move(referenced_entity)) {} ChannelTrace::TraceEvent::TraceEvent(Severity severity, grpc_slice data) : severity_(severity), @@ -110,23 +108,13 @@ void ChannelTrace::AddTraceEvent(Severity severity, grpc_slice data) { AddTraceEventHelper(New<TraceEvent>(severity, data)); } -void ChannelTrace::AddTraceEventReferencingChannel( - Severity severity, grpc_slice data, - RefCountedPtr<ChannelNode> referenced_channel) { - if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0 - // create and fill up the new event - AddTraceEventHelper(New<TraceEvent>( - severity, data, std::move(referenced_channel), ReferencedType::Channel)); -} - -void ChannelTrace::AddTraceEventReferencingSubchannel( +void ChannelTrace::AddTraceEventWithReference( Severity severity, grpc_slice data, - RefCountedPtr<ChannelNode> referenced_subchannel) { + RefCountedPtr<BaseNode> referenced_entity) { if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0 // create and fill up the new event - AddTraceEventHelper(New<TraceEvent>(severity, data, - std::move(referenced_subchannel), - ReferencedType::Subchannel)); + AddTraceEventHelper( + New<TraceEvent>(severity, data, std::move(referenced_entity))); } namespace { @@ -157,19 +145,18 @@ void ChannelTrace::TraceEvent::RenderTraceEvent(grpc_json* json) const { json_iterator = grpc_json_create_child(json_iterator, json, "timestamp", gpr_format_timespec(timestamp_), GRPC_JSON_STRING, true); - if (referenced_channel_ != nullptr) { + if (referenced_entity_ != nullptr) { + const bool is_channel = + (referenced_entity_->type() == BaseNode::EntityType::kTopLevelChannel || + referenced_entity_->type() == BaseNode::EntityType::kInternalChannel); char* uuid_str; - gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_channel_->channel_uuid()); + gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_entity_->uuid()); grpc_json* child_ref = grpc_json_create_child( - json_iterator, json, - (referenced_type_ == ReferencedType::Channel) ? "channelRef" - : "subchannelRef", + json_iterator, json, is_channel ? "channelRef" : "subchannelRef", nullptr, GRPC_JSON_OBJECT, false); json_iterator = grpc_json_create_child( - nullptr, child_ref, - (referenced_type_ == ReferencedType::Channel) ? "channelId" - : "subchannelId", - uuid_str, GRPC_JSON_STRING, true); + nullptr, child_ref, is_channel ? "channelId" : "subchannelId", uuid_str, + GRPC_JSON_STRING, true); json_iterator = child_ref; } } @@ -178,24 +165,26 @@ grpc_json* ChannelTrace::RenderJson() const { if (!max_list_size_) return nullptr; // tracing is disabled if max_events == 0 grpc_json* json = grpc_json_create(GRPC_JSON_OBJECT); - char* num_events_logged_str; - gpr_asprintf(&num_events_logged_str, "%" PRId64, num_events_logged_); grpc_json* json_iterator = nullptr; - json_iterator = - grpc_json_create_child(json_iterator, json, "numEventsLogged", - num_events_logged_str, GRPC_JSON_STRING, true); + if (num_events_logged_ > 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "numEventsLogged", num_events_logged_); + } json_iterator = grpc_json_create_child( json_iterator, json, "creationTimestamp", gpr_format_timespec(time_created_), GRPC_JSON_STRING, true); - grpc_json* events = grpc_json_create_child(json_iterator, json, "events", - nullptr, GRPC_JSON_ARRAY, false); - json_iterator = nullptr; - TraceEvent* it = head_trace_; - while (it != nullptr) { - json_iterator = grpc_json_create_child(json_iterator, events, nullptr, - nullptr, GRPC_JSON_OBJECT, false); - it->RenderTraceEvent(json_iterator); - it = it->next(); + // only add in the event list if it is non-empty. + if (num_events_logged_ > 0) { + grpc_json* events = grpc_json_create_child(json_iterator, json, "events", + nullptr, GRPC_JSON_ARRAY, false); + json_iterator = nullptr; + TraceEvent* it = head_trace_; + while (it != nullptr) { + json_iterator = grpc_json_create_child(json_iterator, events, nullptr, + nullptr, GRPC_JSON_OBJECT, false); + it->RenderTraceEvent(json_iterator); + it = it->next(); + } } return json; } diff --git a/src/core/lib/channel/channel_trace.h b/src/core/lib/channel/channel_trace.h index 596af7402f..230faa483e 100644 --- a/src/core/lib/channel/channel_trace.h +++ b/src/core/lib/channel/channel_trace.h @@ -30,7 +30,7 @@ namespace grpc_core { namespace channelz { -class ChannelNode; +class BaseNode; // Object used to hold live data for a channel. This data is exposed via the // channelz service: @@ -55,35 +55,28 @@ class ChannelTrace { void AddTraceEvent(Severity severity, grpc_slice data); // Adds a new trace event to the tracing object. This trace event refers to a - // an event on a child of the channel. For example, if this channel has - // created a new subchannel, then it would record that with a TraceEvent - // referencing the new subchannel. + // an event that concerns a different channelz entity. For example, if this + // channel has created a new subchannel, then it would record that with + // a TraceEvent referencing the new subchannel. // // TODO(ncteisen): as this call is used more and more throughout the gRPC // stack, determine if it makes more sense to accept a char* instead of a // slice. - void AddTraceEventReferencingChannel( - Severity severity, grpc_slice data, - RefCountedPtr<ChannelNode> referenced_channel); - void AddTraceEventReferencingSubchannel( - Severity severity, grpc_slice data, - RefCountedPtr<ChannelNode> referenced_subchannel); + void AddTraceEventWithReference(Severity severity, grpc_slice data, + RefCountedPtr<BaseNode> referenced_channel); // Creates and returns the raw grpc_json object, so a parent channelz // object may incorporate the json before rendering. grpc_json* RenderJson() const; private: - // Types of objects that can be references by trace events. - enum class ReferencedType { Channel, Subchannel }; // Private class to encapsulate all the data and bookkeeping needed for a // a trace event. class TraceEvent { public: - // Constructor for a TraceEvent that references a different channel. + // Constructor for a TraceEvent that references a channel. TraceEvent(Severity severity, grpc_slice data, - RefCountedPtr<ChannelNode> referenced_channel, - ReferencedType type); + RefCountedPtr<BaseNode> referenced_entity_); // Constructor for a TraceEvent that does not reverence a different // channel. @@ -105,10 +98,7 @@ class ChannelTrace { gpr_timespec timestamp_; TraceEvent* next_; // the tracer object for the (sub)channel that this trace event refers to. - RefCountedPtr<ChannelNode> referenced_channel_; - // the type that the referenced tracer points to. Unused if this trace - // does not point to any channel or subchannel - ReferencedType referenced_type_; + RefCountedPtr<BaseNode> referenced_entity_; }; // TraceEvent // Internal helper to add and link in a trace event diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc index 9d6002ed8a..9f54850002 100644 --- a/src/core/lib/channel/channelz.cc +++ b/src/core/lib/channel/channelz.cc @@ -41,33 +41,61 @@ namespace grpc_core { namespace channelz { -ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, - bool is_top_level_channel) - : channel_(channel), - target_(nullptr), - channel_uuid_(-1), - is_top_level_channel_(is_top_level_channel) { - trace_.Init(channel_tracer_max_nodes); - target_ = UniquePtr<char>(grpc_channel_get_target(channel_)); - channel_uuid_ = ChannelzRegistry::RegisterChannelNode(this); +BaseNode::BaseNode(EntityType type) + : type_(type), uuid_(ChannelzRegistry::Register(this)) {} + +BaseNode::~BaseNode() { ChannelzRegistry::Unregister(uuid_); } + +char* BaseNode::RenderJsonString() { + grpc_json* json = RenderJson(); + char* json_str = grpc_json_dump_to_string(json, 0); + grpc_json_destroy(json); + return json_str; +} + +CallCountingHelper::CallCountingHelper() { gpr_atm_no_barrier_store(&last_call_started_millis_, (gpr_atm)ExecCtx::Get()->Now()); } -ChannelNode::~ChannelNode() { - trace_.Destroy(); - ChannelzRegistry::UnregisterChannelNode(channel_uuid_); -} +CallCountingHelper::~CallCountingHelper() {} -void ChannelNode::RecordCallStarted() { +void CallCountingHelper::RecordCallStarted() { gpr_atm_no_barrier_fetch_add(&calls_started_, (gpr_atm)1); gpr_atm_no_barrier_store(&last_call_started_millis_, (gpr_atm)ExecCtx::Get()->Now()); } -void ChannelNode::PopulateConnectivityState(grpc_json* json) {} +void CallCountingHelper::PopulateCallCounts(grpc_json* json) { + grpc_json* json_iterator = nullptr; + if (calls_started_ != 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "callsStarted", calls_started_); + } + if (calls_succeeded_ != 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "callsSucceeded", calls_succeeded_); + } + if (calls_failed_) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "callsFailed", calls_failed_); + } + gpr_timespec ts = + grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME); + json_iterator = + grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp", + gpr_format_timespec(ts), GRPC_JSON_STRING, true); +} -void ChannelNode::PopulateChildRefs(grpc_json* json) {} +ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel) + : BaseNode(is_top_level_channel ? EntityType::kTopLevelChannel + : EntityType::kInternalChannel), + channel_(channel), + target_(UniquePtr<char>(grpc_channel_get_target(channel_))), + trace_(channel_tracer_max_nodes) {} + +ChannelNode::~ChannelNode() {} grpc_json* ChannelNode::RenderJson() { // We need to track these three json objects to build our object @@ -80,7 +108,7 @@ grpc_json* ChannelNode::RenderJson() { json = json_iterator; json_iterator = nullptr; json_iterator = grpc_json_add_number_string_child(json, json_iterator, - "channelId", channel_uuid_); + "channelId", uuid()); // reset json iterators to top level object json = top_level_json; json_iterator = nullptr; @@ -89,51 +117,28 @@ grpc_json* ChannelNode::RenderJson() { GRPC_JSON_OBJECT, false); json = data; json_iterator = nullptr; + // template method. Child classes may override this to add their specific + // functionality. PopulateConnectivityState(json); + // populate the target. GPR_ASSERT(target_.get() != nullptr); - json_iterator = grpc_json_create_child( - json_iterator, json, "target", target_.get(), GRPC_JSON_STRING, false); + grpc_json_create_child(nullptr, json, "target", target_.get(), + GRPC_JSON_STRING, false); // fill in the channel trace if applicable - grpc_json* trace = trace_->RenderJson(); - if (trace != nullptr) { - // we manually link up and fill the child since it was created for us in - // ChannelTrace::RenderJson - trace->key = "trace"; // this object is named trace in channelz.proto - json_iterator = grpc_json_link_child(json, trace, json_iterator); - } - // reset the parent to be the data object. - json = data; - json_iterator = nullptr; - if (calls_started_ != 0) { - json_iterator = grpc_json_add_number_string_child( - json, json_iterator, "callsStarted", calls_started_); - } - if (calls_succeeded_ != 0) { - json_iterator = grpc_json_add_number_string_child( - json, json_iterator, "callsSucceeded", calls_succeeded_); - } - if (calls_failed_) { - json_iterator = grpc_json_add_number_string_child( - json, json_iterator, "callsFailed", calls_failed_); + grpc_json* trace_json = trace_.RenderJson(); + if (trace_json != nullptr) { + trace_json->key = "trace"; // this object is named trace in channelz.proto + grpc_json_link_child(json, trace_json, nullptr); } - gpr_timespec ts = - grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME); - json_iterator = - grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp", - gpr_format_timespec(ts), GRPC_JSON_STRING, true); + // ask CallCountingHelper to populate trace and call count data. + call_counter_.PopulateCallCounts(json); json = top_level_json; - json_iterator = nullptr; + // template method. Child classes may override this to add their specific + // functionality. PopulateChildRefs(json); return top_level_json; } -char* ChannelNode::RenderJsonString() { - grpc_json* json = RenderJson(); - char* json_str = grpc_json_dump_to_string(json, 0); - grpc_json_destroy(json); - return json_str; -} - RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode( grpc_channel* channel, size_t channel_tracer_max_nodes, bool is_top_level_channel) { @@ -141,13 +146,5 @@ RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode( channel, channel_tracer_max_nodes, is_top_level_channel); } -SubchannelNode::SubchannelNode() { - subchannel_uuid_ = ChannelzRegistry::RegisterSubchannelNode(this); -} - -SubchannelNode::~SubchannelNode() { - ChannelzRegistry::UnregisterSubchannelNode(subchannel_uuid_); -} - } // namespace channelz } // namespace grpc_core diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h index 07eb73d626..bd2735929c 100644 --- a/src/core/lib/channel/channelz.h +++ b/src/core/lib/channel/channelz.h @@ -43,14 +43,52 @@ namespace grpc_core { namespace channelz { namespace testing { +class CallCountingHelperPeer; class ChannelNodePeer; -} +} // namespace testing -class ChannelNode : public RefCounted<ChannelNode> { +// base class for all channelz entities +class BaseNode : public RefCounted<BaseNode> { public: - static RefCountedPtr<ChannelNode> MakeChannelNode( - grpc_channel* channel, size_t channel_tracer_max_nodes, - bool is_top_level_channel); + // There are only four high level channelz entities. However, to support + // GetTopChannelsRequest, we split the Channel entity into two different + // types. All children of BaseNode must be one of these types. + enum class EntityType { + kTopLevelChannel, + kInternalChannel, + kSubchannel, + kServer, + kSocket, + }; + + explicit BaseNode(EntityType type); + virtual ~BaseNode(); + + // All children must implement this function. + virtual grpc_json* RenderJson() GRPC_ABSTRACT; + + // Renders the json and returns allocated string that must be freed by the + // caller. + char* RenderJsonString(); + + EntityType type() const { return type_; } + intptr_t uuid() const { return uuid_; } + + private: + const EntityType type_; + const intptr_t uuid_; +}; + +// This class is a helper class for channelz entities that deal with Channels +// Subchannels, and Servers, since those have similar proto definitions. +// This class has the ability to: +// - track calls_{started,succeeded,failed} +// - track last_call_started_timestamp +// - perform rendering of the above items +class CallCountingHelper { + public: + CallCountingHelper(); + ~CallCountingHelper(); void RecordCallStarted(); void RecordCallFailed() { @@ -60,17 +98,43 @@ class ChannelNode : public RefCounted<ChannelNode> { gpr_atm_no_barrier_fetch_add(&calls_succeeded_, (gpr_atm(1))); } - grpc_json* RenderJson(); - char* RenderJsonString(); + // Common rendering of the call count data and last_call_started_timestamp. + void PopulateCallCounts(grpc_json* json); - // helper for getting and populating connectivity state. It is virtual - // because it allows the client_channel specific code to live in ext/ - // instead of lib/ - virtual void PopulateConnectivityState(grpc_json* json); + private: + // testing peer friend. + friend class testing::CallCountingHelperPeer; - virtual void PopulateChildRefs(grpc_json* json); + gpr_atm calls_started_ = 0; + gpr_atm calls_succeeded_ = 0; + gpr_atm calls_failed_ = 0; + gpr_atm last_call_started_millis_ = 0; +}; - ChannelTrace* trace() { return trace_.get(); } +// Handles channelz bookkeeping for channels +class ChannelNode : public BaseNode { + public: + static RefCountedPtr<ChannelNode> MakeChannelNode( + grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel); + + ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel); + ~ChannelNode() override; + + grpc_json* RenderJson() override; + + // template methods. RenderJSON uses these methods to render its JSON + // representation. These are virtual so that children classes may provide + // their specific mechanism for populating these parts of the channelz + // object. + // + // ChannelNode does not have a notion of connectivity state or child refs, + // so it leaves these implementations blank. + // + // This is utilizing the template method design pattern. + virtual void PopulateConnectivityState(grpc_json* json) {} + virtual void PopulateChildRefs(grpc_json* json) {} void MarkChannelDestroyed() { GPR_ASSERT(channel_ != nullptr); @@ -79,47 +143,44 @@ class ChannelNode : public RefCounted<ChannelNode> { bool ChannelIsDestroyed() { return channel_ == nullptr; } - intptr_t channel_uuid() { return channel_uuid_; } - bool is_top_level_channel() { return is_top_level_channel_; } - - protected: - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW - ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, - bool is_top_level_channel); - virtual ~ChannelNode(); + // proxy methods to composed classes. + void AddTraceEvent(ChannelTrace::Severity severity, grpc_slice data) { + trace_.AddTraceEvent(severity, data); + } + void AddTraceEventWithReference(ChannelTrace::Severity severity, + grpc_slice data, + RefCountedPtr<BaseNode> referenced_channel) { + trace_.AddTraceEventWithReference(severity, data, + std::move(referenced_channel)); + } + void RecordCallStarted() { call_counter_.RecordCallStarted(); } + void RecordCallFailed() { call_counter_.RecordCallFailed(); } + void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); } private: - // testing peer friend. + // to allow the channel trace test to access trace(); friend class testing::ChannelNodePeer; - grpc_channel* channel_ = nullptr; UniquePtr<char> target_; - gpr_atm calls_started_ = 0; - gpr_atm calls_succeeded_ = 0; - gpr_atm calls_failed_ = 0; - gpr_atm last_call_started_millis_ = 0; - intptr_t channel_uuid_; - bool is_top_level_channel_ = true; - ManualConstructor<ChannelTrace> trace_; + CallCountingHelper call_counter_; + ChannelTrace trace_; }; -// Placeholds channelz class for subchannels. All this can do now is track its -// uuid (this information is needed by the parent channelz class). -// TODO(ncteisen): build this out to support the GetSubchannel channelz request. -class SubchannelNode : public RefCounted<SubchannelNode> { +// Handles channelz bookkeeping for servers +// TODO(ncteisen): implement in subsequent PR. +class ServerNode : public BaseNode { public: - SubchannelNode(); - virtual ~SubchannelNode(); - - intptr_t subchannel_uuid() { return subchannel_uuid_; } - - protected: - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW + explicit ServerNode(size_t channel_tracer_max_nodes) + : BaseNode(EntityType::kServer) {} + ~ServerNode() override {} +}; - private: - intptr_t subchannel_uuid_; +// Handles channelz bookkeeping for sockets +// TODO(ncteisen): implement in subsequent PR. +class SocketNode : public BaseNode { + public: + SocketNode() : BaseNode(EntityType::kSocket) {} + ~SocketNode() override {} }; // Creation functions diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc index f79d2f0c17..972c61c1d6 100644 --- a/src/core/lib/channel/channelz_registry.cc +++ b/src/core/lib/channel/channelz_registry.cc @@ -53,54 +53,46 @@ ChannelzRegistry::ChannelzRegistry() { gpr_mu_init(&mu_); } ChannelzRegistry::~ChannelzRegistry() { gpr_mu_destroy(&mu_); } -intptr_t ChannelzRegistry::InternalRegisterEntry(const RegistryEntry& entry) { +intptr_t ChannelzRegistry::InternalRegister(BaseNode* node) { MutexLock lock(&mu_); - entities_.push_back(entry); + entities_.push_back(node); intptr_t uuid = entities_.size(); return uuid; } -void ChannelzRegistry::InternalUnregisterEntry(intptr_t uuid, EntityType type) { +void ChannelzRegistry::InternalUnregister(intptr_t uuid) { GPR_ASSERT(uuid >= 1); MutexLock lock(&mu_); GPR_ASSERT(static_cast<size_t>(uuid) <= entities_.size()); - GPR_ASSERT(entities_[uuid - 1].type == type); - entities_[uuid - 1].object = nullptr; - entities_[uuid - 1].type = EntityType::kUnset; + entities_[uuid - 1] = nullptr; } -void* ChannelzRegistry::InternalGetEntry(intptr_t uuid, EntityType type) { +BaseNode* ChannelzRegistry::InternalGet(intptr_t uuid) { MutexLock lock(&mu_); if (uuid < 1 || uuid > static_cast<intptr_t>(entities_.size())) { return nullptr; } - if (entities_[uuid - 1].type == type) { - return entities_[uuid - 1].object; - } else { - return nullptr; - } + return entities_[uuid - 1]; } char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) { grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); grpc_json* json = top_level_json; grpc_json* json_iterator = nullptr; - InlinedVector<ChannelNode*, 10> top_level_channels; + InlinedVector<BaseNode*, 10> top_level_channels; // uuids index into entities one-off (idx 0 is really uuid 1, since 0 is // reserved). However, we want to support requests coming in with // start_channel_id=0, which signifies "give me everything." Hence this // funky looking line below. size_t start_idx = start_channel_id == 0 ? 0 : start_channel_id - 1; for (size_t i = start_idx; i < entities_.size(); ++i) { - if (entities_[i].type == EntityType::kChannelNode) { - ChannelNode* channel_node = - static_cast<ChannelNode*>(entities_[i].object); - if (channel_node->is_top_level_channel()) { - top_level_channels.push_back(channel_node); - } + if (entities_[i] != nullptr && + entities_[i]->type() == + grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel) { + top_level_channels.push_back(entities_[i]); } } - if (top_level_channels.size() > 0) { + if (!top_level_channels.empty()) { // create list of channels grpc_json* array_parent = grpc_json_create_child( nullptr, json, "channel", nullptr, GRPC_JSON_ARRAY, false); @@ -129,9 +121,13 @@ char* grpc_channelz_get_top_channels(intptr_t start_channel_id) { } char* grpc_channelz_get_channel(intptr_t channel_id) { - grpc_core::channelz::ChannelNode* channel_node = - grpc_core::channelz::ChannelzRegistry::GetChannelNode(channel_id); - if (channel_node == nullptr) { + grpc_core::channelz::BaseNode* channel_node = + grpc_core::channelz::ChannelzRegistry::Get(channel_id); + if (channel_node == nullptr || + (channel_node->type() != + grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel && + channel_node->type() != + grpc_core::channelz::BaseNode::EntityType::kInternalChannel)) { return nullptr; } grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); @@ -143,3 +139,21 @@ char* grpc_channelz_get_channel(intptr_t channel_id) { grpc_json_destroy(top_level_json); return json_str; } + +char* grpc_channelz_get_subchannel(intptr_t subchannel_id) { + grpc_core::channelz::BaseNode* subchannel_node = + grpc_core::channelz::ChannelzRegistry::Get(subchannel_id); + if (subchannel_node == nullptr || + subchannel_node->type() != + grpc_core::channelz::BaseNode::EntityType::kSubchannel) { + return nullptr; + } + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* subchannel_json = subchannel_node->RenderJson(); + subchannel_json->key = "subchannel"; + grpc_json_link_child(json, subchannel_json, nullptr); + char* json_str = grpc_json_dump_to_string(top_level_json, 0); + grpc_json_destroy(top_level_json); + return json_str; +} diff --git a/src/core/lib/channel/channelz_registry.h b/src/core/lib/channel/channelz_registry.h index 5d7c936726..142c039220 100644 --- a/src/core/lib/channel/channelz_registry.h +++ b/src/core/lib/channel/channelz_registry.h @@ -40,32 +40,11 @@ class ChannelzRegistry { // To be called in grpc_shutdown(); static void Shutdown(); - // Register/Unregister/Get for ChannelNode - static intptr_t RegisterChannelNode(ChannelNode* channel_node) { - RegistryEntry entry(channel_node, EntityType::kChannelNode); - return Default()->InternalRegisterEntry(entry); - } - static void UnregisterChannelNode(intptr_t uuid) { - Default()->InternalUnregisterEntry(uuid, EntityType::kChannelNode); - } - static ChannelNode* GetChannelNode(intptr_t uuid) { - void* gotten = Default()->InternalGetEntry(uuid, EntityType::kChannelNode); - return gotten == nullptr ? nullptr : static_cast<ChannelNode*>(gotten); - } - - // Register/Unregister/Get for SubchannelNode - static intptr_t RegisterSubchannelNode(SubchannelNode* channel_node) { - RegistryEntry entry(channel_node, EntityType::kSubchannelNode); - return Default()->InternalRegisterEntry(entry); - } - static void UnregisterSubchannelNode(intptr_t uuid) { - Default()->InternalUnregisterEntry(uuid, EntityType::kSubchannelNode); - } - static SubchannelNode* GetSubchannelNode(intptr_t uuid) { - void* gotten = - Default()->InternalGetEntry(uuid, EntityType::kSubchannelNode); - return gotten == nullptr ? nullptr : static_cast<SubchannelNode*>(gotten); + static intptr_t Register(BaseNode* node) { + return Default()->InternalRegister(node); } + static void Unregister(intptr_t uuid) { Default()->InternalUnregister(uuid); } + static BaseNode* Get(intptr_t uuid) { return Default()->InternalGet(uuid); } // Returns the allocated JSON string that represents the proto // GetTopChannelsResponse as per channelz.proto. @@ -74,19 +53,6 @@ class ChannelzRegistry { } private: - enum class EntityType { - kChannelNode, - kSubchannelNode, - kUnset, - }; - - struct RegistryEntry { - RegistryEntry(void* object_in, EntityType type_in) - : object(object_in), type(type_in) {} - void* object; - EntityType type; - }; - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE @@ -97,21 +63,21 @@ class ChannelzRegistry { static ChannelzRegistry* Default(); // globally registers an Entry. Returns its unique uuid - intptr_t InternalRegisterEntry(const RegistryEntry& entry); + intptr_t InternalRegister(BaseNode* node); // globally unregisters the object that is associated to uuid. Also does // sanity check that an object doesn't try to unregister the wrong type. - void InternalUnregisterEntry(intptr_t uuid, EntityType type); + void InternalUnregister(intptr_t uuid); // if object with uuid has previously been registered as the correct type, // returns the void* associated with that uuid. Else returns nullptr. - void* InternalGetEntry(intptr_t uuid, EntityType type); + BaseNode* InternalGet(intptr_t uuid); char* InternalGetTopChannels(intptr_t start_channel_id); // protects entities_ and uuid_ gpr_mu mu_; - InlinedVector<RegistryEntry, 20> entities_; + InlinedVector<BaseNode*, 20> entities_; }; } // namespace channelz diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index e2ea334ded..90a0254663 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -104,18 +104,18 @@ static void con_start_transport_stream_op_batch( if (batch->recv_initial_metadata) { callback_state* state = &calld->recv_initial_metadata_ready; intercept_callback( - calld, state, false, "recv_initial_metadata_ready", + calld, state, false, "connected_recv_initial_metadata_ready", &batch->payload->recv_initial_metadata.recv_initial_metadata_ready); } if (batch->recv_message) { callback_state* state = &calld->recv_message_ready; - intercept_callback(calld, state, false, "recv_message_ready", + intercept_callback(calld, state, false, "connected_recv_message_ready", &batch->payload->recv_message.recv_message_ready); } if (batch->recv_trailing_metadata) { callback_state* state = &calld->recv_trailing_metadata_ready; intercept_callback( - calld, state, false, "recv_trailing_metadata_ready", + calld, state, false, "connected_recv_trailing_metadata_ready", &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready); } if (batch->cancel_stream) { @@ -126,11 +126,13 @@ static void con_start_transport_stream_op_batch( // closure for each one. callback_state* state = static_cast<callback_state*>(gpr_malloc(sizeof(*state))); - intercept_callback(calld, state, true, "on_complete (cancel_stream)", + intercept_callback(calld, state, true, + "connected_on_complete (cancel_stream)", &batch->on_complete); } else if (batch->on_complete != nullptr) { callback_state* state = get_state_for_batch(calld, batch); - intercept_callback(calld, state, false, "on_complete", &batch->on_complete); + intercept_callback(calld, state, false, "connected_on_complete", + &batch->on_complete); } grpc_transport_perform_stream_op( chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), batch); diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 2923a86646..3d69db4f83 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -1267,6 +1267,7 @@ static void post_batch_completion(batch_control* bctl) { get_final_status(call, set_cancelled_value, call->final_op.server.cancelled, nullptr, nullptr); } + // Record channelz data for the channel. grpc_core::channelz::ChannelNode* channelz_channel = grpc_channel_get_channelz_node(call->channel); if (channelz_channel != nullptr) { @@ -1424,7 +1425,7 @@ static void receiving_stream_ready_in_call_combiner(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; - GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready"); + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "call_recv_message_ready"); receiving_stream_ready(bctlp, error); } @@ -1509,7 +1510,8 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; - GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready"); + GRPC_CALL_COMBINER_STOP(&call->call_combiner, + "call_recv_initial_metadata_ready"); add_batch_error(bctl, GRPC_ERROR_REF(error), false); if (error == GRPC_ERROR_NONE) { @@ -1560,7 +1562,8 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; - GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready"); + GRPC_CALL_COMBINER_STOP(&call->call_combiner, + "call_recv_trailing_metadata_ready"); add_batch_error(bctl, GRPC_ERROR_REF(error), false); grpc_metadata_batch* md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; @@ -1571,7 +1574,7 @@ static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) { static void finish_batch(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; - GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete"); + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "call_on_complete"); add_batch_error(bctl, GRPC_ERROR_REF(error), false); finish_batch_step(bctl); } diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index 82635d3c21..10d90e1406 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -170,7 +170,7 @@ grpc_channel* grpc_channel_create_with_builder( bool is_top_level_channel = channel->is_client && !internal_channel; channel->channelz_channel = channel_node_create_func( channel, channel_tracer_max_nodes, is_top_level_channel); - channel->channelz_channel->trace()->AddTraceEvent( + channel->channelz_channel->AddTraceEvent( grpc_core::channelz::ChannelTrace::Severity::Info, grpc_slice_from_static_string("Channel created")); } @@ -428,6 +428,9 @@ void grpc_channel_internal_unref(grpc_channel* c REF_ARG) { static void destroy_channel(void* arg, grpc_error* error) { grpc_channel* channel = static_cast<grpc_channel*>(arg); if (channel->channelz_channel != nullptr) { + channel->channelz_channel->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Channel destroyed")); channel->channelz_channel->MarkChannelDestroyed(); channel->channelz_channel.reset(); } diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index f44f89dfa0..25ba7aa275 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -98,6 +98,7 @@ grpc_resource_quota_set_max_threads_type grpc_resource_quota_set_max_threads_imp grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import; grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import; grpc_channelz_get_channel_type grpc_channelz_get_channel_import; +grpc_channelz_get_subchannel_type grpc_channelz_get_subchannel_import; grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import; grpc_server_add_insecure_channel_from_fd_type grpc_server_add_insecure_channel_from_fd_import; grpc_use_signal_type grpc_use_signal_import; @@ -352,6 +353,7 @@ void grpc_rb_load_imports(HMODULE library) { grpc_resource_quota_arg_vtable_import = (grpc_resource_quota_arg_vtable_type) GetProcAddress(library, "grpc_resource_quota_arg_vtable"); grpc_channelz_get_top_channels_import = (grpc_channelz_get_top_channels_type) GetProcAddress(library, "grpc_channelz_get_top_channels"); grpc_channelz_get_channel_import = (grpc_channelz_get_channel_type) GetProcAddress(library, "grpc_channelz_get_channel"); + grpc_channelz_get_subchannel_import = (grpc_channelz_get_subchannel_type) GetProcAddress(library, "grpc_channelz_get_subchannel"); grpc_insecure_channel_create_from_fd_import = (grpc_insecure_channel_create_from_fd_type) GetProcAddress(library, "grpc_insecure_channel_create_from_fd"); grpc_server_add_insecure_channel_from_fd_import = (grpc_server_add_insecure_channel_from_fd_type) GetProcAddress(library, "grpc_server_add_insecure_channel_from_fd"); grpc_use_signal_import = (grpc_use_signal_type) GetProcAddress(library, "grpc_use_signal"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 99433f0e40..30fcb79407 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -269,6 +269,9 @@ extern grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import typedef char*(*grpc_channelz_get_channel_type)(intptr_t channel_id); extern grpc_channelz_get_channel_type grpc_channelz_get_channel_import; #define grpc_channelz_get_channel grpc_channelz_get_channel_import +typedef char*(*grpc_channelz_get_subchannel_type)(intptr_t subchannel_id); +extern grpc_channelz_get_subchannel_type grpc_channelz_get_subchannel_import; +#define grpc_channelz_get_subchannel grpc_channelz_get_subchannel_import typedef grpc_channel*(*grpc_insecure_channel_create_from_fd_type)(const char* target, int fd, const grpc_channel_args* args); extern grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import; #define grpc_insecure_channel_create_from_fd grpc_insecure_channel_create_from_fd_import diff --git a/test/core/channel/channel_stack_test.cc b/test/core/channel/channel_stack_test.cc index 2f5329a96d..6f0bfa06d2 100644 --- a/test/core/channel/channel_stack_test.cc +++ b/test/core/channel/channel_stack_test.cc @@ -124,7 +124,7 @@ static void test_create_channel_stack(void) { gpr_now(GPR_CLOCK_MONOTONIC), /* start_time */ GRPC_MILLIS_INF_FUTURE, /* deadline */ nullptr, /* arena */ - nullptr /* call_combiner */ + nullptr, /* call_combiner */ }; grpc_error* error = grpc_call_stack_init(channel_stack, 1, free_call, call_stack, &args); diff --git a/test/core/channel/channel_trace_test.cc b/test/core/channel/channel_trace_test.cc index f224457a55..e33277753b 100644 --- a/test/core/channel/channel_trace_test.cc +++ b/test/core/channel/channel_trace_test.cc @@ -40,6 +40,17 @@ namespace grpc_core { namespace channelz { namespace testing { + +// testing peer to access channel internals +class ChannelNodePeer { + public: + ChannelNodePeer(ChannelNode* node) : node_(node) {} + ChannelTrace* trace() { return &node_->trace_; } + + private: + ChannelNode* node_; +}; + namespace { grpc_json* GetJsonChild(grpc_json* parent, const char* key) { @@ -156,28 +167,29 @@ TEST_P(ChannelTracerTest, ComplexTest) { ChannelFixture channel1(GetParam()); RefCountedPtr<ChannelNode> sc1 = MakeRefCounted<ChannelNode>(channel1.channel(), GetParam(), true); - tracer.AddTraceEventReferencingSubchannel( + ChannelNodePeer sc1_peer(sc1.get()); + tracer.AddTraceEventWithReference( ChannelTrace::Severity::Info, grpc_slice_from_static_string("subchannel one created"), sc1); ValidateChannelTrace(&tracer, 3, GetParam()); - AddSimpleTrace(sc1->trace()); - AddSimpleTrace(sc1->trace()); - AddSimpleTrace(sc1->trace()); - ValidateChannelTrace(sc1->trace(), 3, GetParam()); - AddSimpleTrace(sc1->trace()); - AddSimpleTrace(sc1->trace()); - AddSimpleTrace(sc1->trace()); - ValidateChannelTrace(sc1->trace(), 6, GetParam()); + AddSimpleTrace(sc1_peer.trace()); + AddSimpleTrace(sc1_peer.trace()); + AddSimpleTrace(sc1_peer.trace()); + ValidateChannelTrace(sc1_peer.trace(), 3, GetParam()); + AddSimpleTrace(sc1_peer.trace()); + AddSimpleTrace(sc1_peer.trace()); + AddSimpleTrace(sc1_peer.trace()); + ValidateChannelTrace(sc1_peer.trace(), 6, GetParam()); AddSimpleTrace(&tracer); AddSimpleTrace(&tracer); ValidateChannelTrace(&tracer, 5, GetParam()); ChannelFixture channel2(GetParam()); RefCountedPtr<ChannelNode> sc2 = MakeRefCounted<ChannelNode>(channel2.channel(), GetParam(), true); - tracer.AddTraceEventReferencingChannel( + tracer.AddTraceEventWithReference( ChannelTrace::Severity::Info, grpc_slice_from_static_string("LB channel two created"), sc2); - tracer.AddTraceEventReferencingSubchannel( + tracer.AddTraceEventWithReference( ChannelTrace::Severity::Warning, grpc_slice_from_static_string("subchannel one inactive"), sc1); ValidateChannelTrace(&tracer, 7, GetParam()); @@ -203,33 +215,35 @@ TEST_P(ChannelTracerTest, TestNesting) { ChannelFixture channel1(GetParam()); RefCountedPtr<ChannelNode> sc1 = MakeRefCounted<ChannelNode>(channel1.channel(), GetParam(), true); - tracer.AddTraceEventReferencingChannel( + ChannelNodePeer sc1_peer(sc1.get()); + tracer.AddTraceEventWithReference( ChannelTrace::Severity::Info, grpc_slice_from_static_string("subchannel one created"), sc1); ValidateChannelTrace(&tracer, 3, GetParam()); - AddSimpleTrace(sc1->trace()); + AddSimpleTrace(sc1_peer.trace()); ChannelFixture channel2(GetParam()); RefCountedPtr<ChannelNode> conn1 = MakeRefCounted<ChannelNode>(channel2.channel(), GetParam(), true); + ChannelNodePeer conn1_peer(conn1.get()); // nesting one level deeper. - sc1->trace()->AddTraceEventReferencingSubchannel( + sc1_peer.trace()->AddTraceEventWithReference( ChannelTrace::Severity::Info, grpc_slice_from_static_string("connection one created"), conn1); ValidateChannelTrace(&tracer, 3, GetParam()); - AddSimpleTrace(conn1->trace()); + AddSimpleTrace(conn1_peer.trace()); AddSimpleTrace(&tracer); AddSimpleTrace(&tracer); ValidateChannelTrace(&tracer, 5, GetParam()); - ValidateChannelTrace(conn1->trace(), 1, GetParam()); + ValidateChannelTrace(conn1_peer.trace(), 1, GetParam()); ChannelFixture channel3(GetParam()); RefCountedPtr<ChannelNode> sc2 = MakeRefCounted<ChannelNode>(channel3.channel(), GetParam(), true); - tracer.AddTraceEventReferencingSubchannel( + tracer.AddTraceEventWithReference( ChannelTrace::Severity::Info, grpc_slice_from_static_string("subchannel two created"), sc2); // this trace should not get added to the parents children since it is already // present in the tracer. - tracer.AddTraceEventReferencingChannel( + tracer.AddTraceEventWithReference( ChannelTrace::Severity::Warning, grpc_slice_from_static_string("subchannel one inactive"), sc1); AddSimpleTrace(&tracer); diff --git a/test/core/channel/channelz_registry_test.cc b/test/core/channel/channelz_registry_test.cc index 581e867584..c02d525c81 100644 --- a/test/core/channel/channelz_registry_test.cc +++ b/test/core/channel/channelz_registry_test.cc @@ -44,22 +44,22 @@ namespace channelz { namespace testing { TEST(ChannelzRegistryTest, UuidStartsAboveZeroTest) { - ChannelNode* channelz_channel = nullptr; - intptr_t uuid = ChannelzRegistry::RegisterChannelNode(channelz_channel); + BaseNode* channelz_channel = nullptr; + intptr_t uuid = ChannelzRegistry::Register(channelz_channel); EXPECT_GT(uuid, 0) << "First uuid chose must be greater than zero. Zero if " "reserved according to " "https://github.com/grpc/proposal/blob/master/" "A14-channelz.md"; - ChannelzRegistry::UnregisterChannelNode(uuid); + ChannelzRegistry::Unregister(uuid); } TEST(ChannelzRegistryTest, UuidsAreIncreasing) { - ChannelNode* channelz_channel = nullptr; + BaseNode* channelz_channel = nullptr; std::vector<intptr_t> uuids; uuids.reserve(10); for (int i = 0; i < 10; ++i) { // reregister the same object. It's ok since we are just testing uuids - uuids.push_back(ChannelzRegistry::RegisterChannelNode(channelz_channel)); + uuids.push_back(ChannelzRegistry::Register(channelz_channel)); } for (size_t i = 1; i < uuids.size(); ++i) { EXPECT_LT(uuids[i - 1], uuids[i]) << "Uuids must always be increasing"; @@ -68,30 +68,30 @@ TEST(ChannelzRegistryTest, UuidsAreIncreasing) { TEST(ChannelzRegistryTest, RegisterGetTest) { // we hackily jam an intptr_t into this pointer to check for equality later - ChannelNode* channelz_channel = (ChannelNode*)42; - intptr_t uuid = ChannelzRegistry::RegisterChannelNode(channelz_channel); - ChannelNode* retrieved = ChannelzRegistry::GetChannelNode(uuid); + BaseNode* channelz_channel = (BaseNode*)42; + intptr_t uuid = ChannelzRegistry::Register(channelz_channel); + BaseNode* retrieved = ChannelzRegistry::Get(uuid); EXPECT_EQ(channelz_channel, retrieved); } TEST(ChannelzRegistryTest, RegisterManyItems) { // we hackily jam an intptr_t into this pointer to check for equality later - ChannelNode* channelz_channel = (ChannelNode*)42; + BaseNode* channelz_channel = (BaseNode*)42; for (int i = 0; i < 100; i++) { - intptr_t uuid = ChannelzRegistry::RegisterChannelNode(channelz_channel); - ChannelNode* retrieved = ChannelzRegistry::GetChannelNode(uuid); + intptr_t uuid = ChannelzRegistry::Register(channelz_channel); + BaseNode* retrieved = ChannelzRegistry::Get(uuid); EXPECT_EQ(channelz_channel, retrieved); } } TEST(ChannelzRegistryTest, NullIfNotPresentTest) { // we hackily jam an intptr_t into this pointer to check for equality later - ChannelNode* channelz_channel = (ChannelNode*)42; - intptr_t uuid = ChannelzRegistry::RegisterChannelNode(channelz_channel); + BaseNode* channelz_channel = (BaseNode*)42; + intptr_t uuid = ChannelzRegistry::Register(channelz_channel); // try to pull out a uuid that does not exist. - ChannelNode* nonexistant = ChannelzRegistry::GetChannelNode(uuid + 1); + BaseNode* nonexistant = ChannelzRegistry::Get(uuid + 1); EXPECT_EQ(nonexistant, nullptr); - ChannelNode* retrieved = ChannelzRegistry::GetChannelNode(uuid); + BaseNode* retrieved = ChannelzRegistry::Get(uuid); EXPECT_EQ(channelz_channel, retrieved); } diff --git a/test/core/channel/channelz_test.cc b/test/core/channel/channelz_test.cc index ad5f86d934..8fa46a18da 100644 --- a/test/core/channel/channelz_test.cc +++ b/test/core/channel/channelz_test.cc @@ -44,16 +44,16 @@ namespace channelz { namespace testing { // testing peer to access channel internals -class ChannelNodePeer { +class CallCountingHelperPeer { public: - ChannelNodePeer(ChannelNode* channel) : channel_(channel) {} + CallCountingHelperPeer(CallCountingHelper* node) : node_(node) {} grpc_millis last_call_started_millis() { return (grpc_millis)gpr_atm_no_barrier_load( - &channel_->last_call_started_millis_); + &node_->last_call_started_millis_); } private: - ChannelNode* channel_; + CallCountingHelper* node_; }; namespace { @@ -157,14 +157,14 @@ void ValidateChannel(ChannelNode* channel, validate_channel_data_args args) { ValidateCounters(json_str, args); gpr_free(json_str); // also check that the core API formats this the correct way - char* core_api_json_str = grpc_channelz_get_channel(channel->channel_uuid()); + char* core_api_json_str = grpc_channelz_get_channel(channel->uuid()); grpc::testing::ValidateGetChannelResponseProtoJsonTranslation( core_api_json_str); gpr_free(core_api_json_str); } -grpc_millis GetLastCallStartedMillis(ChannelNode* channel) { - ChannelNodePeer peer(channel); +grpc_millis GetLastCallStartedMillis(CallCountingHelper* channel) { + CallCountingHelperPeer peer(channel); return peer.last_call_started_millis(); } @@ -215,27 +215,25 @@ TEST_P(ChannelzChannelTest, BasicChannelAPIFunctionality) { TEST_P(ChannelzChannelTest, LastCallStartedMillis) { grpc_core::ExecCtx exec_ctx; - ChannelFixture channel(GetParam()); - ChannelNode* channelz_channel = - grpc_channel_get_channelz_node(channel.channel()); + CallCountingHelper counter; // start a call to set the last call started timestamp - channelz_channel->RecordCallStarted(); - grpc_millis millis1 = GetLastCallStartedMillis(channelz_channel); + counter.RecordCallStarted(); + grpc_millis millis1 = GetLastCallStartedMillis(&counter); // time gone by should not affect the timestamp ChannelzSleep(100); - grpc_millis millis2 = GetLastCallStartedMillis(channelz_channel); + grpc_millis millis2 = GetLastCallStartedMillis(&counter); EXPECT_EQ(millis1, millis2); // calls succeeded or failed should not affect the timestamp ChannelzSleep(100); - channelz_channel->RecordCallFailed(); - channelz_channel->RecordCallSucceeded(); - grpc_millis millis3 = GetLastCallStartedMillis(channelz_channel); + counter.RecordCallFailed(); + counter.RecordCallSucceeded(); + grpc_millis millis3 = GetLastCallStartedMillis(&counter); EXPECT_EQ(millis1, millis3); // another call started should affect the timestamp // sleep for extra long to avoid flakes (since we cache Now()) ChannelzSleep(5000); - channelz_channel->RecordCallStarted(); - grpc_millis millis4 = GetLastCallStartedMillis(channelz_channel); + counter.RecordCallStarted(); + grpc_millis millis4 = GetLastCallStartedMillis(&counter); EXPECT_NE(millis1, millis4); } diff --git a/test/core/surface/public_headers_must_be_c89.c b/test/core/surface/public_headers_must_be_c89.c index b832a1661b..a4d2a5bf62 100644 --- a/test/core/surface/public_headers_must_be_c89.c +++ b/test/core/surface/public_headers_must_be_c89.c @@ -137,6 +137,7 @@ int main(int argc, char **argv) { printf("%lx", (unsigned long) grpc_resource_quota_arg_vtable); printf("%lx", (unsigned long) grpc_channelz_get_top_channels); printf("%lx", (unsigned long) grpc_channelz_get_channel); + printf("%lx", (unsigned long) grpc_channelz_get_subchannel); printf("%lx", (unsigned long) grpc_auth_property_iterator_next); printf("%lx", (unsigned long) grpc_auth_context_property_iterator); printf("%lx", (unsigned long) grpc_auth_context_peer_identity); diff --git a/test/cpp/util/channel_trace_proto_helper.cc b/test/cpp/util/channel_trace_proto_helper.cc index b4704bfe6a..e416a0375f 100644 --- a/test/cpp/util/channel_trace_proto_helper.cc +++ b/test/cpp/util/channel_trace_proto_helper.cc @@ -82,5 +82,9 @@ void ValidateGetChannelResponseProtoJsonTranslation(char* json_c_str) { json_c_str); } +void ValidateSubchannelProtoJsonTranslation(char* json_c_str) { + VaidateProtoJsonTranslation<grpc::channelz::v1::Subchannel>(json_c_str); +} + } // namespace testing } // namespace grpc diff --git a/test/cpp/util/channel_trace_proto_helper.h b/test/cpp/util/channel_trace_proto_helper.h index 18e3d54b6b..a1ebbcd110 100644 --- a/test/cpp/util/channel_trace_proto_helper.h +++ b/test/cpp/util/channel_trace_proto_helper.h @@ -26,6 +26,7 @@ void ValidateChannelTraceProtoJsonTranslation(char* json_c_str); void ValidateChannelProtoJsonTranslation(char* json_c_str); void ValidateGetTopChannelsResponseProtoJsonTranslation(char* json_c_str); void ValidateGetChannelResponseProtoJsonTranslation(char* json_c_str); +void ValidateSubchannelProtoJsonTranslation(char* json_c_str); } // namespace testing } // namespace grpc |