diff options
author | 2018-09-10 11:12:52 -0700 | |
---|---|---|
committer | 2018-09-10 11:12:52 -0700 | |
commit | 2edfddb66f84841fb39d01e335d19f13a09519cf (patch) | |
tree | 179d269af2150e2566f08f1afcc160c3a796280f /src/core/ext | |
parent | 1a09a5931e41cd44e442775f4f20b9b7349507c6 (diff) | |
parent | 2ff5be8c08c75a2c2c0152694788aa1e5ed3d50d (diff) |
Merge pull request #16055 from ncteisen/channelz-subchannels
Channelz Part 4: Add Subchannel Support
Diffstat (limited to 'src/core/ext')
7 files changed, 244 insertions, 29 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 8044b4e4cd..388736b60a 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -933,6 +933,11 @@ typedef struct client_channel_call_data { grpc_closure pick_closure; grpc_closure pick_cancel_closure; + // state needed to support channelz interception of recv trailing metadata. + grpc_closure recv_trailing_metadata_ready_channelz; + grpc_closure* original_recv_trailing_metadata; + grpc_metadata_batch* recv_trailing_metadata; + grpc_polling_entity* pollent; bool pollent_added_to_interested_parties; @@ -994,6 +999,8 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem); static void on_complete(void* arg, grpc_error* error); static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored); static void start_pick_locked(void* arg, grpc_error* ignored); +static void maybe_intercept_recv_trailing_metadata_for_channelz( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch); // // send op data caching @@ -1292,6 +1299,7 @@ static void pending_batches_resume(grpc_call_element* elem) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { + maybe_intercept_recv_trailing_metadata_for_channelz(elem, batch); batch->handler_private.extra_arg = calld->subchannel_call; GRPC_CLOSURE_INIT(&batch->handler_private.closure, resume_pending_batch_in_call_combiner, batch, @@ -1777,23 +1785,22 @@ static void recv_message_ready(void* arg, grpc_error* error) { // recv_trailing_metadata handling // -// Sets *status and *server_pushback_md based on batch_data and error. -static void get_call_status(subchannel_batch_data* batch_data, - grpc_error* error, grpc_status_code* status, +// Sets *status and *server_pushback_md based on md_batch and error. 
+// Only sets *server_pushback_md if server_pushback_md != nullptr. +static void get_call_status(grpc_call_element* elem, + grpc_metadata_batch* md_batch, grpc_error* error, + grpc_status_code* status, grpc_mdelem** server_pushback_md) { - grpc_call_element* elem = batch_data->elem; call_data* calld = static_cast<call_data*>(elem->call_data); if (error != GRPC_ERROR_NONE) { grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr, nullptr); } else { - grpc_metadata_batch* md_batch = - batch_data->batch.payload->recv_trailing_metadata - .recv_trailing_metadata; GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); *status = grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md); - if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { + if (server_pushback_md != nullptr && + md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; } } @@ -1966,8 +1973,19 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { // Get the call's status and check for server pushback metadata. 
grpc_status_code status = GRPC_STATUS_OK; grpc_mdelem* server_pushback_md = nullptr; - get_call_status(batch_data, GRPC_ERROR_REF(error), &status, + grpc_metadata_batch* md_batch = + batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata; + get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, &server_pushback_md); + grpc_core::channelz::SubchannelNode* channelz_subchannel = + calld->pick.connected_subchannel->channelz_subchannel(); + if (channelz_subchannel != nullptr) { + if (status == GRPC_STATUS_OK) { + channelz_subchannel->RecordCallSucceeded(); + } else { + channelz_subchannel->RecordCallFailed(); + } + } if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand, calld, grpc_status_code_to_string(status)); @@ -2572,6 +2590,69 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { } // +// Channelz +// + +static void recv_trailing_metadata_ready_channelz(void* arg, + grpc_error* error) { + grpc_call_element* elem = static_cast<grpc_call_element*>(arg); + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, " + "error=%s", + chand, calld, grpc_error_string(error)); + } + GPR_ASSERT(calld->recv_trailing_metadata != nullptr); + grpc_status_code status = GRPC_STATUS_OK; + grpc_metadata_batch* md_batch = calld->recv_trailing_metadata; + get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr); + grpc_core::channelz::SubchannelNode* channelz_subchannel = + calld->pick.connected_subchannel->channelz_subchannel(); + GPR_ASSERT(channelz_subchannel != nullptr); + if (status == GRPC_STATUS_OK) { + channelz_subchannel->RecordCallSucceeded(); + } else { + channelz_subchannel->RecordCallFailed(); + } + calld->recv_trailing_metadata = 
nullptr; + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error); +} + +// If channelz is enabled, intercept recv_trailing so that we may check the +// status and associate it to a subchannel. +// Does nothing if the batch has no recv_trailing_metadata op or if the +// connected subchannel has no channelz node. +static void maybe_intercept_recv_trailing_metadata_for_channelz( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + call_data* calld = static_cast<call_data*>(elem->call_data); + // only intercept payloads with recv trailing. + if (!batch->recv_trailing_metadata) { + return; + } + // only add interceptor if channelz is enabled. + if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) { + return; + } + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "calld=%p batch=%p: intercepting recv trailing for channelz", calld, + batch); + } + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz, + recv_trailing_metadata_ready_channelz, elem, + grpc_schedule_on_exec_ctx); + // save some state needed for the interception callback. 
+ GPR_ASSERT(calld->recv_trailing_metadata == nullptr); + calld->recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata; + calld->original_recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready_channelz; +} + +// // LB pick // @@ -2600,6 +2681,11 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { new_error = grpc_error_add_child(new_error, error); pending_batches_fail(elem, new_error, true /* yield_call_combiner */); } else { + grpc_core::channelz::SubchannelNode* channelz_subchannel = + calld->pick.connected_subchannel->channelz_subchannel(); + if (channelz_subchannel != nullptr) { + channelz_subchannel->RecordCallStarted(); + } if (parent_data_size > 0) { subchannel_call_retry_state* retry_state = static_cast<subchannel_call_retry_state*>( diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc index 86c765df52..7e8f59bcd3 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.cc +++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc @@ -20,10 +20,13 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/client_channel_channelz.h" +#include "src/core/lib/channel/channelz_registry.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/connectivity_state.h" +#include <grpc/support/string_util.h> + namespace grpc_core { namespace channelz { namespace { @@ -109,5 +112,62 @@ RefCountedPtr<ChannelNode> ClientChannelNode::MakeClientChannelNode( is_top_level_channel); } +SubchannelNode::SubchannelNode(grpc_subchannel* subchannel, + size_t channel_tracer_max_nodes) + : BaseNode(EntityType::kSubchannel), + 
subchannel_(subchannel), + target_( + UniquePtr<char>(gpr_strdup(grpc_subchannel_get_target(subchannel_)))), + trace_(channel_tracer_max_nodes) {} + +SubchannelNode::~SubchannelNode() {} + +void SubchannelNode::PopulateConnectivityState(grpc_json* json) { + grpc_connectivity_state state; + if (subchannel_ == nullptr) { + state = GRPC_CHANNEL_SHUTDOWN; + } else { + state = grpc_subchannel_check_connectivity(subchannel_, nullptr); + } + json = grpc_json_create_child(nullptr, json, "state", nullptr, + GRPC_JSON_OBJECT, false); + grpc_json_create_child(nullptr, json, "state", + grpc_connectivity_state_name(state), GRPC_JSON_STRING, + false); +} + +grpc_json* SubchannelNode::RenderJson() { + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* json_iterator = nullptr; + json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr, + GRPC_JSON_OBJECT, false); + json = json_iterator; + json_iterator = nullptr; + json_iterator = grpc_json_add_number_string_child(json, json_iterator, + "subchannelId", uuid()); + // reset json iterators to top level object + json = top_level_json; + json_iterator = nullptr; + // create and fill the data child. + grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr, + GRPC_JSON_OBJECT, false); + json = data; + json_iterator = nullptr; + PopulateConnectivityState(json); + GPR_ASSERT(target_.get() != nullptr); + grpc_json_create_child(nullptr, json, "target", target_.get(), + GRPC_JSON_STRING, false); + // fill in the channel trace if applicable + grpc_json* trace_json = trace_.RenderJson(); + if (trace_json != nullptr) { + trace_json->key = "trace"; // this object is named trace in channelz.proto + grpc_json_link_child(json, trace_json, nullptr); + } + // ask CallCountingHelper to populate trace and call count data. 
+ call_counter_.PopulateCallCounts(json); + return top_level_json; +} + } // namespace channelz } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.h b/src/core/ext/filters/client_channel/client_channel_channelz.h index 6f27b5c8b7..8ce331e529 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.h +++ b/src/core/ext/filters/client_channel/client_channel_channelz.h @@ -23,9 +23,12 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channel_trace.h" #include "src/core/lib/channel/channelz.h" #include "src/core/lib/gprpp/inlined_vector.h" +typedef struct grpc_subchannel grpc_subchannel; + namespace grpc_core { // TODO(ncteisen), this only contains the uuids of the children for now, @@ -43,28 +46,59 @@ class ClientChannelNode : public ChannelNode { grpc_channel* channel, size_t channel_tracer_max_nodes, bool is_top_level_channel); - // Override this functionality since client_channels have a notion of - // channel connectivity. - void PopulateConnectivityState(grpc_json* json) override; + ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel); + virtual ~ClientChannelNode() {} - // Override this functionality since client_channels have subchannels + // Overriding template methods from ChannelNode to render information that + // only ClientChannelNode knows about. + void PopulateConnectivityState(grpc_json* json) override; void PopulateChildRefs(grpc_json* json) override; // Helper to create a channel arg to ensure this type of ChannelNode is // created. 
static grpc_arg CreateChannelArg(); - protected: - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW - ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, - bool is_top_level_channel); - virtual ~ClientChannelNode() {} - private: grpc_channel_element* client_channel_; }; +// Handles channelz bookkeeping for subchannels +class SubchannelNode : public BaseNode { + public: + SubchannelNode(grpc_subchannel* subchannel, size_t channel_tracer_max_nodes); + ~SubchannelNode() override; + + void MarkSubchannelDestroyed() { + GPR_ASSERT(subchannel_ != nullptr); + subchannel_ = nullptr; + } + + grpc_json* RenderJson() override; + + // proxy methods to composed classes. + void AddTraceEvent(ChannelTrace::Severity severity, grpc_slice data) { + trace_.AddTraceEvent(severity, data); + } + void AddTraceEventWithReference(ChannelTrace::Severity severity, + grpc_slice data, + RefCountedPtr<BaseNode> referenced_channel) { + trace_.AddTraceEventWithReference(severity, data, + std::move(referenced_channel)); + } + void RecordCallStarted() { call_counter_.RecordCallStarted(); } + void RecordCallFailed() { call_counter_.RecordCallFailed(); } + void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); } + + private: + grpc_subchannel* subchannel_; + UniquePtr<char> target_; + CallCountingHelper call_counter_; + ChannelTrace trace_; + + void PopulateConnectivityState(grpc_json* json); +}; + } // namespace channelz } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 25b0149393..1ee1925a25 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1265,7 +1265,7 @@ void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels, grpc_core::channelz::ChannelNode* channel_node = 
grpc_channel_get_channelz_node(lb_channel_); if (channel_node != nullptr) { - child_channels->push_back(channel_node->channel_uuid()); + child_channels->push_back(channel_node->uuid()); } } } diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 91ddaec8b8..d87de51082 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -200,7 +200,7 @@ class SubchannelList grpc_core::channelz::SubchannelNode* subchannel_node = grpc_subchannel_get_channelz_node(subchannels_[i].subchannel()); if (subchannel_node != nullptr) { - refs_list->push_back(subchannel_node->subchannel_uuid()); + refs_list->push_back(subchannel_node->uuid()); } } } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 0e40f42e18..57d0b3759f 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -183,7 +183,13 @@ static void connection_destroy(void* arg, grpc_error* error) { static void subchannel_destroy(void* arg, grpc_error* error) { grpc_subchannel* c = static_cast<grpc_subchannel*>(arg); - c->channelz_subchannel.reset(); + if (c->channelz_subchannel != nullptr) { + c->channelz_subchannel->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Subchannel destroyed")); + c->channelz_subchannel->MarkSubchannelDestroyed(); + c->channelz_subchannel.reset(); + } gpr_free((void*)c->filters); grpc_channel_args_destroy(c->args); grpc_connectivity_state_destroy(&c->state_tracker); @@ -383,9 +389,18 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, const grpc_arg* arg = grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ); bool channelz_enabled = grpc_channel_arg_get_bool(arg, false); + arg = grpc_channel_args_find(c->args, + 
GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE); + const grpc_integer_options options = {0, 0, INT_MAX}; + size_t channel_tracer_max_nodes = + (size_t)grpc_channel_arg_get_integer(arg, options); if (channelz_enabled) { c->channelz_subchannel = - grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>(); + grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>( + c, channel_tracer_max_nodes); + c->channelz_subchannel->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Subchannel created")); } return grpc_subchannel_index_register(key, c); @@ -625,8 +640,8 @@ static bool publish_transport_locked(grpc_subchannel* c) { } /* publish */ - c->connected_subchannel.reset( - grpc_core::New<grpc_core::ConnectedSubchannel>(stk)); + c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>( + stk, c->channelz_subchannel.get())); gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", c->connected_subchannel.get(), c); @@ -770,6 +785,14 @@ void grpc_get_subchannel_address_arg(const grpc_channel_args* args, } } +const char* grpc_subchannel_get_target(grpc_subchannel* subchannel) { + const grpc_arg* addr_arg = + grpc_channel_args_find(subchannel->args, GRPC_ARG_SUBCHANNEL_ADDRESS); + const char* addr_str = grpc_channel_arg_get_string(addr_arg); + GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. 
+ return addr_str; +} + const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) { const grpc_arg* addr_arg = grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS); @@ -786,9 +809,12 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { namespace grpc_core { -ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack) +ConnectedSubchannel::ConnectedSubchannel( + grpc_channel_stack* channel_stack, + channelz::SubchannelNode* channelz_subchannel) : RefCountedWithTracing<ConnectedSubchannel>(&grpc_trace_stream_refcount), - channel_stack_(channel_stack) {} + channel_stack_(channel_stack), + channelz_subchannel_(channelz_subchannel) {} ConnectedSubchannel::~ConnectedSubchannel() { GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index a135035d62..84febb5204 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -85,7 +85,8 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> { size_t parent_data_size; }; - explicit ConnectedSubchannel(grpc_channel_stack* channel_stack); + explicit ConnectedSubchannel(grpc_channel_stack* channel_stack, + channelz::SubchannelNode* channelz_subchannel); ~ConnectedSubchannel(); grpc_channel_stack* channel_stack() { return channel_stack_; } @@ -94,9 +95,15 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> { grpc_closure* closure); void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call); + channelz::SubchannelNode* channelz_subchannel() { + return channelz_subchannel_; + } private: grpc_channel_stack* channel_stack_; + // backpointer to the channelz node in this connected subchannel's + // owning subchannel. 
+ channelz::SubchannelNode* channelz_subchannel_; }; } // namespace grpc_core @@ -184,6 +191,8 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, void grpc_get_subchannel_address_arg(const grpc_channel_args* args, grpc_resolved_address* addr); +const char* grpc_subchannel_get_target(grpc_subchannel* subchannel); + /// Returns the URI string for the address to connect to. const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args); |