diff options
author | 2018-09-12 17:10:09 -0700 | |
---|---|---|
committer | 2018-09-12 17:10:09 -0700 | |
commit | 18a6a012f2abe06bb90ecffc3fb9d46eb30e1f86 (patch) | |
tree | c653d07d4d8158ed651fcca6e5e3c36a973934a3 /src/core | |
parent | 86530acca735b6ba49dee12bfc7e80d86d49c0cb (diff) | |
parent | 20345b21bb00c626ef346c26dbb3903c96c21c52 (diff) |
Merge branch 'master' into linuxversion
Diffstat (limited to 'src/core')
75 files changed, 2133 insertions, 883 deletions
diff --git a/src/core/ext/filters/client_channel/README.md b/src/core/ext/filters/client_channel/README.md index 7c209db12e..9676a4535b 100644 --- a/src/core/ext/filters/client_channel/README.md +++ b/src/core/ext/filters/client_channel/README.md @@ -46,20 +46,4 @@ construction arguments for concrete grpc_subchannel instances. Naming for GRPC =============== -Names in GRPC are represented by a URI (as defined in -[RFC 3986](https://tools.ietf.org/html/rfc3986)). - -The following schemes are currently supported: - -dns:///host:port - dns schemes are currently supported so long as authority is - empty (authority based dns resolution is expected in a future - release) - -unix:path - the unix scheme is used to create and connect to unix domain - sockets - the authority must be empty, and the path - represents the absolute or relative path to the desired - socket - -ipv4:host:port - a pre-resolved ipv4 dotted decimal address/port combination - -ipv6:[host]:port - a pre-resolved ipv6 address/port combination +See [/doc/naming.md](gRPC name resolution). diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index d2bf4f388d..388736b60a 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -457,7 +457,6 @@ get_service_config_from_resolver_result_locked(channel_data* chand) { grpc_uri* uri = grpc_uri_parse(server_uri, true); GPR_ASSERT(uri->path[0] != '\0'); service_config_parsing_state parsing_state; - memset(&parsing_state, 0, sizeof(parsing_state)); parsing_state.server_name = uri->path[0] == '/' ? uri->path + 1 : uri->path; service_config->ParseGlobalParams(parse_retry_throttle_params, @@ -934,6 +933,11 @@ typedef struct client_channel_call_data { grpc_closure pick_closure; grpc_closure pick_cancel_closure; + // state needed to support channelz interception of recv trailing metadata. + grpc_closure recv_trailing_metadata_ready_channelz; + grpc_closure* original_recv_trailing_metadata; + grpc_metadata_batch* recv_trailing_metadata; + grpc_polling_entity* pollent; bool pollent_added_to_interested_parties; @@ -995,6 +999,8 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem); static void on_complete(void* arg, grpc_error* error); static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored); static void start_pick_locked(void* arg, grpc_error* ignored); +static void maybe_intercept_recv_trailing_metadata_for_channelz( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch); // // send op data caching @@ -1293,6 +1299,7 @@ static void pending_batches_resume(grpc_call_element* elem) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { + maybe_intercept_recv_trailing_metadata_for_channelz(elem, batch); batch->handler_private.extra_arg = calld->subchannel_call; GRPC_CLOSURE_INIT(&batch->handler_private.closure, resume_pending_batch_in_call_combiner, batch, @@ -1778,23 +1785,22 @@ static void recv_message_ready(void* arg, grpc_error* error) { // recv_trailing_metadata handling // -// Sets *status and *server_pushback_md based on batch_data and error. -static void get_call_status(subchannel_batch_data* batch_data, - grpc_error* error, grpc_status_code* status, +// Sets *status and *server_pushback_md based on md_batch and error. +// Only sets *server_pushback_md if server_pushback_md != nullptr. +static void get_call_status(grpc_call_element* elem, + grpc_metadata_batch* md_batch, grpc_error* error, + grpc_status_code* status, grpc_mdelem** server_pushback_md) { - grpc_call_element* elem = batch_data->elem; call_data* calld = static_cast<call_data*>(elem->call_data); if (error != GRPC_ERROR_NONE) { grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr, nullptr); } else { - grpc_metadata_batch* md_batch = - batch_data->batch.payload->recv_trailing_metadata - .recv_trailing_metadata; GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); *status = grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md); - if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { + if (server_pushback_md != nullptr && + md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; } } @@ -1967,8 +1973,19 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { // Get the call's status and check for server pushback metadata. grpc_status_code status = GRPC_STATUS_OK; grpc_mdelem* server_pushback_md = nullptr; - get_call_status(batch_data, GRPC_ERROR_REF(error), &status, + grpc_metadata_batch* md_batch = + batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata; + get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, &server_pushback_md); + grpc_core::channelz::SubchannelNode* channelz_subchannel = + calld->pick.connected_subchannel->channelz_subchannel(); + if (channelz_subchannel != nullptr) { + if (status == GRPC_STATUS_OK) { + channelz_subchannel->RecordCallSucceeded(); + } else { + channelz_subchannel->RecordCallFailed(); + } + } if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand, calld, grpc_status_code_to_string(status)); @@ -2573,6 +2590,69 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { } // +// Channelz +// + +static void recv_trailing_metadata_ready_channelz(void* arg, + grpc_error* error) { + grpc_call_element* elem = static_cast<grpc_call_element*>(arg); + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, " + "error=%s", + chand, calld, grpc_error_string(error)); + } + GPR_ASSERT(calld->recv_trailing_metadata != nullptr); + grpc_status_code status = GRPC_STATUS_OK; + grpc_metadata_batch* md_batch = calld->recv_trailing_metadata; + get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr); + grpc_core::channelz::SubchannelNode* channelz_subchannel = + calld->pick.connected_subchannel->channelz_subchannel(); + GPR_ASSERT(channelz_subchannel != nullptr); + if (status == GRPC_STATUS_OK) { + channelz_subchannel->RecordCallSucceeded(); + } else { + channelz_subchannel->RecordCallFailed(); + } + calld->recv_trailing_metadata = nullptr; + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error); +} + +// If channelz is enabled, intercept recv_trailing so that we may check the +// status and associate it to a subchannel. +// Returns true if callback was intercepted, false otherwise. +static void maybe_intercept_recv_trailing_metadata_for_channelz( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + call_data* calld = static_cast<call_data*>(elem->call_data); + // only intercept payloads with recv trailing. + if (!batch->recv_trailing_metadata) { + return; + } + // only add interceptor is channelz is enabled. + if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) { + return; + } + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "calld=%p batch=%p: intercepting recv trailing for channelz", calld, + batch); + } + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz, + recv_trailing_metadata_ready_channelz, elem, + grpc_schedule_on_exec_ctx); + // save some state needed for the interception callback. + GPR_ASSERT(calld->recv_trailing_metadata == nullptr); + calld->recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata; + calld->original_recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready_channelz; +} + +// // LB pick // @@ -2601,6 +2681,11 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { new_error = grpc_error_add_child(new_error, error); pending_batches_fail(elem, new_error, true /* yield_call_combiner */); } else { + grpc_core::channelz::SubchannelNode* channelz_subchannel = + calld->pick.connected_subchannel->channelz_subchannel(); + if (channelz_subchannel != nullptr) { + channelz_subchannel->RecordCallStarted(); + } if (parent_data_size > 0) { subchannel_call_retry_state* retry_state = static_cast<subchannel_call_retry_state*>( diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc index 86c765df52..7e8f59bcd3 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.cc +++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc @@ -20,10 +20,13 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/client_channel_channelz.h" +#include "src/core/lib/channel/channelz_registry.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/connectivity_state.h" +#include <grpc/support/string_util.h> + namespace grpc_core { namespace channelz { namespace { @@ -109,5 +112,62 @@ RefCountedPtr<ChannelNode> ClientChannelNode::MakeClientChannelNode( is_top_level_channel); } +SubchannelNode::SubchannelNode(grpc_subchannel* subchannel, + size_t channel_tracer_max_nodes) + : BaseNode(EntityType::kSubchannel), + subchannel_(subchannel), + target_( + UniquePtr<char>(gpr_strdup(grpc_subchannel_get_target(subchannel_)))), + trace_(channel_tracer_max_nodes) {} + +SubchannelNode::~SubchannelNode() {} + +void SubchannelNode::PopulateConnectivityState(grpc_json* json) { + grpc_connectivity_state state; + if (subchannel_ == nullptr) { + state = GRPC_CHANNEL_SHUTDOWN; + } else { + state = grpc_subchannel_check_connectivity(subchannel_, nullptr); + } + json = grpc_json_create_child(nullptr, json, "state", nullptr, + GRPC_JSON_OBJECT, false); + grpc_json_create_child(nullptr, json, "state", + grpc_connectivity_state_name(state), GRPC_JSON_STRING, + false); +} + +grpc_json* SubchannelNode::RenderJson() { + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* json_iterator = nullptr; + json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr, + GRPC_JSON_OBJECT, false); + json = json_iterator; + json_iterator = nullptr; + json_iterator = grpc_json_add_number_string_child(json, json_iterator, + "subchannelId", uuid()); + // reset json iterators to top level object + json = top_level_json; + json_iterator = nullptr; + // create and fill the data child. + grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr, + GRPC_JSON_OBJECT, false); + json = data; + json_iterator = nullptr; + PopulateConnectivityState(json); + GPR_ASSERT(target_.get() != nullptr); + grpc_json_create_child(nullptr, json, "target", target_.get(), + GRPC_JSON_STRING, false); + // fill in the channel trace if applicable + grpc_json* trace_json = trace_.RenderJson(); + if (trace_json != nullptr) { + trace_json->key = "trace"; // this object is named trace in channelz.proto + grpc_json_link_child(json, trace_json, nullptr); + } + // ask CallCountingHelper to populate trace and call count data. + call_counter_.PopulateCallCounts(json); + return top_level_json; +} + } // namespace channelz } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.h b/src/core/ext/filters/client_channel/client_channel_channelz.h index 6f27b5c8b7..8ce331e529 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.h +++ b/src/core/ext/filters/client_channel/client_channel_channelz.h @@ -23,9 +23,12 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channel_trace.h" #include "src/core/lib/channel/channelz.h" #include "src/core/lib/gprpp/inlined_vector.h" +typedef struct grpc_subchannel grpc_subchannel; + namespace grpc_core { // TODO(ncteisen), this only contains the uuids of the children for now, @@ -43,28 +46,59 @@ class ClientChannelNode : public ChannelNode { grpc_channel* channel, size_t channel_tracer_max_nodes, bool is_top_level_channel); - // Override this functionality since client_channels have a notion of - // channel connectivity. - void PopulateConnectivityState(grpc_json* json) override; + ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel); + virtual ~ClientChannelNode() {} - // Override this functionality since client_channels have subchannels + // Overriding template methods from ChannelNode to render information that + // only ClientChannelNode knows about. + void PopulateConnectivityState(grpc_json* json) override; void PopulateChildRefs(grpc_json* json) override; // Helper to create a channel arg to ensure this type of ChannelNode is // created. static grpc_arg CreateChannelArg(); - protected: - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW - ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, - bool is_top_level_channel); - virtual ~ClientChannelNode() {} - private: grpc_channel_element* client_channel_; }; +// Handles channelz bookkeeping for sockets +class SubchannelNode : public BaseNode { + public: + SubchannelNode(grpc_subchannel* subchannel, size_t channel_tracer_max_nodes); + ~SubchannelNode() override; + + void MarkSubchannelDestroyed() { + GPR_ASSERT(subchannel_ != nullptr); + subchannel_ = nullptr; + } + + grpc_json* RenderJson() override; + + // proxy methods to composed classes. + void AddTraceEvent(ChannelTrace::Severity severity, grpc_slice data) { + trace_.AddTraceEvent(severity, data); + } + void AddTraceEventWithReference(ChannelTrace::Severity severity, + grpc_slice data, + RefCountedPtr<BaseNode> referenced_channel) { + trace_.AddTraceEventWithReference(severity, data, + std::move(referenced_channel)); + } + void RecordCallStarted() { call_counter_.RecordCallStarted(); } + void RecordCallFailed() { call_counter_.RecordCallFailed(); } + void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); } + + private: + grpc_subchannel* subchannel_; + UniquePtr<char> target_; + CallCountingHelper call_counter_; + ChannelTrace trace_; + + void PopulateConnectivityState(grpc_json* json); +}; + } // namespace channelz } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc index 4e8b8b71db..7ce8da8c00 100644 --- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc +++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -320,7 +320,7 @@ static void http_connect_handshaker_do_handshake( // Take a new ref to be held by the write callback. gpr_ref(&handshaker->refcount); grpc_endpoint_write(args->endpoint, &handshaker->write_buffer, - &handshaker->request_done_closure); + &handshaker->request_done_closure, nullptr); gpr_mu_unlock(&handshaker->mu); } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 25b0149393..1ee1925a25 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1265,7 +1265,7 @@ void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels, grpc_core::channelz::ChannelNode* channel_node = grpc_channel_get_channelz_node(lb_channel_); if (channel_node != nullptr) { - child_channels->push_back(channel_node->channel_uuid()); + child_channels->push_back(channel_node->uuid()); } } } diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 602d6e92f9..ed8cc60ea1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -71,11 +71,12 @@ class PickFirst : public LoadBalancingPolicy { : public SubchannelData<PickFirstSubchannelList, PickFirstSubchannelData> { public: - PickFirstSubchannelData(PickFirstSubchannelList* subchannel_list, - const grpc_lb_user_data_vtable* user_data_vtable, - const grpc_lb_address& address, - grpc_subchannel* subchannel, - grpc_combiner* combiner) + PickFirstSubchannelData( + SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>* + subchannel_list, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, grpc_subchannel* subchannel, + grpc_combiner* combiner) : SubchannelData(subchannel_list, user_data_vtable, address, subchannel, combiner) {} diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 4195c1e9d1..8dd5820bae 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -89,11 +89,12 @@ class RoundRobin : public LoadBalancingPolicy { : public SubchannelData<RoundRobinSubchannelList, RoundRobinSubchannelData> { public: - RoundRobinSubchannelData(RoundRobinSubchannelList* subchannel_list, - const grpc_lb_user_data_vtable* user_data_vtable, - const grpc_lb_address& address, - grpc_subchannel* subchannel, - grpc_combiner* combiner) + RoundRobinSubchannelData( + SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>* + subchannel_list, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, grpc_subchannel* subchannel, + grpc_combiner* combiner) : SubchannelData(subchannel_list, user_data_vtable, address, subchannel, combiner), user_data_vtable_(user_data_vtable), diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 91ddaec8b8..5e8682e056 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -65,6 +65,10 @@ class MySubchannelList namespace grpc_core { +// Forward declaration. +template <typename SubchannelListType, typename SubchannelDataType> +class SubchannelList; + // Stores data for a particular subchannel in a subchannel list. // Callers must create a subclass that implements the // ProcessConnectivityChangeLocked() method. @@ -72,7 +76,9 @@ template <typename SubchannelListType, typename SubchannelDataType> class SubchannelData { public: // Returns a pointer to the subchannel list containing this object. - SubchannelListType* subchannel_list() const { return subchannel_list_; } + SubchannelListType* subchannel_list() const { + return static_cast<SubchannelListType*>(subchannel_list_); + } // Returns the index into the subchannel list of this object. size_t Index() const { @@ -133,10 +139,11 @@ class SubchannelData { GRPC_ABSTRACT_BASE_CLASS protected: - SubchannelData(SubchannelListType* subchannel_list, - const grpc_lb_user_data_vtable* user_data_vtable, - const grpc_lb_address& address, grpc_subchannel* subchannel, - grpc_combiner* combiner); + SubchannelData( + SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, grpc_subchannel* subchannel, + grpc_combiner* combiner); virtual ~SubchannelData(); @@ -161,7 +168,7 @@ class SubchannelData { static void OnConnectivityChangedLocked(void* arg, grpc_error* error); // Backpointer to owning subchannel list. Not owned. - SubchannelListType* subchannel_list_; + SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list_; // The subchannel and connected subchannel. grpc_subchannel* subchannel_; @@ -200,7 +207,7 @@ class SubchannelList grpc_core::channelz::SubchannelNode* subchannel_node = grpc_subchannel_get_channelz_node(subchannels_[i].subchannel()); if (subchannel_node != nullptr) { - refs_list->push_back(subchannel_node->subchannel_uuid()); + refs_list->push_back(subchannel_node->uuid()); } } } @@ -268,7 +275,7 @@ class SubchannelList template <typename SubchannelListType, typename SubchannelDataType> SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData( - SubchannelListType* subchannel_list, + SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list, const grpc_lb_user_data_vtable* user_data_vtable, const grpc_lb_address& address, grpc_subchannel* subchannel, grpc_combiner* combiner) @@ -532,8 +539,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( address_uri); gpr_free(address_uri); } - subchannels_.emplace_back(static_cast<SubchannelListType*>(this), - addresses->user_data_vtable, + subchannels_.emplace_back(this, addresses->user_data_vtable, addresses->addresses[i], subchannel, combiner); } } diff --git a/src/core/ext/filters/client_channel/parse_address.cc b/src/core/ext/filters/client_channel/parse_address.cc index b3900114ad..b94429e207 100644 --- a/src/core/ext/filters/client_channel/parse_address.cc +++ b/src/core/ext/filters/client_channel/parse_address.cc @@ -125,9 +125,16 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr, char* host_end = static_cast<char*>(gpr_memrchr(host, '%', strlen(host))); if (host_end != nullptr) { GPR_ASSERT(host_end >= host); - char host_without_scope[GRPC_INET6_ADDRSTRLEN]; + char host_without_scope[GRPC_INET6_ADDRSTRLEN + 1]; size_t host_without_scope_len = static_cast<size_t>(host_end - host); uint32_t sin6_scope_id = 0; + if (host_without_scope_len > GRPC_INET6_ADDRSTRLEN) { + gpr_log(GPR_ERROR, + "invalid ipv6 address length %zu. Length cannot be greater than " + "GRPC_INET6_ADDRSTRLEN i.e %d)", + host_without_scope_len, GRPC_INET6_ADDRSTRLEN); + goto done; + } strncpy(host_without_scope, host, host_without_scope_len); host_without_scope[host_without_scope_len] = '\0'; if (grpc_inet_pton(GRPC_AF_INET6, host_without_scope, &in6->sin6_addr) == @@ -190,3 +197,12 @@ bool grpc_parse_uri(const grpc_uri* uri, grpc_resolved_address* resolved_addr) { gpr_log(GPR_ERROR, "Can't parse scheme '%s'", uri->scheme); return false; } + +uint16_t grpc_strhtons(const char* port) { + if (strcmp(port, "http") == 0) { + return htons(80); + } else if (strcmp(port, "https") == 0) { + return htons(443); + } + return htons(static_cast<unsigned short>(atoi(port))); +} diff --git a/src/core/ext/filters/client_channel/parse_address.h b/src/core/ext/filters/client_channel/parse_address.h index 9a88b66edc..c2af0e6c49 100644 --- a/src/core/ext/filters/client_channel/parse_address.h +++ b/src/core/ext/filters/client_channel/parse_address.h @@ -47,4 +47,7 @@ bool grpc_parse_ipv4_hostport(const char* hostport, grpc_resolved_address* addr, bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr, bool log_errors); +/* Converts named or numeric port to a uint16 suitable for use in a sockaddr. */ +uint16_t grpc_strhtons(const char* port); + #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_PARSE_ADDRESS_H */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 485998f5e4..4c795c34c8 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -87,15 +87,6 @@ typedef struct grpc_ares_hostbyname_request { static void do_basic_init(void) { gpr_mu_init(&g_init_mu); } -static uint16_t strhtons(const char* port) { - if (strcmp(port, "http") == 0) { - return htons(80); - } else if (strcmp(port, "https") == 0) { - return htons(443); - } - return htons(static_cast<unsigned short>(atoi(port))); -} - static void log_address_sorting_list(grpc_lb_addresses* lb_addrs, const char* input_output_str) { for (size_t i = 0; i < lb_addrs->num_addresses; i++) { @@ -139,12 +130,6 @@ void grpc_cares_wrapper_address_sorting_sort(grpc_lb_addresses* lb_addrs) { } } -/* Allow tests to access grpc_ares_wrapper_address_sorting_sort */ -void grpc_cares_wrapper_test_only_address_sorting_sort( - grpc_lb_addresses* lb_addrs) { - grpc_cares_wrapper_address_sorting_sort(lb_addrs); -} - static void grpc_ares_request_ref_locked(grpc_ares_request* r) { r->pending_queries++; } @@ -371,7 +356,8 @@ done: grpc_ares_request_unref_locked(r); } -static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( +static grpc_ares_request* +grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, @@ -454,12 +440,12 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( } r->pending_queries = 1; if (grpc_ares_query_ipv6()) { - hr = create_hostbyname_request_locked(r, host, strhtons(port), + hr = create_hostbyname_request_locked(r, host, grpc_strhtons(port), false /* is_balancer */); ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_locked, hr); } - hr = create_hostbyname_request_locked(r, host, strhtons(port), + hr = create_hostbyname_request_locked(r, host, grpc_strhtons(port), false /* is_balancer */); ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_locked, hr); @@ -494,6 +480,79 @@ error_cleanup: return nullptr; } +static bool inner_resolve_as_ip_literal_locked(const char* name, + const char* default_port, + grpc_lb_addresses** addrs, + char** host, char** port, + char** hostport) { + gpr_split_host_port(name, host, port); + if (*host == nullptr) { + gpr_log(GPR_ERROR, + "Failed to parse %s to host:port while attempting to resolve as ip " + "literal.", + name); + return false; + } + if (*port == nullptr) { + if (default_port == nullptr) { + gpr_log(GPR_ERROR, + "No port or default port for %s while attempting to resolve as " + "ip literal.", + name); + return false; + } + *port = gpr_strdup(default_port); + } + grpc_resolved_address addr; + GPR_ASSERT(gpr_join_host_port(hostport, *host, atoi(*port))); + if (grpc_parse_ipv4_hostport(*hostport, &addr, false /* log errors */) || + grpc_parse_ipv6_hostport(*hostport, &addr, false /* log errors */)) { + GPR_ASSERT(*addrs == nullptr); + *addrs = grpc_lb_addresses_create(1, nullptr); + grpc_lb_addresses_set_address( + *addrs, 0, addr.addr, addr.len, false /* is_balancer */, + nullptr /* balancer_name */, nullptr /* user_data */); + return true; + } + return false; +} + +static bool resolve_as_ip_literal_locked(const char* name, + const char* default_port, + grpc_lb_addresses** addrs) { + char* host = nullptr; + char* port = nullptr; + char* hostport = nullptr; + bool out = inner_resolve_as_ip_literal_locked(name, default_port, addrs, + &host, &port, &hostport); + gpr_free(host); + gpr_free(port); + gpr_free(hostport); + return out; +} + +static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( + const char* dns_server, const char* name, const char* default_port, + grpc_pollset_set* interested_parties, grpc_closure* on_done, + grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, + grpc_combiner* combiner) { + // Early out if the target is an ipv4 or ipv6 literal. + if (resolve_as_ip_literal_locked(name, default_port, addrs)) { + GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE); + return nullptr; + } + // Early out if the target is localhost and we're on Windows. + if (grpc_ares_maybe_resolve_localhost_manually_locked(name, default_port, + addrs)) { + GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE); + return nullptr; + } + // Look up name using c-ares lib. + return grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( + dns_server, name, default_port, interested_parties, on_done, addrs, + check_grpclb, service_config_json, combiner); +} + grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, @@ -502,7 +561,9 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)( void grpc_cancel_ares_request(grpc_ares_request* r) { if (grpc_dns_lookup_ares_locked == grpc_dns_lookup_ares_locked_impl) { - grpc_ares_ev_driver_shutdown_locked(r->ev_driver); + if (r != nullptr) { + grpc_ares_ev_driver_shutdown_locked(r->ev_driver); + } } } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index ca5779e1d7..1bc457d4cf 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -81,9 +81,15 @@ void grpc_ares_complete_request_locked(grpc_ares_request* request); /* E.g., return false if ipv6 is known to not be available. */ bool grpc_ares_query_ipv6(); -/* Exposed only for testing */ -void grpc_cares_wrapper_test_only_address_sorting_sort( - grpc_lb_addresses* lb_addrs); +/* Maybe (depending on the current platform) checks if "name" matches + * "localhost" and if so fills in addrs with the correct sockaddr structures. + * Returns a bool indicating whether or not such an action was performed. + * See https://github.com/grpc/grpc/issues/15158. */ +bool grpc_ares_maybe_resolve_localhost_manually_locked( + const char* name, const char* default_port, grpc_lb_addresses** addrs); + +/* Sorts destinations in lb_addrs according to RFC 6724. */ +void grpc_cares_wrapper_address_sorting_sort(grpc_lb_addresses* lb_addrs); #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H \ */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc index 23c0fec74f..639eec2323 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc @@ -26,4 +26,9 @@ bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); } +bool grpc_ares_maybe_resolve_localhost_manually_locked( + const char* name, const char* default_port, grpc_lb_addresses** addrs) { + return false; +} + #endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc index ee827e284e..7e34784691 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc @@ -21,9 +21,79 @@ #include "src/core/lib/iomgr/port.h" #if GRPC_ARES == 1 && defined(GPR_WINDOWS) +#include <grpc/support/string_util.h> + +#include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" +#include "src/core/lib/gpr/host_port.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/socket_windows.h" bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); } +static bool inner_maybe_resolve_localhost_manually_locked( + const char* name, const char* default_port, grpc_lb_addresses** addrs, + char** host, char** port) { + gpr_split_host_port(name, host, port); + if (*host == nullptr) { + gpr_log(GPR_ERROR, + "Failed to parse %s into host:port during Windows localhost " + "resolution check.", + name); + return false; + } + if (*port == nullptr) { + if (default_port == nullptr) { + gpr_log(GPR_ERROR, + "No port or default port for %s during Windows localhost " + "resolution check.", + name); + return false; + } + *port = gpr_strdup(default_port); + } + if (gpr_stricmp(*host, "localhost") == 0) { + GPR_ASSERT(*addrs == nullptr); + *addrs = grpc_lb_addresses_create(2, nullptr); + uint16_t numeric_port = grpc_strhtons(*port); + // Append the ipv6 loopback address. + struct sockaddr_in6 ipv6_loopback_addr; + memset(&ipv6_loopback_addr, 0, sizeof(ipv6_loopback_addr)); + ((char*)&ipv6_loopback_addr.sin6_addr)[15] = 1; + ipv6_loopback_addr.sin6_family = AF_INET6; + ipv6_loopback_addr.sin6_port = numeric_port; + grpc_lb_addresses_set_address( + *addrs, 0, &ipv6_loopback_addr, sizeof(ipv6_loopback_addr), + false /* is_balancer */, nullptr /* balancer_name */, + nullptr /* user_data */); + // Append the ipv4 loopback address. + struct sockaddr_in ipv4_loopback_addr; + memset(&ipv4_loopback_addr, 0, sizeof(ipv4_loopback_addr)); + ((char*)&ipv4_loopback_addr.sin_addr)[0] = 0x7f; + ((char*)&ipv4_loopback_addr.sin_addr)[3] = 0x01; + ipv4_loopback_addr.sin_family = AF_INET; + ipv4_loopback_addr.sin_port = numeric_port; + grpc_lb_addresses_set_address( + *addrs, 1, &ipv4_loopback_addr, sizeof(ipv4_loopback_addr), + false /* is_balancer */, nullptr /* balancer_name */, + nullptr /* user_data */); + // Let the address sorter figure out which one should be tried first. + grpc_cares_wrapper_address_sorting_sort(*addrs); + return true; + } + return false; +} + +bool grpc_ares_maybe_resolve_localhost_manually_locked( + const char* name, const char* default_port, grpc_lb_addresses** addrs) { + char* host = nullptr; + char* port = nullptr; + bool out = inner_maybe_resolve_localhost_manually_locked(name, default_port, + addrs, &host, &port); + gpr_free(host); + gpr_free(port); + return out; +} + #endif /* GRPC_ARES == 1 && defined(GPR_WINDOWS) */ diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 0e40f42e18..57d0b3759f 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -183,7 +183,13 @@ static void connection_destroy(void* arg, grpc_error* error) { static void subchannel_destroy(void* arg, grpc_error* error) { grpc_subchannel* c = static_cast<grpc_subchannel*>(arg); - c->channelz_subchannel.reset(); + if (c->channelz_subchannel != nullptr) { + c->channelz_subchannel->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Subchannel destroyed")); + c->channelz_subchannel->MarkSubchannelDestroyed(); + c->channelz_subchannel.reset(); + } gpr_free((void*)c->filters); grpc_channel_args_destroy(c->args); grpc_connectivity_state_destroy(&c->state_tracker); @@ -383,9 +389,18 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, const grpc_arg* arg = grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ); bool channelz_enabled = grpc_channel_arg_get_bool(arg, false); + arg = grpc_channel_args_find(c->args, + GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE); + const grpc_integer_options options = {0, 0, INT_MAX}; + size_t channel_tracer_max_nodes = + (size_t)grpc_channel_arg_get_integer(arg, options); if (channelz_enabled) { c->channelz_subchannel = - grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>(); + grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>( + c, channel_tracer_max_nodes); + c->channelz_subchannel->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Subchannel created")); } return grpc_subchannel_index_register(key, c); @@ -625,8 +640,8 @@ static bool publish_transport_locked(grpc_subchannel* c) { } /* publish */ - c->connected_subchannel.reset( - grpc_core::New<grpc_core::ConnectedSubchannel>(stk)); + c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>( + stk, c->channelz_subchannel.get())); gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", c->connected_subchannel.get(), c); @@ -770,6 +785,14 @@ void grpc_get_subchannel_address_arg(const grpc_channel_args* args, } } +const char* grpc_subchannel_get_target(grpc_subchannel* subchannel) { + const grpc_arg* addr_arg = + grpc_channel_args_find(subchannel->args, GRPC_ARG_SUBCHANNEL_ADDRESS); + const char* addr_str = grpc_channel_arg_get_string(addr_arg); + GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. + return addr_str; +} + const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) { const grpc_arg* addr_arg = grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS); @@ -786,9 +809,12 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { namespace grpc_core { -ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack) +ConnectedSubchannel::ConnectedSubchannel( + grpc_channel_stack* channel_stack, + channelz::SubchannelNode* channelz_subchannel) : RefCountedWithTracing<ConnectedSubchannel>(&grpc_trace_stream_refcount), - channel_stack_(channel_stack) {} + channel_stack_(channel_stack), + channelz_subchannel_(channelz_subchannel) {} ConnectedSubchannel::~ConnectedSubchannel() { GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index a135035d62..84febb5204 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -85,7 +85,8 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> { size_t parent_data_size; }; - explicit ConnectedSubchannel(grpc_channel_stack* channel_stack); + explicit ConnectedSubchannel(grpc_channel_stack* channel_stack, + channelz::SubchannelNode* channelz_subchannel); ~ConnectedSubchannel(); grpc_channel_stack* channel_stack() { return channel_stack_; } @@ -94,9 +95,15 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> { grpc_closure* closure); void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call); + channelz::SubchannelNode* channelz_subchannel() { + return channelz_subchannel_; + } private: grpc_channel_stack* channel_stack_; + // backpointer to the channelz node in this connected subchannel's + // owning subchannel. + channelz::SubchannelNode* channelz_subchannel_; }; } // namespace grpc_core @@ -184,6 +191,8 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, void grpc_get_subchannel_address_arg(const grpc_channel_args* args, grpc_resolved_address* addr); +const char* grpc_subchannel_get_target(grpc_subchannel* subchannel); + /// Returns the URI string for the address to connect to. const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args); diff --git a/src/core/ext/filters/client_channel/subchannel_index.cc b/src/core/ext/filters/client_channel/subchannel_index.cc index cb02b1a748..f2b6c24e8e 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.cc +++ b/src/core/ext/filters/client_channel/subchannel_index.cc @@ -42,7 +42,7 @@ struct grpc_subchannel_key { grpc_subchannel_args args; }; -static bool g_force_creation = false; +static gpr_atm g_force_creation = false; static grpc_subchannel_key* create_key( const grpc_subchannel_args* args, @@ -73,7 +73,8 @@ static grpc_subchannel_key* subchannel_key_copy(grpc_subchannel_key* k) { int grpc_subchannel_key_compare(const grpc_subchannel_key* a, const grpc_subchannel_key* b) { - if (g_force_creation) return false; + // To pretend the keys are different, return a non-zero value. + if (GPR_UNLIKELY(gpr_atm_no_barrier_load(&g_force_creation))) return 1; int c = GPR_ICMP(a->args.filter_count, b->args.filter_count); if (c != 0) return c; if (a->args.filter_count > 0) { @@ -250,5 +251,5 @@ void grpc_subchannel_index_unregister(grpc_subchannel_key* key, } void grpc_subchannel_index_test_only_set_force_creation(bool force_creation) { - g_force_creation = force_creation; + gpr_atm_no_barrier_store(&g_force_creation, force_creation); } diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h index a7dae9d47d..c135613d26 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.h +++ b/src/core/ext/filters/client_channel/subchannel_index.h @@ -65,13 +65,10 @@ void grpc_subchannel_index_ref(void); void grpc_subchannel_index_unref(void); /** \em TEST ONLY. - * If \a force_creation is true, all key comparisons will be false, resulting in + * If \a force_creation is true, all keys are regarded different, resulting in * new subchannels always being created. Otherwise, the keys will be compared as * usual. * - * This function is *not* threadsafe on purpose: it should *only* be used in - * test code. - * * Tests using this function \em MUST run tests with and without \a * force_creation set. */ void grpc_subchannel_index_test_only_set_force_creation(bool force_creation); diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index 1678051beb..91fa163fec 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -51,6 +51,7 @@ struct call_data { grpc_linked_mdelem user_agent; // State for handling recv_initial_metadata ops. grpc_metadata_batch* recv_initial_metadata; + grpc_error* recv_initial_metadata_error; grpc_closure* original_recv_initial_metadata_ready; grpc_closure recv_initial_metadata_ready; // State for handling recv_trailing_metadata ops. @@ -78,7 +79,12 @@ struct channel_data { static grpc_error* client_filter_incoming_metadata(grpc_call_element* elem, grpc_metadata_batch* b) { if (b->idx.named.status != nullptr) { - if (grpc_mdelem_eq(b->idx.named.status->md, GRPC_MDELEM_STATUS_200)) { + /* If both gRPC status and HTTP status are provided in the response, we + * should prefer the gRPC status code, as mentioned in + * https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md. + */ + if (b->idx.named.grpc_status != nullptr || + grpc_mdelem_eq(b->idx.named.status->md, GRPC_MDELEM_STATUS_200)) { grpc_metadata_batch_remove(b, b->idx.named.status); } else { char* val = grpc_dump_slice(GRPC_MDVALUE(b->idx.named.status->md), @@ -147,6 +153,7 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) { call_data* calld = static_cast<call_data*>(elem->call_data); if (error == GRPC_ERROR_NONE) { error = client_filter_incoming_metadata(elem, calld->recv_initial_metadata); + calld->recv_initial_metadata_error = GRPC_ERROR_REF(error); } else { GRPC_ERROR_REF(error); } @@ -162,6 +169,8 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { } else { GRPC_ERROR_REF(error); } + error = grpc_error_add_child( + error, GRPC_ERROR_REF(calld->recv_initial_metadata_error)); GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); } @@ -434,7 +443,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem, /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, - grpc_closure* ignored) {} + grpc_closure* ignored) { + call_data* calld = static_cast<call_data*>(elem->call_data); + GRPC_ERROR_UNREF(calld->recv_initial_metadata_error); +} static grpc_mdelem scheme_from_args(const grpc_channel_args* args) { unsigned i; diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 3919447f26..1b3426b120 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -23,6 +23,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <string.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/b64.h" @@ -50,6 +51,7 @@ struct call_data { // State for intercepting recv_initial_metadata. grpc_closure recv_initial_metadata_ready; + grpc_error* recv_initial_metadata_ready_error; grpc_closure* original_recv_initial_metadata_ready; grpc_metadata_batch* recv_initial_metadata; uint32_t* recv_initial_metadata_flags; @@ -60,6 +62,13 @@ struct call_data { grpc_closure recv_message_ready; grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; bool seen_recv_message_ready; + + grpc_closure recv_trailing_metadata_ready; + grpc_closure* original_recv_trailing_metadata_ready; +}; + +struct channel_data { + bool surface_user_agent; }; } // namespace @@ -258,6 +267,11 @@ static grpc_error* hs_filter_incoming_metadata(grpc_call_element* elem, GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":authority"))); } + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + if (!chand->surface_user_agent && b->idx.named.user_agent != nullptr) { + grpc_metadata_batch_remove(b, b->idx.named.user_agent); + } + return error; } @@ -267,6 +281,7 @@ static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err) { calld->seen_recv_initial_metadata_ready = true; if (err == GRPC_ERROR_NONE) { err = hs_filter_incoming_metadata(elem, calld->recv_initial_metadata); + calld->recv_initial_metadata_ready_error = GRPC_ERROR_REF(err); if (calld->seen_recv_message_ready) { // We've already seen the recv_message callback, but we previously // deferred it, so we need to return it here. @@ -313,6 +328,15 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) { } } +static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) { + grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + err = grpc_error_add_child( + GRPC_ERROR_REF(err), + GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error)); + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); +} + static grpc_error* hs_mutate_op(grpc_call_element* elem, grpc_transport_stream_op_batch* op) { /* grab pointers to our data from the call element */ @@ -357,6 +381,13 @@ static grpc_error* hs_mutate_op(grpc_call_element* elem, op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } + if (op->recv_trailing_metadata) { + calld->original_recv_trailing_metadata_ready = + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready; + } + if (op->send_trailing_metadata) { grpc_error* error = hs_filter_outgoing_metadata( elem, op->payload->send_trailing_metadata.send_trailing_metadata); @@ -389,6 +420,9 @@ static grpc_error* hs_init_call_elem(grpc_call_element* elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->recv_message_ready, hs_recv_message_ready, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + hs_recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } @@ -397,6 +431,7 @@ static void hs_destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = static_cast<call_data*>(elem->call_data); + GRPC_ERROR_UNREF(calld->recv_initial_metadata_ready_error); if (calld->have_read_stream) { calld->read_stream->Orphan(); } @@ -405,7 +440,12 @@ static void hs_destroy_call_elem(grpc_call_element* elem, /* Constructor for channel_data */ static grpc_error* hs_init_channel_elem(grpc_channel_element* elem, grpc_channel_element_args* args) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); GPR_ASSERT(!args->is_last); + chand->surface_user_agent = grpc_channel_arg_get_bool( + grpc_channel_args_find(args->channel_args, + const_cast<char*>(GRPC_ARG_SURFACE_USER_AGENT)), + true); return GRPC_ERROR_NONE; } @@ -419,7 +459,7 @@ const grpc_channel_filter grpc_http_server_filter = { hs_init_call_elem, grpc_call_stack_ignore_set_pollset_or_pollset_set, hs_destroy_call_elem, - 0, + sizeof(channel_data), hs_init_channel_elem, hs_destroy_channel_elem, grpc_channel_next_get_info, diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc index 1fe8288bd0..431472609e 100644 --- a/src/core/ext/filters/max_age/max_age_filter.cc +++ b/src/core/ext/filters/max_age/max_age_filter.cc @@ -429,8 +429,7 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem, ? GRPC_MILLIS_INF_FUTURE : DEFAULT_MAX_CONNECTION_IDLE_MS; chand->idle_state = MAX_IDLE_STATE_INIT; - gpr_atm_no_barrier_store(&chand->last_enter_idle_time_millis, - GRPC_MILLIS_INF_PAST); + gpr_atm_no_barrier_store(&chand->last_enter_idle_time_millis, GPR_ATM_MIN); for (size_t i = 0; i < args->channel_args->num_args; ++i) { if (0 == strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_CONNECTION_AGE_MS)) { diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index c7fc3f2e62..c17df86f3d 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -99,10 +99,15 @@ struct call_data { // recv_message_ready up-call on transport_stream_op, and remember to // call our next_recv_message_ready member after handling it. grpc_closure recv_message_ready; + grpc_closure recv_trailing_metadata_ready; + // The error caused by a message that is too large, or GRPC_ERROR_NONE + grpc_error* error; // Used by recv_message_ready. grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; // Original recv_message_ready callback, invoked after our own. grpc_closure* next_recv_message_ready; + // Original recv_trailing_metadata callback, invoked after our own. + grpc_closure* original_recv_trailing_metadata_ready; }; struct channel_data { @@ -130,12 +135,13 @@ static void recv_message_ready(void* user_data, grpc_error* error) { grpc_error* new_error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED); + GRPC_ERROR_UNREF(calld->error); if (error == GRPC_ERROR_NONE) { error = new_error; } else { error = grpc_error_add_child(error, new_error); - GRPC_ERROR_UNREF(new_error); } + calld->error = GRPC_ERROR_REF(error); gpr_free(message_string); } else { GRPC_ERROR_REF(error); @@ -144,6 +150,17 @@ static void recv_message_ready(void* user_data, grpc_error* error) { GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error); } +// Callback invoked on completion of recv_trailing_metadata +// Notifies the recv_trailing_metadata batch of any message size failures +static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { + grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + error = + grpc_error_add_child(GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->error)); + // Invoke the next callback. + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); +} + // Start transport stream op. static void start_transport_stream_op_batch( grpc_call_element* elem, grpc_transport_stream_op_batch* op) { @@ -172,6 +189,13 @@ static void start_transport_stream_op_batch( calld->recv_message = op->payload->recv_message.recv_message; op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } + // Inject callback for receiving trailing metadata. + if (op->recv_trailing_metadata) { + calld->original_recv_trailing_metadata_ready = + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready; + } // Chain to the next filter. grpc_call_next_op(elem, op); } @@ -183,8 +207,13 @@ static grpc_error* init_call_elem(grpc_call_element* elem, call_data* calld = static_cast<call_data*>(elem->call_data); calld->call_combiner = args->call_combiner; calld->next_recv_message_ready = nullptr; + calld->original_recv_trailing_metadata_ready = nullptr; + calld->error = GRPC_ERROR_NONE; GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); // Get max sizes from channel data, then merge in per-method config values. // Note: Per-method config is only available on the client, so we // apply the max request size to the send limit and the max response @@ -213,7 +242,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem, // Destructor for call_data. static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, - grpc_closure* ignored) {} + grpc_closure* ignored) { + call_data* calld = (call_data*)elem->call_data; + GRPC_ERROR_UNREF(calld->error); +} static int default_size(const grpc_channel_args* args, int without_minimal_stack) { diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc index dfed824cd5..5bdcb387c9 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc @@ -50,7 +50,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd( GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0); grpc_endpoint* client = grpc_tcp_client_create_from_fd( - grpc_fd_create(fd, "client", false), args, "fd-client"); + grpc_fd_create(fd, "client", true), args, "fd-client"); grpc_transport* transport = grpc_create_chttp2_transport(final_args, client, true); diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc index a0228785ee..e4bd91d07b 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc @@ -44,7 +44,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server, gpr_asprintf(&name, "fd:%d", fd); grpc_endpoint* server_endpoint = - grpc_tcp_create(grpc_fd_create(fd, name, false), + grpc_tcp_create(grpc_fd_create(fd, name, true), grpc_server_get_channel_args(server), name); gpr_free(name); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 36511fa608..26cad2cc9a 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -230,35 +230,165 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); } static const grpc_transport_vtable* get_vtable(void); -static void init_transport(grpc_chttp2_transport* t, - const grpc_channel_args* channel_args, - grpc_endpoint* ep, bool is_client) { +/* Returns whether bdp is enabled */ +static bool read_channel_args(grpc_chttp2_transport* t, + const grpc_channel_args* channel_args, + bool is_client) { + bool enable_bdp = true; size_t i; int j; - GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == - GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); - - t->base.vtable = get_vtable(); - t->ep = ep; - /* one ref is for destroy */ - gpr_ref_init(&t->refs, 1); - t->combiner = grpc_combiner_create(); - t->peer_string = grpc_endpoint_get_peer(ep); - t->endpoint_reading = 1; - t->next_stream_id = is_client ? 1 : 2; - t->is_client = is_client; - t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; - t->is_first_frame = true; - grpc_connectivity_state_init( - &t->channel_callback.state_tracker, GRPC_CHANNEL_READY, - is_client ? "client_transport" : "server_transport"); - - grpc_slice_buffer_init(&t->qbuf); - - grpc_slice_buffer_init(&t->outbuf); - grpc_chttp2_hpack_compressor_init(&t->hpack_compressor); + for (i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) { + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { + if ((t->next_stream_id & 1) != (value & 1)) { + gpr_log(GPR_ERROR, "%s: low bit must be %d on %s", + GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1, + is_client ? "client" : "server"); + } else { + t->next_stream_id = static_cast<uint32_t>(value); + } + } + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) { + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { + grpc_chttp2_hpack_compressor_set_max_usable_size( + &t->hpack_compressor, static_cast<uint32_t>(value)); + } + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) { + t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer( + &channel_args->args[i], + {g_default_max_pings_without_data, 0, INT_MAX}); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MAX_PING_STRIKES)) { + t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer( + &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX}); + } else if (0 == + strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) { + t->ping_policy.min_sent_ping_interval_without_data = + grpc_channel_arg_get_integer( + &channel_args->args[i], + grpc_integer_options{ + g_default_min_sent_ping_interval_without_data_ms, 0, + INT_MAX}); + } else if (0 == + strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) { + t->ping_policy.min_recv_ping_interval_without_data = + grpc_channel_arg_get_integer( + &channel_args->args[i], + grpc_integer_options{ + g_default_min_recv_ping_interval_without_data_ms, 0, + INT_MAX}); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) { + t->write_buffer_size = static_cast<uint32_t>(grpc_channel_arg_get_integer( + &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE})); + } else if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { + enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true); + } else if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) { + const int value = grpc_channel_arg_get_integer( + &channel_args->args[i], + grpc_integer_options{t->is_client + ? g_default_client_keepalive_time_ms + : g_default_server_keepalive_time_ms, + 1, INT_MAX}); + t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) { + const int value = grpc_channel_arg_get_integer( + &channel_args->args[i], + grpc_integer_options{t->is_client + ? g_default_client_keepalive_timeout_ms + : g_default_server_keepalive_timeout_ms, + 0, INT_MAX}); + t->keepalive_timeout = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { + t->keepalive_permit_without_calls = static_cast<uint32_t>( + grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1})); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_OPTIMIZATION_TARGET)) { + if (channel_args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "%s should be a string", + GRPC_ARG_OPTIMIZATION_TARGET); + } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + } else if (0 == + strcmp(channel_args->args[i].value.string, "throughput")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT; + } else { + gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'", + GRPC_ARG_OPTIMIZATION_TARGET, + channel_args->args[i].value.string); + } + } else { + static const struct { + const char* channel_arg_name; + grpc_chttp2_setting_id setting_id; + grpc_integer_options integer_options; + bool availability[2] /* server, client */; + } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS, + GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, + {-1, 0, INT32_MAX}, + {true, false}}, + {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER, + GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, + {-1, 0, INT32_MAX}, + {true, true}}, + {GRPC_ARG_MAX_METADATA_SIZE, + GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + {-1, 0, INT32_MAX}, + {true, true}}, + {GRPC_ARG_HTTP2_MAX_FRAME_SIZE, + GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, + {-1, 16384, 16777215}, + {true, true}}, + {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY, + GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, + {1, 0, 1}, + {true, true}}, + {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES, + GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, + {-1, 5, INT32_MAX}, + {true, true}}}; + for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) { + if (0 == strcmp(channel_args->args[i].key, + settings_map[j].channel_arg_name)) { + if (!settings_map[j].availability[is_client]) { + gpr_log(GPR_DEBUG, "%s is not available on %s", + settings_map[j].channel_arg_name, + is_client ? "clients" : "servers"); + } else { + int value = grpc_channel_arg_get_integer( + &channel_args->args[i], settings_map[j].integer_options); + if (value >= 0) { + queue_setting_update(t, settings_map[j].setting_id, + static_cast<uint32_t>(value)); + } + } + break; + } + } + } + } + return enable_bdp; +} +static void init_transport_closures(grpc_chttp2_transport* t) { GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t, @@ -286,6 +416,79 @@ static void init_transport(grpc_chttp2_transport* t, GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked, keepalive_watchdog_fired_locked, t, grpc_combiner_scheduler(t->combiner)); +} + +static void init_transport_keepalive_settings(grpc_chttp2_transport* t) { + if (t->is_client) { + t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_client_keepalive_time_ms; + t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_client_keepalive_timeout_ms; + t->keepalive_permit_without_calls = + g_default_client_keepalive_permit_without_calls; + } else { + t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_server_keepalive_time_ms; + t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_server_keepalive_timeout_ms; + t->keepalive_permit_without_calls = + g_default_server_keepalive_permit_without_calls; + } +} + +static void configure_transport_ping_policy(grpc_chttp2_transport* t) { + t->ping_policy.max_pings_without_data = g_default_max_pings_without_data; + t->ping_policy.min_sent_ping_interval_without_data = + g_default_min_sent_ping_interval_without_data_ms; + t->ping_policy.max_ping_strikes = g_default_max_ping_strikes; + t->ping_policy.min_recv_ping_interval_without_data = + g_default_min_recv_ping_interval_without_data_ms; +} + +static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) { + if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init(&t->keepalive_ping_timer, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + &t->init_keepalive_ping_locked); + } else { + /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no + inflight keeaplive timers */ + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; + } +} + +static void init_transport(grpc_chttp2_transport* t, + const grpc_channel_args* channel_args, + grpc_endpoint* ep, bool is_client) { + GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == + GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); + + t->base.vtable = get_vtable(); + t->ep = ep; + /* one ref is for destroy */ + gpr_ref_init(&t->refs, 1); + t->combiner = grpc_combiner_create(); + t->peer_string = grpc_endpoint_get_peer(ep); + t->endpoint_reading = 1; + t->next_stream_id = is_client ? 1 : 2; + t->is_client = is_client; + t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; + t->is_first_frame = true; + grpc_connectivity_state_init( + &t->channel_callback.state_tracker, GRPC_CHANNEL_READY, + is_client ? "client_transport" : "server_transport"); + + grpc_slice_buffer_init(&t->qbuf); + grpc_slice_buffer_init(&t->outbuf); + grpc_chttp2_hpack_compressor_init(&t->hpack_compressor); + + init_transport_closures(t); t->goaway_error = GRPC_ERROR_NONE; grpc_chttp2_goaway_parser_init(&t->goaway_parser); @@ -301,6 +504,8 @@ static void init_transport(grpc_chttp2_transport* t, grpc_chttp2_stream_map_init(&t->stream_map, 8); /* copy in initial settings to all setting sets */ + size_t i; + int j; for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) { for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) { t->settings[j][i] = grpc_chttp2_settings_parameters[i].default_value; @@ -328,191 +533,14 @@ static void init_transport(grpc_chttp2_transport* t, queue_setting_update(t, GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1); - t->ping_policy.max_pings_without_data = g_default_max_pings_without_data; - t->ping_policy.min_sent_ping_interval_without_data = - g_default_min_sent_ping_interval_without_data_ms; - t->ping_policy.max_ping_strikes = g_default_max_ping_strikes; - t->ping_policy.min_recv_ping_interval_without_data = - g_default_min_recv_ping_interval_without_data_ms; - - /* Keepalive setting */ - if (t->is_client) { - t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX - ? GRPC_MILLIS_INF_FUTURE - : g_default_client_keepalive_time_ms; - t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX - ? GRPC_MILLIS_INF_FUTURE - : g_default_client_keepalive_timeout_ms; - t->keepalive_permit_without_calls = - g_default_client_keepalive_permit_without_calls; - } else { - t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX - ? GRPC_MILLIS_INF_FUTURE - : g_default_server_keepalive_time_ms; - t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX - ? GRPC_MILLIS_INF_FUTURE - : g_default_server_keepalive_timeout_ms; - t->keepalive_permit_without_calls = - g_default_server_keepalive_permit_without_calls; - } + configure_transport_ping_policy(t); + init_transport_keepalive_settings(t); t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; bool enable_bdp = true; - if (channel_args) { - for (i = 0; i < channel_args->num_args; i++) { - if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) { - const grpc_integer_options options = {-1, 0, INT_MAX}; - const int value = - grpc_channel_arg_get_integer(&channel_args->args[i], options); - if (value >= 0) { - if ((t->next_stream_id & 1) != (value & 1)) { - gpr_log(GPR_ERROR, "%s: low bit must be %d on %s", - GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, - t->next_stream_id & 1, is_client ? "client" : "server"); - } else { - t->next_stream_id = static_cast<uint32_t>(value); - } - } - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) { - const grpc_integer_options options = {-1, 0, INT_MAX}; - const int value = - grpc_channel_arg_get_integer(&channel_args->args[i], options); - if (value >= 0) { - grpc_chttp2_hpack_compressor_set_max_usable_size( - &t->hpack_compressor, static_cast<uint32_t>(value)); - } - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) { - t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer( - &channel_args->args[i], - {g_default_max_pings_without_data, 0, INT_MAX}); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_MAX_PING_STRIKES)) { - t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer( - &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX}); - } else if (0 == - strcmp( - channel_args->args[i].key, - GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) { - t->ping_policy.min_sent_ping_interval_without_data = - grpc_channel_arg_get_integer( - &channel_args->args[i], - grpc_integer_options{ - g_default_min_sent_ping_interval_without_data_ms, 0, - INT_MAX}); - } else if (0 == - strcmp( - channel_args->args[i].key, - GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) { - t->ping_policy.min_recv_ping_interval_without_data = - grpc_channel_arg_get_integer( - &channel_args->args[i], - grpc_integer_options{ - g_default_min_recv_ping_interval_without_data_ms, 0, - INT_MAX}); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) { - t->write_buffer_size = - static_cast<uint32_t>(grpc_channel_arg_get_integer( - &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE})); - } else if (0 == - strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { - enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_KEEPALIVE_TIME_MS)) { - const int value = grpc_channel_arg_get_integer( - &channel_args->args[i], - grpc_integer_options{t->is_client - ? g_default_client_keepalive_time_ms - : g_default_server_keepalive_time_ms, - 1, INT_MAX}); - t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) { - const int value = grpc_channel_arg_get_integer( - &channel_args->args[i], - grpc_integer_options{t->is_client - ? g_default_client_keepalive_timeout_ms - : g_default_server_keepalive_timeout_ms, - 0, INT_MAX}); - t->keepalive_timeout = - value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { - t->keepalive_permit_without_calls = static_cast<uint32_t>( - grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1})); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_OPTIMIZATION_TARGET)) { - if (channel_args->args[i].type != GRPC_ARG_STRING) { - gpr_log(GPR_ERROR, "%s should be a string", - GRPC_ARG_OPTIMIZATION_TARGET); - } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) { - t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; - } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) { - t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; - } else if (0 == - strcmp(channel_args->args[i].value.string, "throughput")) { - t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT; - } else { - gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'", - GRPC_ARG_OPTIMIZATION_TARGET, - channel_args->args[i].value.string); - } - } else { - static const struct { - const char* channel_arg_name; - grpc_chttp2_setting_id setting_id; - grpc_integer_options integer_options; - bool availability[2] /* server, client */; - } settings_map[] = { - {GRPC_ARG_MAX_CONCURRENT_STREAMS, - GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, - {-1, 0, INT32_MAX}, - {true, false}}, - {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER, - GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, - {-1, 0, INT32_MAX}, - {true, true}}, - {GRPC_ARG_MAX_METADATA_SIZE, - GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, - {-1, 0, INT32_MAX}, - {true, true}}, - {GRPC_ARG_HTTP2_MAX_FRAME_SIZE, - GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, - {-1, 16384, 16777215}, - {true, true}}, - {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY, - GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, - {1, 0, 1}, - {true, true}}, - {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES, - GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, - {-1, 5, INT32_MAX}, - {true, true}}}; - for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) { - if (0 == strcmp(channel_args->args[i].key, - settings_map[j].channel_arg_name)) { - if (!settings_map[j].availability[is_client]) { - gpr_log(GPR_DEBUG, "%s is not available on %s", - settings_map[j].channel_arg_name, - is_client ? "clients" : "servers"); - } else { - int value = grpc_channel_arg_get_integer( - &channel_args->args[i], settings_map[j].integer_options); - if (value >= 0) { - queue_setting_update(t, settings_map[j].setting_id, - static_cast<uint32_t>(value)); - } - } - break; - } - } - } - } + enable_bdp = read_channel_args(t, channel_args, is_client); } if (g_flow_control_enabled) { @@ -531,23 +559,11 @@ static void init_transport(grpc_chttp2_transport* t, t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST; t->ping_recv_state.ping_strikes = 0; - /* Start keepalive pings */ - if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) { - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; - GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping_locked); - } else { - /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no - inflight keeaplive timers */ - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; - } + init_keepalive_pings_if_enabled(t); if (enable_bdp) { GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); schedule_bdp_ping_locked(t); - grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t, nullptr); } @@ -1029,7 +1045,8 @@ static void write_action(void* gt, grpc_error* error) { grpc_endpoint_write( t->ep, &t->outbuf, GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t, - grpc_combiner_scheduler(t->combiner))); + grpc_combiner_scheduler(t->combiner)), + nullptr); } /* Callback from the grpc_endpoint after bytes have been written by calling @@ -2886,17 +2903,20 @@ bool Chttp2IncomingByteStream::Next(size_t max_size_hint, } } +void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() { + if (!stream_->stream_decompression_ctx) { + stream_->stream_decompression_ctx = grpc_stream_compression_context_create( + stream_->stream_decompression_method); + } +} + grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) { GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0); grpc_error* error; if (stream_->unprocessed_incoming_frames_buffer.length > 0) { if (!stream_->unprocessed_incoming_frames_decompressed) { bool end_of_context; - if (!stream_->stream_decompression_ctx) { - stream_->stream_decompression_ctx = - grpc_stream_compression_context_create( - stream_->stream_decompression_method); - } + MaybeCreateStreamDecompressionCtx(); if (!grpc_stream_decompress(stream_->stream_decompression_ctx, &stream_->unprocessed_incoming_frames_buffer, &stream_->decompressed_data_buffer, nullptr, diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index ca6e715978..6b5309bab4 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -246,6 +246,8 @@ class Chttp2IncomingByteStream : public ByteStream { static void NextLocked(void* arg, grpc_error* error_ignored); static void OrphanLocked(void* arg, grpc_error* error_ignored); + void MaybeCreateStreamDecompressionCtx(); + grpc_chttp2_transport* transport_; // Immutable. grpc_chttp2_stream* stream_; // Immutable. diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 4a252d972d..81e2634e3a 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -1287,7 +1287,7 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { grpc_error* error = GRPC_ERROR_NONE; if (stream_state->state_op_done[OP_CANCEL_ERROR]) { error = GRPC_ERROR_REF(stream_state->cancel_error); - } else if (stream_state->state_op_done[OP_FAILED]) { + } else if (stream_state->state_callback_received[OP_FAILED]) { error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."); } else if (oas->s->state.rs.trailing_metadata_valid) { grpc_chttp2_incoming_metadata_buffer_publish( diff --git a/src/core/lib/channel/channel_trace.cc b/src/core/lib/channel/channel_trace.cc index b3443310ac..cfb2faba51 100644 --- a/src/core/lib/channel/channel_trace.cc +++ b/src/core/lib/channel/channel_trace.cc @@ -41,16 +41,14 @@ namespace grpc_core { namespace channelz { -ChannelTrace::TraceEvent::TraceEvent( - Severity severity, grpc_slice data, - RefCountedPtr<ChannelNode> referenced_channel, ReferencedType type) +ChannelTrace::TraceEvent::TraceEvent(Severity severity, grpc_slice data, + RefCountedPtr<BaseNode> referenced_entity) : severity_(severity), data_(data), timestamp_(grpc_millis_to_timespec(grpc_core::ExecCtx::Get()->Now(), GPR_CLOCK_REALTIME)), next_(nullptr), - referenced_channel_(std::move(referenced_channel)), - referenced_type_(type) {} + referenced_entity_(std::move(referenced_entity)) {} ChannelTrace::TraceEvent::TraceEvent(Severity severity, grpc_slice data) : severity_(severity), @@ -110,23 +108,13 @@ void ChannelTrace::AddTraceEvent(Severity severity, grpc_slice data) { AddTraceEventHelper(New<TraceEvent>(severity, data)); } -void ChannelTrace::AddTraceEventReferencingChannel( - Severity severity, grpc_slice data, - RefCountedPtr<ChannelNode> referenced_channel) { - if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0 - // create and fill up the new event - AddTraceEventHelper(New<TraceEvent>( - severity, data, std::move(referenced_channel), ReferencedType::Channel)); -} - -void ChannelTrace::AddTraceEventReferencingSubchannel( +void ChannelTrace::AddTraceEventWithReference( Severity severity, grpc_slice data, - RefCountedPtr<ChannelNode> referenced_subchannel) { + RefCountedPtr<BaseNode> referenced_entity) { if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0 // create and fill up the new event - AddTraceEventHelper(New<TraceEvent>(severity, data, - std::move(referenced_subchannel), - ReferencedType::Subchannel)); + AddTraceEventHelper( + New<TraceEvent>(severity, data, std::move(referenced_entity))); } namespace { @@ -157,19 +145,18 @@ void ChannelTrace::TraceEvent::RenderTraceEvent(grpc_json* json) const { json_iterator = grpc_json_create_child(json_iterator, json, "timestamp", gpr_format_timespec(timestamp_), GRPC_JSON_STRING, true); - if (referenced_channel_ != nullptr) { + if (referenced_entity_ != nullptr) { + const bool is_channel = + (referenced_entity_->type() == BaseNode::EntityType::kTopLevelChannel || + referenced_entity_->type() == BaseNode::EntityType::kInternalChannel); char* uuid_str; - gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_channel_->channel_uuid()); + gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_entity_->uuid()); grpc_json* child_ref = grpc_json_create_child( - json_iterator, json, - (referenced_type_ == ReferencedType::Channel) ? "channelRef" - : "subchannelRef", + json_iterator, json, is_channel ? "channelRef" : "subchannelRef", nullptr, GRPC_JSON_OBJECT, false); json_iterator = grpc_json_create_child( - nullptr, child_ref, - (referenced_type_ == ReferencedType::Channel) ? "channelId" - : "subchannelId", - uuid_str, GRPC_JSON_STRING, true); + nullptr, child_ref, is_channel ? "channelId" : "subchannelId", uuid_str, + GRPC_JSON_STRING, true); json_iterator = child_ref; } } @@ -178,24 +165,26 @@ grpc_json* ChannelTrace::RenderJson() const { if (!max_list_size_) return nullptr; // tracing is disabled if max_events == 0 grpc_json* json = grpc_json_create(GRPC_JSON_OBJECT); - char* num_events_logged_str; - gpr_asprintf(&num_events_logged_str, "%" PRId64, num_events_logged_); grpc_json* json_iterator = nullptr; - json_iterator = - grpc_json_create_child(json_iterator, json, "numEventsLogged", - num_events_logged_str, GRPC_JSON_STRING, true); + if (num_events_logged_ > 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "numEventsLogged", num_events_logged_); + } json_iterator = grpc_json_create_child( json_iterator, json, "creationTimestamp", gpr_format_timespec(time_created_), GRPC_JSON_STRING, true); - grpc_json* events = grpc_json_create_child(json_iterator, json, "events", - nullptr, GRPC_JSON_ARRAY, false); - json_iterator = nullptr; - TraceEvent* it = head_trace_; - while (it != nullptr) { - json_iterator = grpc_json_create_child(json_iterator, events, nullptr, - nullptr, GRPC_JSON_OBJECT, false); - it->RenderTraceEvent(json_iterator); - it = it->next(); + // only add in the event list if it is non-empty. + if (num_events_logged_ > 0) { + grpc_json* events = grpc_json_create_child(json_iterator, json, "events", + nullptr, GRPC_JSON_ARRAY, false); + json_iterator = nullptr; + TraceEvent* it = head_trace_; + while (it != nullptr) { + json_iterator = grpc_json_create_child(json_iterator, events, nullptr, + nullptr, GRPC_JSON_OBJECT, false); + it->RenderTraceEvent(json_iterator); + it = it->next(); + } } return json; } diff --git a/src/core/lib/channel/channel_trace.h b/src/core/lib/channel/channel_trace.h index 596af7402f..94fea20b45 100644 --- a/src/core/lib/channel/channel_trace.h +++ b/src/core/lib/channel/channel_trace.h @@ -30,7 +30,7 @@ namespace grpc_core { namespace channelz { -class ChannelNode; +class BaseNode; // Object used to hold live data for a channel. This data is exposed via the // channelz service: @@ -55,35 +55,28 @@ class ChannelTrace { void AddTraceEvent(Severity severity, grpc_slice data); // Adds a new trace event to the tracing object. This trace event refers to a - // an event on a child of the channel. For example, if this channel has - // created a new subchannel, then it would record that with a TraceEvent - // referencing the new subchannel. + // an event that concerns a different channelz entity. For example, if this + // channel has created a new subchannel, then it would record that with + // a TraceEvent referencing the new subchannel. // // TODO(ncteisen): as this call is used more and more throughout the gRPC // stack, determine if it makes more sense to accept a char* instead of a // slice. - void AddTraceEventReferencingChannel( - Severity severity, grpc_slice data, - RefCountedPtr<ChannelNode> referenced_channel); - void AddTraceEventReferencingSubchannel( - Severity severity, grpc_slice data, - RefCountedPtr<ChannelNode> referenced_subchannel); + void AddTraceEventWithReference(Severity severity, grpc_slice data, + RefCountedPtr<BaseNode> referenced_entity); // Creates and returns the raw grpc_json object, so a parent channelz // object may incorporate the json before rendering. grpc_json* RenderJson() const; private: - // Types of objects that can be references by trace events. - enum class ReferencedType { Channel, Subchannel }; // Private class to encapsulate all the data and bookkeeping needed for a // a trace event. class TraceEvent { public: - // Constructor for a TraceEvent that references a different channel. + // Constructor for a TraceEvent that references a channel. TraceEvent(Severity severity, grpc_slice data, - RefCountedPtr<ChannelNode> referenced_channel, - ReferencedType type); + RefCountedPtr<BaseNode> referenced_entity_); // Constructor for a TraceEvent that does not reverence a different // channel. @@ -105,10 +98,7 @@ class ChannelTrace { gpr_timespec timestamp_; TraceEvent* next_; // the tracer object for the (sub)channel that this trace event refers to. - RefCountedPtr<ChannelNode> referenced_channel_; - // the type that the referenced tracer points to. Unused if this trace - // does not point to any channel or subchannel - ReferencedType referenced_type_; + RefCountedPtr<BaseNode> referenced_entity_; }; // TraceEvent // Internal helper to add and link in a trace event diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc index 9d6002ed8a..375cf25cc6 100644 --- a/src/core/lib/channel/channelz.cc +++ b/src/core/lib/channel/channelz.cc @@ -41,33 +41,62 @@ namespace grpc_core { namespace channelz { -ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, - bool is_top_level_channel) - : channel_(channel), - target_(nullptr), - channel_uuid_(-1), - is_top_level_channel_(is_top_level_channel) { - trace_.Init(channel_tracer_max_nodes); - target_ = UniquePtr<char>(grpc_channel_get_target(channel_)); - channel_uuid_ = ChannelzRegistry::RegisterChannelNode(this); +BaseNode::BaseNode(EntityType type) + : type_(type), uuid_(ChannelzRegistry::Register(this)) {} + +BaseNode::~BaseNode() { ChannelzRegistry::Unregister(uuid_); } + +char* BaseNode::RenderJsonString() { + grpc_json* json = RenderJson(); + GPR_ASSERT(json != nullptr); + char* json_str = grpc_json_dump_to_string(json, 0); + grpc_json_destroy(json); + return json_str; +} + +CallCountingHelper::CallCountingHelper() { gpr_atm_no_barrier_store(&last_call_started_millis_, (gpr_atm)ExecCtx::Get()->Now()); } -ChannelNode::~ChannelNode() { - trace_.Destroy(); - ChannelzRegistry::UnregisterChannelNode(channel_uuid_); -} +CallCountingHelper::~CallCountingHelper() {} -void ChannelNode::RecordCallStarted() { +void CallCountingHelper::RecordCallStarted() { gpr_atm_no_barrier_fetch_add(&calls_started_, (gpr_atm)1); gpr_atm_no_barrier_store(&last_call_started_millis_, (gpr_atm)ExecCtx::Get()->Now()); } -void ChannelNode::PopulateConnectivityState(grpc_json* json) {} +void CallCountingHelper::PopulateCallCounts(grpc_json* json) { + grpc_json* json_iterator = nullptr; + if (calls_started_ != 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "callsStarted", calls_started_); + } + if (calls_succeeded_ != 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "callsSucceeded", calls_succeeded_); + } + if (calls_failed_) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "callsFailed", calls_failed_); + } + gpr_timespec ts = + grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME); + json_iterator = + grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp", + gpr_format_timespec(ts), GRPC_JSON_STRING, true); +} + +ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel) + : BaseNode(is_top_level_channel ? EntityType::kTopLevelChannel + : EntityType::kInternalChannel), + channel_(channel), + target_(UniquePtr<char>(grpc_channel_get_target(channel_))), + trace_(channel_tracer_max_nodes) {} -void ChannelNode::PopulateChildRefs(grpc_json* json) {} +ChannelNode::~ChannelNode() {} grpc_json* ChannelNode::RenderJson() { // We need to track these three json objects to build our object @@ -80,7 +109,7 @@ grpc_json* ChannelNode::RenderJson() { json = json_iterator; json_iterator = nullptr; json_iterator = grpc_json_add_number_string_child(json, json_iterator, - "channelId", channel_uuid_); + "channelId", uuid()); // reset json iterators to top level object json = top_level_json; json_iterator = nullptr; @@ -89,51 +118,28 @@ grpc_json* ChannelNode::RenderJson() { GRPC_JSON_OBJECT, false); json = data; json_iterator = nullptr; + // template method. Child classes may override this to add their specific + // functionality. PopulateConnectivityState(json); + // populate the target. GPR_ASSERT(target_.get() != nullptr); - json_iterator = grpc_json_create_child( - json_iterator, json, "target", target_.get(), GRPC_JSON_STRING, false); + grpc_json_create_child(nullptr, json, "target", target_.get(), + GRPC_JSON_STRING, false); // fill in the channel trace if applicable - grpc_json* trace = trace_->RenderJson(); - if (trace != nullptr) { - // we manually link up and fill the child since it was created for us in - // ChannelTrace::RenderJson - trace->key = "trace"; // this object is named trace in channelz.proto - json_iterator = grpc_json_link_child(json, trace, json_iterator); - } - // reset the parent to be the data object. - json = data; - json_iterator = nullptr; - if (calls_started_ != 0) { - json_iterator = grpc_json_add_number_string_child( - json, json_iterator, "callsStarted", calls_started_); + grpc_json* trace_json = trace_.RenderJson(); + if (trace_json != nullptr) { + trace_json->key = "trace"; // this object is named trace in channelz.proto + grpc_json_link_child(json, trace_json, nullptr); } - if (calls_succeeded_ != 0) { - json_iterator = grpc_json_add_number_string_child( - json, json_iterator, "callsSucceeded", calls_succeeded_); - } - if (calls_failed_) { - json_iterator = grpc_json_add_number_string_child( - json, json_iterator, "callsFailed", calls_failed_); - } - gpr_timespec ts = - grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME); - json_iterator = - grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp", - gpr_format_timespec(ts), GRPC_JSON_STRING, true); + // ask CallCountingHelper to populate trace and call count data. + call_counter_.PopulateCallCounts(json); json = top_level_json; - json_iterator = nullptr; + // template method. Child classes may override this to add their specific + // functionality. PopulateChildRefs(json); return top_level_json; } -char* ChannelNode::RenderJsonString() { - grpc_json* json = RenderJson(); - char* json_str = grpc_json_dump_to_string(json, 0); - grpc_json_destroy(json); - return json_str; -} - RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode( grpc_channel* channel, size_t channel_tracer_max_nodes, bool is_top_level_channel) { @@ -141,12 +147,41 @@ RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode( channel, channel_tracer_max_nodes, is_top_level_channel); } -SubchannelNode::SubchannelNode() { - subchannel_uuid_ = ChannelzRegistry::RegisterSubchannelNode(this); -} +ServerNode::ServerNode(size_t channel_tracer_max_nodes) + : BaseNode(EntityType::kServer), trace_(channel_tracer_max_nodes) {} + +ServerNode::~ServerNode() {} -SubchannelNode::~SubchannelNode() { - ChannelzRegistry::UnregisterSubchannelNode(subchannel_uuid_); +grpc_json* ServerNode::RenderJson() { + // We need to track these three json objects to build our object + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* json_iterator = nullptr; + // create and fill the ref child + json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr, + GRPC_JSON_OBJECT, false); + json = json_iterator; + json_iterator = nullptr; + json_iterator = grpc_json_add_number_string_child(json, json_iterator, + "serverId", uuid()); + // reset json iterators to top level object + json = top_level_json; + json_iterator = nullptr; + // create and fill the data child. + grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr, + GRPC_JSON_OBJECT, false); + json = data; + json_iterator = nullptr; + // fill in the channel trace if applicable + grpc_json* trace_json = trace_.RenderJson(); + if (trace_json != nullptr) { + trace_json->key = "trace"; // this object is named trace in channelz.proto + grpc_json_link_child(json, trace_json, nullptr); + } + // ask CallCountingHelper to populate trace and call count data. + call_counter_.PopulateCallCounts(json); + json = top_level_json; + return top_level_json; } } // namespace channelz diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h index 07eb73d626..9be256147b 100644 --- a/src/core/lib/channel/channelz.h +++ b/src/core/lib/channel/channelz.h @@ -43,14 +43,52 @@ namespace grpc_core { namespace channelz { namespace testing { +class CallCountingHelperPeer; class ChannelNodePeer; -} +} // namespace testing -class ChannelNode : public RefCounted<ChannelNode> { +// base class for all channelz entities +class BaseNode : public RefCounted<BaseNode> { public: - static RefCountedPtr<ChannelNode> MakeChannelNode( - grpc_channel* channel, size_t channel_tracer_max_nodes, - bool is_top_level_channel); + // There are only four high level channelz entities. However, to support + // GetTopChannelsRequest, we split the Channel entity into two different + // types. All children of BaseNode must be one of these types. + enum class EntityType { + kTopLevelChannel, + kInternalChannel, + kSubchannel, + kServer, + kSocket, + }; + + explicit BaseNode(EntityType type); + virtual ~BaseNode(); + + // All children must implement this function. + virtual grpc_json* RenderJson() GRPC_ABSTRACT; + + // Renders the json and returns allocated string that must be freed by the + // caller. + char* RenderJsonString(); + + EntityType type() const { return type_; } + intptr_t uuid() const { return uuid_; } + + private: + const EntityType type_; + const intptr_t uuid_; +}; + +// This class is a helper class for channelz entities that deal with Channels, +// Subchannels, and Servers, since those have similar proto definitions. +// This class has the ability to: +// - track calls_{started,succeeded,failed} +// - track last_call_started_timestamp +// - perform rendering of the above items +class CallCountingHelper { + public: + CallCountingHelper(); + ~CallCountingHelper(); void RecordCallStarted(); void RecordCallFailed() { @@ -60,17 +98,46 @@ class ChannelNode : public RefCounted<ChannelNode> { gpr_atm_no_barrier_fetch_add(&calls_succeeded_, (gpr_atm(1))); } - grpc_json* RenderJson(); - char* RenderJsonString(); + // Common rendering of the call count data and last_call_started_timestamp. + void PopulateCallCounts(grpc_json* json); - // helper for getting and populating connectivity state. It is virtual - // because it allows the client_channel specific code to live in ext/ - // instead of lib/ - virtual void PopulateConnectivityState(grpc_json* json); + private: + // testing peer friend. + friend class testing::CallCountingHelperPeer; - virtual void PopulateChildRefs(grpc_json* json); + gpr_atm calls_started_ = 0; + gpr_atm calls_succeeded_ = 0; + gpr_atm calls_failed_ = 0; + gpr_atm last_call_started_millis_ = 0; +}; - ChannelTrace* trace() { return trace_.get(); } +// Handles channelz bookkeeping for channels +class ChannelNode : public BaseNode { + public: + static RefCountedPtr<ChannelNode> MakeChannelNode( + grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel); + + ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel); + ~ChannelNode() override; + + grpc_json* RenderJson() override; + + // template methods. RenderJSON uses these methods to render its JSON + // representation. These are virtual so that children classes may provide + // their specific mechanism for populating these parts of the channelz + // object. + // + // ChannelNode does not have a notion of connectivity state or child refs, + // so it leaves these implementations blank. + // + // This is utilizing the template method design pattern. + // + // TODO(ncteisen): remove these template methods in favor of manual traversal + // and mutation of the grpc_json object. + virtual void PopulateConnectivityState(grpc_json* json) {} + virtual void PopulateChildRefs(grpc_json* json) {} void MarkChannelDestroyed() { GPR_ASSERT(channel_ != nullptr); @@ -79,47 +146,62 @@ class ChannelNode : public RefCounted<ChannelNode> { bool ChannelIsDestroyed() { return channel_ == nullptr; } - intptr_t channel_uuid() { return channel_uuid_; } - bool is_top_level_channel() { return is_top_level_channel_; } - - protected: - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW - ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, - bool is_top_level_channel); - virtual ~ChannelNode(); + // proxy methods to composed classes. + void AddTraceEvent(ChannelTrace::Severity severity, grpc_slice data) { + trace_.AddTraceEvent(severity, data); + } + void AddTraceEventWithReference(ChannelTrace::Severity severity, + grpc_slice data, + RefCountedPtr<BaseNode> referenced_channel) { + trace_.AddTraceEventWithReference(severity, data, + std::move(referenced_channel)); + } + void RecordCallStarted() { call_counter_.RecordCallStarted(); } + void RecordCallFailed() { call_counter_.RecordCallFailed(); } + void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); } private: - // testing peer friend. + // to allow the channel trace test to access trace_. friend class testing::ChannelNodePeer; - grpc_channel* channel_ = nullptr; UniquePtr<char> target_; - gpr_atm calls_started_ = 0; - gpr_atm calls_succeeded_ = 0; - gpr_atm calls_failed_ = 0; - gpr_atm last_call_started_millis_ = 0; - intptr_t channel_uuid_; - bool is_top_level_channel_ = true; - ManualConstructor<ChannelTrace> trace_; + CallCountingHelper call_counter_; + ChannelTrace trace_; }; -// Placeholds channelz class for subchannels. All this can do now is track its -// uuid (this information is needed by the parent channelz class). -// TODO(ncteisen): build this out to support the GetSubchannel channelz request. -class SubchannelNode : public RefCounted<SubchannelNode> { +// Handles channelz bookkeeping for servers +class ServerNode : public BaseNode { public: - SubchannelNode(); - virtual ~SubchannelNode(); + explicit ServerNode(size_t channel_tracer_max_nodes); + ~ServerNode() override; - intptr_t subchannel_uuid() { return subchannel_uuid_; } + grpc_json* RenderJson() override; - protected: - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW + // proxy methods to composed classes. + void AddTraceEvent(ChannelTrace::Severity severity, grpc_slice data) { + trace_.AddTraceEvent(severity, data); + } + void AddTraceEventWithReference(ChannelTrace::Severity severity, + grpc_slice data, + RefCountedPtr<BaseNode> referenced_channel) { + trace_.AddTraceEventWithReference(severity, data, + std::move(referenced_channel)); + } + void RecordCallStarted() { call_counter_.RecordCallStarted(); } + void RecordCallFailed() { call_counter_.RecordCallFailed(); } + void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); } private: - intptr_t subchannel_uuid_; + CallCountingHelper call_counter_; + ChannelTrace trace_; +}; + +// Handles channelz bookkeeping for sockets +// TODO(ncteisen): implement in subsequent PR. +class SocketNode : public BaseNode { + public: + SocketNode() : BaseNode(EntityType::kSocket) {} + ~SocketNode() override {} }; // Creation functions diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc index f79d2f0c17..adc7b6ba44 100644 --- a/src/core/lib/channel/channelz_registry.cc +++ b/src/core/lib/channel/channelz_registry.cc @@ -53,54 +53,46 @@ ChannelzRegistry::ChannelzRegistry() { gpr_mu_init(&mu_); } ChannelzRegistry::~ChannelzRegistry() { gpr_mu_destroy(&mu_); } -intptr_t ChannelzRegistry::InternalRegisterEntry(const RegistryEntry& entry) { +intptr_t ChannelzRegistry::InternalRegister(BaseNode* node) { MutexLock lock(&mu_); - entities_.push_back(entry); + entities_.push_back(node); intptr_t uuid = entities_.size(); return uuid; } -void ChannelzRegistry::InternalUnregisterEntry(intptr_t uuid, EntityType type) { +void ChannelzRegistry::InternalUnregister(intptr_t uuid) { GPR_ASSERT(uuid >= 1); MutexLock lock(&mu_); GPR_ASSERT(static_cast<size_t>(uuid) <= entities_.size()); - GPR_ASSERT(entities_[uuid - 1].type == type); - entities_[uuid - 1].object = nullptr; - entities_[uuid - 1].type = EntityType::kUnset; + entities_[uuid - 1] = nullptr; } -void* ChannelzRegistry::InternalGetEntry(intptr_t uuid, EntityType type) { +BaseNode* ChannelzRegistry::InternalGet(intptr_t uuid) { MutexLock lock(&mu_); if (uuid < 1 || uuid > static_cast<intptr_t>(entities_.size())) { return nullptr; } - if (entities_[uuid - 1].type == type) { - return entities_[uuid - 1].object; - } else { - return nullptr; - } + return entities_[uuid - 1]; } char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) { grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); grpc_json* json = top_level_json; grpc_json* json_iterator = nullptr; - InlinedVector<ChannelNode*, 10> top_level_channels; + InlinedVector<BaseNode*, 10> top_level_channels; // uuids index into entities one-off (idx 0 is really uuid 1, since 0 is // reserved). However, we want to support requests coming in with // start_channel_id=0, which signifies "give me everything." Hence this // funky looking line below. size_t start_idx = start_channel_id == 0 ? 0 : start_channel_id - 1; for (size_t i = start_idx; i < entities_.size(); ++i) { - if (entities_[i].type == EntityType::kChannelNode) { - ChannelNode* channel_node = - static_cast<ChannelNode*>(entities_[i].object); - if (channel_node->is_top_level_channel()) { - top_level_channels.push_back(channel_node); - } + if (entities_[i] != nullptr && + entities_[i]->type() == + grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel) { + top_level_channels.push_back(entities_[i]); } } - if (top_level_channels.size() > 0) { + if (!top_level_channels.empty()) { // create list of channels grpc_json* array_parent = grpc_json_create_child( nullptr, json, "channel", nullptr, GRPC_JSON_ARRAY, false); @@ -120,6 +112,42 @@ char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) { return json_str; } +char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) { + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* json_iterator = nullptr; + InlinedVector<BaseNode*, 10> servers; + // uuids index into entities one-off (idx 0 is really uuid 1, since 0 is + // reserved). However, we want to support requests coming in with + // start_server_id=0, which signifies "give me everything." + size_t start_idx = start_server_id == 0 ? 0 : start_server_id - 1; + for (size_t i = start_idx; i < entities_.size(); ++i) { + if (entities_[i] != nullptr && + entities_[i]->type() == + grpc_core::channelz::BaseNode::EntityType::kServer) { + servers.push_back(entities_[i]); + } + } + if (!servers.empty()) { + // create list of servers + grpc_json* array_parent = grpc_json_create_child( + nullptr, json, "server", nullptr, GRPC_JSON_ARRAY, false); + for (size_t i = 0; i < servers.size(); ++i) { + grpc_json* server_json = servers[i]->RenderJson(); + json_iterator = + grpc_json_link_child(array_parent, server_json, json_iterator); + } + } + // For now we do not have any pagination rules. In the future we could + // pick a constant for max_channels_sent for a GetServers request. + // Tracking: https://github.com/grpc/grpc/issues/16019. + json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr, + GRPC_JSON_TRUE, false); + char* json_str = grpc_json_dump_to_string(top_level_json, 0); + grpc_json_destroy(top_level_json); + return json_str; +} + } // namespace channelz } // namespace grpc_core @@ -128,10 +156,18 @@ char* grpc_channelz_get_top_channels(intptr_t start_channel_id) { start_channel_id); } +char* grpc_channelz_get_servers(intptr_t start_server_id) { + return grpc_core::channelz::ChannelzRegistry::GetServers(start_server_id); +} + char* grpc_channelz_get_channel(intptr_t channel_id) { - grpc_core::channelz::ChannelNode* channel_node = - grpc_core::channelz::ChannelzRegistry::GetChannelNode(channel_id); - if (channel_node == nullptr) { + grpc_core::channelz::BaseNode* channel_node = + grpc_core::channelz::ChannelzRegistry::Get(channel_id); + if (channel_node == nullptr || + (channel_node->type() != + grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel && + channel_node->type() != + grpc_core::channelz::BaseNode::EntityType::kInternalChannel)) { return nullptr; } grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); @@ -143,3 +179,21 @@ char* grpc_channelz_get_channel(intptr_t channel_id) { grpc_json_destroy(top_level_json); return json_str; } + +char* grpc_channelz_get_subchannel(intptr_t subchannel_id) { + grpc_core::channelz::BaseNode* subchannel_node = + grpc_core::channelz::ChannelzRegistry::Get(subchannel_id); + if (subchannel_node == nullptr || + subchannel_node->type() != + grpc_core::channelz::BaseNode::EntityType::kSubchannel) { + return nullptr; + } + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* subchannel_json = subchannel_node->RenderJson(); + subchannel_json->key = "subchannel"; + grpc_json_link_child(json, subchannel_json, nullptr); + char* json_str = grpc_json_dump_to_string(top_level_json, 0); + grpc_json_destroy(top_level_json); + return json_str; +} diff --git a/src/core/lib/channel/channelz_registry.h b/src/core/lib/channel/channelz_registry.h index 5d7c936726..d0d660600d 100644 --- a/src/core/lib/channel/channelz_registry.h +++ b/src/core/lib/channel/channelz_registry.h @@ -40,32 +40,11 @@ class ChannelzRegistry { // To be called in grpc_shutdown(); static void Shutdown(); - // Register/Unregister/Get for ChannelNode - static intptr_t RegisterChannelNode(ChannelNode* channel_node) { - RegistryEntry entry(channel_node, EntityType::kChannelNode); - return Default()->InternalRegisterEntry(entry); - } - static void UnregisterChannelNode(intptr_t uuid) { - Default()->InternalUnregisterEntry(uuid, EntityType::kChannelNode); - } - static ChannelNode* GetChannelNode(intptr_t uuid) { - void* gotten = Default()->InternalGetEntry(uuid, EntityType::kChannelNode); - return gotten == nullptr ? nullptr : static_cast<ChannelNode*>(gotten); - } - - // Register/Unregister/Get for SubchannelNode - static intptr_t RegisterSubchannelNode(SubchannelNode* channel_node) { - RegistryEntry entry(channel_node, EntityType::kSubchannelNode); - return Default()->InternalRegisterEntry(entry); - } - static void UnregisterSubchannelNode(intptr_t uuid) { - Default()->InternalUnregisterEntry(uuid, EntityType::kSubchannelNode); - } - static SubchannelNode* GetSubchannelNode(intptr_t uuid) { - void* gotten = - Default()->InternalGetEntry(uuid, EntityType::kSubchannelNode); - return gotten == nullptr ? nullptr : static_cast<SubchannelNode*>(gotten); + static intptr_t Register(BaseNode* node) { + return Default()->InternalRegister(node); } + static void Unregister(intptr_t uuid) { Default()->InternalUnregister(uuid); } + static BaseNode* Get(intptr_t uuid) { return Default()->InternalGet(uuid); } // Returns the allocated JSON string that represents the proto // GetTopChannelsResponse as per channelz.proto. @@ -73,20 +52,13 @@ class ChannelzRegistry { return Default()->InternalGetTopChannels(start_channel_id); } - private: - enum class EntityType { - kChannelNode, - kSubchannelNode, - kUnset, - }; - - struct RegistryEntry { - RegistryEntry(void* object_in, EntityType type_in) - : object(object_in), type(type_in) {} - void* object; - EntityType type; - }; + // Returns the allocated JSON string that represents the proto + // GetServersResponse as per channelz.proto. + static char* GetServers(intptr_t start_server_id) { + return Default()->InternalGetServers(start_server_id); + } + private: GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE @@ -97,21 +69,22 @@ class ChannelzRegistry { static ChannelzRegistry* Default(); // globally registers an Entry. Returns its unique uuid - intptr_t InternalRegisterEntry(const RegistryEntry& entry); + intptr_t InternalRegister(BaseNode* node); // globally unregisters the object that is associated to uuid. Also does // sanity check that an object doesn't try to unregister the wrong type. - void InternalUnregisterEntry(intptr_t uuid, EntityType type); + void InternalUnregister(intptr_t uuid); // if object with uuid has previously been registered as the correct type, // returns the void* associated with that uuid. Else returns nullptr. - void* InternalGetEntry(intptr_t uuid, EntityType type); + BaseNode* InternalGet(intptr_t uuid); char* InternalGetTopChannels(intptr_t start_channel_id); + char* InternalGetServers(intptr_t start_server_id); // protects entities_ and uuid_ gpr_mu mu_; - InlinedVector<RegistryEntry, 20> entities_; + InlinedVector<BaseNode*, 20> entities_; }; } // namespace channelz diff --git a/src/core/lib/http/httpcli.cc b/src/core/lib/http/httpcli.cc index 12060074c5..3bd7a2ce59 100644 --- a/src/core/lib/http/httpcli.cc +++ b/src/core/lib/http/httpcli.cc @@ -163,7 +163,7 @@ static void done_write(void* arg, grpc_error* error) { static void start_write(internal_request* req) { grpc_slice_ref_internal(req->request_text); grpc_slice_buffer_add(&req->outgoing, req->request_text); - grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write); + grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write, nullptr); } static void on_handshake_done(void* arg, grpc_endpoint* ep) { diff --git a/src/core/lib/iomgr/buffer_list.cc b/src/core/lib/iomgr/buffer_list.cc new file mode 100644 index 0000000000..6ada23db1c --- /dev/null +++ b/src/core/lib/iomgr/buffer_list.cc @@ -0,0 +1,134 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/buffer_list.h" +#include "src/core/lib/iomgr/port.h" + +#include <grpc/support/log.h> + +#ifdef GRPC_LINUX_ERRQUEUE +#include <time.h> + +#include "src/core/lib/gprpp/memory.h" + +namespace grpc_core { +void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no, + void* arg) { + GPR_DEBUG_ASSERT(head != nullptr); + TracedBuffer* new_elem = New<TracedBuffer>(seq_no, arg); + /* Store the current time as the sendmsg time. */ + new_elem->ts_.sendmsg_time = gpr_now(GPR_CLOCK_REALTIME); + if (*head == nullptr) { + *head = new_elem; + return; + } + /* Append at the end. */ + TracedBuffer* ptr = *head; + while (ptr->next_ != nullptr) { + ptr = ptr->next_; + } + ptr->next_ = new_elem; +} + +namespace { +/** Fills gpr_timespec gts based on values from timespec ts */ +void fill_gpr_from_timestamp(gpr_timespec* gts, const struct timespec* ts) { + gts->tv_sec = ts->tv_sec; + gts->tv_nsec = static_cast<int32_t>(ts->tv_nsec); + gts->clock_type = GPR_CLOCK_REALTIME; +} + +/** The saved callback function that will be invoked when we get all the + * timestamps that we are going to get for a TracedBuffer. */ +void (*timestamps_callback)(void*, grpc_core::Timestamps*, + grpc_error* shutdown_err); +} /* namespace */ + +void TracedBuffer::ProcessTimestamp(TracedBuffer** head, + struct sock_extended_err* serr, + struct scm_timestamping* tss) { + GPR_DEBUG_ASSERT(head != nullptr); + TracedBuffer* elem = *head; + TracedBuffer* next = nullptr; + while (elem != nullptr) { + /* The byte number refers to the sequence number of the last byte which this + * timestamp relates to. */ + if (serr->ee_data >= elem->seq_no_) { + switch (serr->ee_info) { + case SCM_TSTAMP_SCHED: + fill_gpr_from_timestamp(&(elem->ts_.scheduled_time), &(tss->ts[0])); + elem = elem->next_; + break; + case SCM_TSTAMP_SND: + fill_gpr_from_timestamp(&(elem->ts_.sent_time), &(tss->ts[0])); + elem = elem->next_; + break; + case SCM_TSTAMP_ACK: + fill_gpr_from_timestamp(&(elem->ts_.acked_time), &(tss->ts[0])); + /* Got all timestamps. Do the callback and free this TracedBuffer. + * The thing below can be passed by value if we don't want the + * restriction on the lifetime. */ + timestamps_callback(elem->arg_, &(elem->ts_), GRPC_ERROR_NONE); + next = elem->next_; + Delete<TracedBuffer>(elem); + *head = elem = next; + break; + default: + abort(); + } + } else { + break; + } + } +} + +void TracedBuffer::Shutdown(TracedBuffer** head, grpc_error* shutdown_err) { + GPR_DEBUG_ASSERT(head != nullptr); + TracedBuffer* elem = *head; + while (elem != nullptr) { + if (timestamps_callback) { + timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err); + } + auto* next = elem->next_; + Delete<TracedBuffer>(elem); + elem = next; + } + *head = nullptr; + GRPC_ERROR_UNREF(shutdown_err); +} + +void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*, + grpc_core::Timestamps*, + grpc_error* error)) { + timestamps_callback = fn; +} +} /* namespace grpc_core */ + +#else /* GRPC_LINUX_ERRQUEUE */ + +namespace grpc_core { +void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*, + grpc_core::Timestamps*, + grpc_error* error)) { + gpr_log(GPR_DEBUG, "Timestamps callback is not enabled for this platform"); +} +} /* namespace grpc_core */ + +#endif /* GRPC_LINUX_ERRQUEUE */ diff --git a/src/core/lib/iomgr/buffer_list.h b/src/core/lib/iomgr/buffer_list.h new file mode 100644 index 0000000000..cbbf50a657 --- /dev/null +++ b/src/core/lib/iomgr/buffer_list.h @@ -0,0 +1,96 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H +#define GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" + +#include <grpc/support/time.h> + +#include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/internal_errqueue.h" + +namespace grpc_core { +struct Timestamps { + /* TODO(yashykt): This would also need to store OPTSTAT once support is added + */ + gpr_timespec sendmsg_time; + gpr_timespec scheduled_time; + gpr_timespec sent_time; + gpr_timespec acked_time; +}; + +/** TracedBuffer is a class to keep track of timestamps for a specific buffer in + * the TCP layer. We are only tracking timestamps for Linux kernels and hence + * this class would only be used by Linux platforms. For all other platforms, + * TracedBuffer would be an empty class. + * + * The timestamps collected are according to grpc_core::Timestamps declared + * above. + * + * A TracedBuffer list is kept track of using the head element of the list. If + * the head element of the list is nullptr, then the list is empty. + */ +#ifdef GRPC_LINUX_ERRQUEUE +class TracedBuffer { + public: + /** Add a new entry in the TracedBuffer list pointed to by head. Also saves + * sendmsg_time with the current timestamp. */ + static void AddNewEntry(grpc_core::TracedBuffer** head, uint32_t seq_no, + void* arg); + + /** Processes a received timestamp based on sock_extended_err and + * scm_timestamping structures. It will invoke the timestamps callback if the + * timestamp type is SCM_TSTAMP_ACK. */ + static void ProcessTimestamp(grpc_core::TracedBuffer** head, + struct sock_extended_err* serr, + struct scm_timestamping* tss); + + /** Cleans the list by calling the callback for each traced buffer in the list + * with timestamps that it has. */ + static void Shutdown(grpc_core::TracedBuffer** head, + grpc_error* shutdown_err); + + private: + GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW + + TracedBuffer(int seq_no, void* arg) + : seq_no_(seq_no), arg_(arg), next_(nullptr) {} + + uint32_t seq_no_; /* The sequence number for the last byte in the buffer */ + void* arg_; /* The arg to pass to timestamps_callback */ + grpc_core::Timestamps ts_; /* The timestamps corresponding to this buffer */ + grpc_core::TracedBuffer* next_; /* The next TracedBuffer in the list */ +}; +#else /* GRPC_LINUX_ERRQUEUE */ +class TracedBuffer {}; +#endif /* GRPC_LINUX_ERRQUEUE */ + +/** Sets the callback function to call when timestamps for a write are + * collected. The callback does not own a reference to error. */ +void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*, + grpc_core::Timestamps*, + grpc_error* error)); + +}; /* namespace grpc_core */ + +#endif /* GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H */ diff --git a/src/core/lib/iomgr/endpoint.cc b/src/core/lib/iomgr/endpoint.cc index 92e7930111..44fb47e19d 100644 --- a/src/core/lib/iomgr/endpoint.cc +++ b/src/core/lib/iomgr/endpoint.cc @@ -28,8 +28,8 @@ void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices, } void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { - ep->vtable->write(ep, slices, cb); + grpc_closure* cb, void* arg) { + ep->vtable->write(ep, slices, cb, arg); } void grpc_endpoint_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) { diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 15db1649fa..1f590a80ca 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -33,10 +33,12 @@ typedef struct grpc_endpoint grpc_endpoint; typedef struct grpc_endpoint_vtable grpc_endpoint_vtable; +class Timestamps; struct grpc_endpoint_vtable { void (*read)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb); - void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb); + void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb, + void* arg); void (*add_to_pollset)(grpc_endpoint* ep, grpc_pollset* pollset); void (*add_to_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset); void (*delete_from_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset); @@ -70,9 +72,11 @@ int grpc_endpoint_get_fd(grpc_endpoint* ep); \a slices may be mutated at will by the endpoint until cb is called. No guarantee is made to the content of slices after a write EXCEPT that it is a valid slice buffer. + \a arg is platform specific. It is currently only used by TCP on linux + platforms as an argument that would be forwarded to the timestamps callback. */ void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb); + grpc_closure* cb, void* arg); /* Causes any pending and future read/write callbacks to run immediately with success==0 */ diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc index c3bc0cc8fd..df2cf508c8 100644 --- a/src/core/lib/iomgr/endpoint_cfstream.cc +++ b/src/core/lib/iomgr/endpoint_cfstream.cc @@ -268,7 +268,7 @@ static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices, } static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, void* arg) { CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu", diff --git a/src/core/lib/iomgr/endpoint_pair_posix.cc b/src/core/lib/iomgr/endpoint_pair_posix.cc index 5c5c246f99..3afbfd7254 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.cc +++ b/src/core/lib/iomgr/endpoint_pair_posix.cc @@ -59,11 +59,11 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name, grpc_core::ExecCtx exec_ctx; gpr_asprintf(&final_name, "%s:client", name); - p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, false), args, + p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, true), args, "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, false), args, + p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, true), args, "socketpair-client"); gpr_free(final_name); diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc index 90ed34da11..13bc69ffb6 100644 --- a/src/core/lib/iomgr/error.cc +++ b/src/core/lib/iomgr/error.cc @@ -513,9 +513,24 @@ bool grpc_error_get_str(grpc_error* err, grpc_error_strs which, grpc_error* grpc_error_add_child(grpc_error* src, grpc_error* child) { GPR_TIMER_SCOPE("grpc_error_add_child", 0); - grpc_error* new_err = copy_error_and_unref(src); - internal_add_error(&new_err, child); - return new_err; + if (src != GRPC_ERROR_NONE) { + if (child == GRPC_ERROR_NONE) { + /* \a child is empty. Simply return the ref to \a src */ + return src; + } else if (child != src) { + grpc_error* new_err = copy_error_and_unref(src); + internal_add_error(&new_err, child); + return new_err; + } else { + /* \a src and \a child are the same. Drop one of the references and return + * the other */ + GRPC_ERROR_UNREF(child); + return src; + } + } else { + /* \a src is empty. Simply return the ref to \a child */ + return child; + } } static const char* no_error_string = "\"No Error\""; diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 27c4d22fd1..49f4029bc2 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -185,8 +185,16 @@ bool grpc_error_get_str(grpc_error* error, grpc_error_strs which, /// error occurring. Allows root causing high level errors from lower level /// errors that contributed to them. The src error takes ownership of the /// child error. +/// +/// Edge Conditions - +/// 1) If either of \a src or \a child is GRPC_ERROR_NONE, returns a reference +/// to the other argument. 2) If both \a src and \a child are GRPC_ERROR_NONE, +/// returns GRPC_ERROR_NONE. 3) If \a src and \a child point to the same error, +/// returns a single reference. (Note that, 2 references should have been +/// received to the error in this case.) grpc_error* grpc_error_add_child(grpc_error* src, grpc_error* child) GRPC_MUST_USE_RESULT; + grpc_error* grpc_os_error(const char* file, int line, int err, const char* call_name) GRPC_MUST_USE_RESULT; diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 0205363d5c..d4377e2d50 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -237,14 +237,19 @@ void grpc_event_engine_shutdown(void) { } bool grpc_event_engine_can_track_errors(void) { +/* Only track errors if platform supports errqueue. */ +#ifdef GRPC_LINUX_ERRQUEUE return g_event_engine->can_track_err; +#else + return false; +#endif /* GRPC_LINUX_ERRQUEUE */ } grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) { GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); - GPR_DEBUG_ASSERT(!track_err || g_event_engine->can_track_err); - return g_event_engine->fd_create(fd, name, track_err); + return g_event_engine->fd_create(fd, name, + track_err && g_event_engine->can_track_err); } int grpc_fd_wrapped_fd(grpc_fd* fd) { diff --git a/src/core/lib/iomgr/internal_errqueue.cc b/src/core/lib/iomgr/internal_errqueue.cc new file mode 100644 index 0000000000..99c22e9055 --- /dev/null +++ b/src/core/lib/iomgr/internal_errqueue.cc @@ -0,0 +1,36 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" + +#include "src/core/lib/iomgr/internal_errqueue.h" + +#ifdef GRPC_POSIX_SOCKET_TCP + +bool kernel_supports_errqueue() { +#ifdef LINUX_VERSION_CODE +#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) + return true; +#endif /* LINUX_VERSION_CODE <= KERNEL_VERSION(4, 0, 0) */ +#endif /* LINUX_VERSION_CODE */ + return false; +} + +#endif /* GRPC_POSIX_SOCKET_TCP */ diff --git a/src/core/lib/iomgr/internal_errqueue.h b/src/core/lib/iomgr/internal_errqueue.h new file mode 100644 index 0000000000..9d122808f9 --- /dev/null +++ b/src/core/lib/iomgr/internal_errqueue.h @@ -0,0 +1,83 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* This file contains constants defined in <linux/errqueue.h> and + * <linux/net_tstamp.h> so as to allow collecting network timestamps in the + * kernel. This file allows tcp_posix.cc to compile on platforms that do not + * have <linux/errqueue.h> and <linux/net_tstamp.h>. + */ + +#ifndef GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H +#define GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_POSIX_SOCKET_TCP + +#include <sys/types.h> +#include <time.h> + +#ifdef GRPC_LINUX_ERRQUEUE +#include <linux/errqueue.h> +#include <linux/net_tstamp.h> +#include <sys/socket.h> +#endif /* GRPC_LINUX_ERRQUEUE */ + +namespace grpc_core { + +#ifdef GRPC_LINUX_ERRQUEUE + +/* Redefining scm_timestamping in the same way that <linux/errqueue.h> defines + * it, so that code compiles on systems that don't have it. */ +struct scm_timestamping { + struct timespec ts[3]; +}; +/* Also redefine timestamp types */ +/* The timestamp type for when the driver passed skb to NIC, or HW. */ +constexpr int SCM_TSTAMP_SND = 0; +/* The timestamp type for when data entered the packet scheduler. */ +constexpr int SCM_TSTAMP_SCHED = 1; +/* The timestamp type for when data acknowledged by peer. */ +constexpr int SCM_TSTAMP_ACK = 2; +/* Redefine required constants from <linux/net_tstamp.h> */ +constexpr uint32_t SOF_TIMESTAMPING_TX_SOFTWARE = 1u << 1; +constexpr uint32_t SOF_TIMESTAMPING_SOFTWARE = 1u << 4; +constexpr uint32_t SOF_TIMESTAMPING_OPT_ID = 1u << 7; +constexpr uint32_t SOF_TIMESTAMPING_TX_SCHED = 1u << 8; +constexpr uint32_t SOF_TIMESTAMPING_TX_ACK = 1u << 9; +constexpr uint32_t SOF_TIMESTAMPING_OPT_TSONLY = 1u << 11; + +constexpr uint32_t kTimestampingSocketOptions = SOF_TIMESTAMPING_SOFTWARE | + SOF_TIMESTAMPING_OPT_ID | + SOF_TIMESTAMPING_OPT_TSONLY; +constexpr uint32_t kTimestampingRecordingOptions = + SOF_TIMESTAMPING_TX_SCHED | SOF_TIMESTAMPING_TX_SOFTWARE | + SOF_TIMESTAMPING_TX_ACK; +#endif /* GRPC_LINUX_ERRQUEUE */ + +/* Returns true if kernel is capable of supporting errqueue and timestamping. + * Currently allowing only linux kernels above 4.0.0 + */ +bool kernel_supports_errqueue(); +} // namespace grpc_core + +#endif /* GRPC_POSIX_SOCKET_TCP */ + +#endif /* GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H */ diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index 066417b93c..abf96662f5 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -60,6 +60,11 @@ #define GRPC_HAVE_IP_PKTINFO 1 #define GRPC_HAVE_MSG_NOSIGNAL 1 #define GRPC_HAVE_UNIX_SOCKET 1 +#ifdef LINUX_VERSION_CODE +#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) +#define GRPC_LINUX_ERRQUEUE 1 +#endif /* LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) */ +#endif /* LINUX_VERSION_CODE */ #define GRPC_LINUX_MULTIPOLL_WITH_EPOLL 1 #define GRPC_POSIX_FORK 1 #define GRPC_POSIX_HOST_NAME_MAX 1 diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 296ee74311..9c989b7dfe 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -279,7 +279,7 @@ grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args, } addr_str = grpc_sockaddr_to_uri(mapped_addr); gpr_asprintf(&name, "tcp-client:%s", addr_str); - *fdobj = grpc_fd_create(fd, name, false); + *fdobj = grpc_fd_create(fd, name, true); gpr_free(name); gpr_free(addr_str); return GRPC_ERROR_NONE; diff --git a/src/core/lib/iomgr/tcp_custom.cc b/src/core/lib/iomgr/tcp_custom.cc index 990e8d632b..e02a1898f2 100644 --- a/src/core/lib/iomgr/tcp_custom.cc +++ b/src/core/lib/iomgr/tcp_custom.cc @@ -221,7 +221,7 @@ static void custom_write_callback(grpc_custom_socket* socket, } static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices, - grpc_closure* cb) { + grpc_closure* cb, void* arg) { custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)ep; GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index b53ffbf01c..ac1e919acb 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -27,7 +27,9 @@ #include <errno.h> #include <limits.h> +#include <netinet/in.h> #include <stdbool.h> +#include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> @@ -46,6 +48,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/iomgr/buffer_list.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/profiling/timers.h" @@ -97,17 +100,42 @@ struct grpc_tcp { grpc_closure read_done_closure; grpc_closure write_done_closure; + grpc_closure error_closure; char* peer_string; grpc_resource_user* resource_user; grpc_resource_user_slice_allocator slice_allocator; + + grpc_core::TracedBuffer* tb_head; /* List of traced buffers */ + gpr_mu tb_mu; /* Lock for access to list of traced buffers */ + + /* grpc_endpoint_write takes an argument which if non-null means that the + * transport layer wants the TCP layer to collect timestamps for this write. + * This arg is forwarded to the timestamps callback function when the ACK + * timestamp is received from the kernel. This arg is a (void *) which allows + * users of this API to pass in a pointer to any kind of structure. This + * structure could actually be a tag or any book-keeping object that the user + * can use to distinguish between different traced writes. The only + * requirement from the TCP endpoint layer is that this arg should be non-null + * if the user wants timestamps for the write. */ + void* outgoing_buffer_arg; + /* A counter which starts at 0. It is initialized the first time the socket + * options for collecting timestamps are set, and is incremented with each + * byte sent. */ + int bytes_counter; + bool socket_ts_enabled; /* True if timestamping options are set on the socket + */ + gpr_atm + stop_error_notification; /* Set to 1 if we do not want to be notified on + errors anymore */ }; struct backup_poller { gpr_mu* pollset_mu; grpc_closure run_poller; }; + } // namespace #define BACKUP_POLLER_POLLSET(b) ((grpc_pollset*)((b) + 1)) @@ -302,6 +330,7 @@ static void tcp_free(grpc_tcp* tcp) { grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer); grpc_resource_user_unref(tcp->resource_user); gpr_free(tcp->peer_string); + gpr_mu_destroy(&tcp->tb_mu); gpr_free(tcp); } @@ -347,6 +376,10 @@ static void tcp_destroy(grpc_endpoint* ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); + if (grpc_event_engine_can_track_errors()) { + gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); + grpc_fd_set_error(tcp->em_fd); + } TCP_UNREF(tcp, "destroy"); } @@ -513,6 +546,235 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, } } +/* A wrapper around sendmsg. It sends \a msg over \a fd and returns the number + * of bytes sent. */ +ssize_t tcp_send(int fd, const struct msghdr* msg) { + GPR_TIMER_SCOPE("sendmsg", 1); + ssize_t sent_length; + do { + /* TODO(klempner): Cork if this is a partial write */ + GRPC_STATS_INC_SYSCALL_WRITE(); + sent_length = sendmsg(fd, msg, SENDMSG_FLAGS); + } while (sent_length < 0 && errno == EINTR); + return sent_length; +} + +/** This is to be called if outgoing_buffer_arg is not null. On linux platforms, + * this will call sendmsg with socket options set to collect timestamps inside + * the kernel. On return, sent_length is set to the return value of the sendmsg + * call. Returns false if setting the socket options failed. This is not + * implemented for non-linux platforms currently, and crashes out. + */ +static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, + size_t sending_length, + ssize_t* sent_length, grpc_error** error); + +/** The callback function to be invoked when we get an error on the socket. */ +static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error); + +#ifdef GRPC_LINUX_ERRQUEUE +static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, + size_t sending_length, + ssize_t* sent_length, + grpc_error** error) { + if (!tcp->socket_ts_enabled) { + uint32_t opt = grpc_core::kTimestampingSocketOptions; + if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING, + static_cast<void*>(&opt), sizeof(opt)) != 0) { + *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "setsockopt"), tcp); + grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_ERROR, "Failed to set timestamping options on the socket."); + } + return false; + } + tcp->bytes_counter = -1; + tcp->socket_ts_enabled = true; + } + /* Set control message to indicate that you want timestamps. */ + union { + char cmsg_buf[CMSG_SPACE(sizeof(uint32_t))]; + struct cmsghdr align; + } u; + cmsghdr* cmsg = reinterpret_cast<cmsghdr*>(u.cmsg_buf); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SO_TIMESTAMPING; + cmsg->cmsg_len = CMSG_LEN(sizeof(uint32_t)); + *reinterpret_cast<int*>(CMSG_DATA(cmsg)) = + grpc_core::kTimestampingRecordingOptions; + msg->msg_control = u.cmsg_buf; + msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t)); + + /* If there was an error on sendmsg the logic in tcp_flush will handle it. */ + ssize_t length = tcp_send(tcp->fd, msg); + *sent_length = length; + /* Only save timestamps if all the bytes were taken by sendmsg. */ + if (sending_length == static_cast<size_t>(length)) { + gpr_mu_lock(&tcp->tb_mu); + grpc_core::TracedBuffer::AddNewEntry( + &tcp->tb_head, static_cast<int>(tcp->bytes_counter + length), + tcp->outgoing_buffer_arg); + gpr_mu_unlock(&tcp->tb_mu); + tcp->outgoing_buffer_arg = nullptr; + } + return true; +} + +/** Reads \a cmsg to derive timestamps from the control messages. If a valid + * timestamp is found, the traced buffer list is updated with this timestamp. + * The caller of this function should be looping on the control messages found + * in \a msg. \a cmsg should point to the control message that the caller wants + * processed. + * On return, a pointer to a control message is returned. On the next iteration, + * CMSG_NXTHDR(msg, ret_val) should be passed as \a cmsg. */ +struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, + struct cmsghdr* cmsg) { + auto next_cmsg = CMSG_NXTHDR(msg, cmsg); + if (next_cmsg == nullptr) { + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_ERROR, "Received timestamp without extended error"); + } + return cmsg; + } + + if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) || + !(next_cmsg->cmsg_type == IP_RECVERR || + next_cmsg->cmsg_type == IPV6_RECVERR)) { + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_ERROR, "Unexpected control message"); + } + return cmsg; + } + + auto tss = + reinterpret_cast<struct grpc_core::scm_timestamping*>(CMSG_DATA(cmsg)); + auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(next_cmsg)); + if (serr->ee_errno != ENOMSG || + serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) { + gpr_log(GPR_ERROR, "Unexpected control message"); + return cmsg; + } + /* The error handling can potentially be done on another thread so we need + * to protect the traced buffer list. A lock free list might be better. Using + * a simple mutex for now. */ + gpr_mu_lock(&tcp->tb_mu); + grpc_core::TracedBuffer::ProcessTimestamp(&tcp->tb_head, serr, tss); + gpr_mu_unlock(&tcp->tb_mu); + return next_cmsg; +} + +/** For linux platforms, reads the socket's error queue and processes error + * messages from the queue. Returns true if all the errors processed were + * timestamps. Returns false if any of the errors were not timestamps. For + * non-linux platforms, error processing is not used/enabled currently. + */ +static bool process_errors(grpc_tcp* tcp) { + while (true) { + struct iovec iov; + iov.iov_base = nullptr; + iov.iov_len = 0; + struct msghdr msg; + msg.msg_name = nullptr; + msg.msg_namelen = 0; + msg.msg_iov = &iov; + msg.msg_iovlen = 0; + msg.msg_flags = 0; + + union { + char rbuf[1024 /*CMSG_SPACE(sizeof(scm_timestamping)) + + CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in))*/]; + struct cmsghdr align; + } aligned_buf; + memset(&aligned_buf, 0, sizeof(aligned_buf)); + + msg.msg_control = aligned_buf.rbuf; + msg.msg_controllen = sizeof(aligned_buf.rbuf); + + int r, saved_errno; + do { + r = recvmsg(tcp->fd, &msg, MSG_ERRQUEUE); + saved_errno = errno; + } while (r < 0 && saved_errno == EINTR); + + if (r == -1 && saved_errno == EAGAIN) { + return true; /* No more errors to process */ + } + if (r == -1) { + return false; + } + if (grpc_tcp_trace.enabled()) { + if ((msg.msg_flags & MSG_CTRUNC) == 1) { + gpr_log(GPR_INFO, "Error message was truncated."); + } + } + + if (msg.msg_controllen == 0) { + /* There was no control message found. It was probably spurious. */ + return true; + } + for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len; + cmsg = CMSG_NXTHDR(&msg, cmsg)) { + if (cmsg->cmsg_level != SOL_SOCKET || + cmsg->cmsg_type != SCM_TIMESTAMPING) { + /* Got a control message that is not a timestamp. Don't know how to + * handle this. */ + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, + "unknown control message cmsg_level:%d cmsg_type:%d", + cmsg->cmsg_level, cmsg->cmsg_type); + } + return false; + } + process_timestamp(tcp, &msg, cmsg); + } + } +} + +static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { + grpc_tcp* tcp = static_cast<grpc_tcp*>(arg); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, grpc_error_string(error)); + } + + if (error != GRPC_ERROR_NONE || + static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))) { + /* We aren't going to register to hear on error anymore, so it is safe to + * unref. */ + grpc_core::TracedBuffer::Shutdown(&tcp->tb_head, GRPC_ERROR_REF(error)); + TCP_UNREF(tcp, "error-tracking"); + return; + } + + /* We are still interested in collecting timestamps, so let's try reading + * them. */ + if (!process_errors(tcp)) { + /* This was not a timestamps error. This was an actual error. Set the + * read and write closures to be ready. + */ + grpc_fd_set_readable(tcp->em_fd); + grpc_fd_set_writable(tcp->em_fd); + } + GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure); +} + +#else /* GRPC_LINUX_ERRQUEUE */ +static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, + size_t sending_length, + ssize_t* sent_length, + grpc_error** error) { + gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform"); + GPR_ASSERT(0); + return false; +} + +static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { + gpr_log(GPR_ERROR, "Error handling is not supported for this platform"); + GPR_ASSERT(0); +} +#endif /* GRPC_LINUX_ERRQUEUE */ + /* returns true if done, false if pending; if returning true, *error is set */ #if defined(IOV_MAX) && IOV_MAX < 1000 #define MAX_WRITE_IOVEC IOV_MAX @@ -557,19 +819,20 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = iov_size; - msg.msg_control = nullptr; - msg.msg_controllen = 0; msg.msg_flags = 0; + if (tcp->outgoing_buffer_arg != nullptr) { + if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length, + error)) + return true; /* something went wrong with timestamps */ + } else { + msg.msg_control = nullptr; + msg.msg_controllen = 0; - GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length); - GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size); + GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length); + GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size); - GPR_TIMER_SCOPE("sendmsg", 1); - do { - /* TODO(klempner): Cork if this is a partial write */ - GRPC_STATS_INC_SYSCALL_WRITE(); - sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS); - } while (sent_length < 0 && errno == EINTR); + sent_length = tcp_send(tcp->fd, &msg); + } if (sent_length < 0) { if (errno == EAGAIN) { @@ -593,6 +856,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { } GPR_ASSERT(tcp->outgoing_byte_idx == 0); + tcp->bytes_counter += sent_length; trailing = sending_length - static_cast<size_t>(sent_length); while (trailing > 0) { size_t slice_length; @@ -607,7 +871,6 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { trailing -= slice_length; } } - if (outgoing_slice_idx == tcp->outgoing_buffer->count) { *error = GRPC_ERROR_NONE; grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); @@ -640,14 +903,13 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { const char* str = grpc_error_string(error); gpr_log(GPR_INFO, "write: %s", str); } - GRPC_CLOSURE_SCHED(cb, error); TCP_UNREF(tcp, "write"); } } static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, - grpc_closure* cb) { + grpc_closure* cb, void* arg) { GPR_TIMER_SCOPE("tcp_write", 0); grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); grpc_error* error = GRPC_ERROR_NONE; @@ -675,6 +937,10 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, } tcp->outgoing_buffer = buf; tcp->outgoing_byte_idx = 0; + tcp->outgoing_buffer_arg = arg; + if (arg) { + GPR_ASSERT(grpc_event_engine_can_track_errors()); + } if (!tcp_flush(tcp, &error)) { TCP_REF(tcp, "write"); @@ -792,6 +1058,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->bytes_read_this_round = 0; /* Will be set to false by the very first endpoint read function */ tcp->is_first_read = true; + tcp->bytes_counter = -1; + tcp->socket_ts_enabled = false; /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); @@ -803,6 +1071,19 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, /* Tell network status tracker about new endpoint */ grpc_network_status_register_endpoint(&tcp->base); grpc_resource_quota_unref_internal(resource_quota); + gpr_mu_init(&tcp->tb_mu); + tcp->tb_head = nullptr; + /* Start being notified on errors if event engine can track errors. */ + if (grpc_event_engine_can_track_errors()) { + /* Grab a ref to tcp so that we can safely access the tcp struct when + * processing errors. We unref when we no longer want to track errors + * separately. */ + TCP_REF(tcp, "error-tracking"); + gpr_atm_rel_store(&tcp->stop_error_notification, 0); + GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure); + } return &tcp->base; } @@ -821,6 +1102,11 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd, tcp->release_fd = fd; tcp->release_fd_cb = done; grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); + if (grpc_event_engine_can_track_errors()) { + /* Stop errors notification. */ + gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); + grpc_fd_set_error(tcp->em_fd); + } TCP_UNREF(tcp, "destroy"); } diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h index af89bd24db..eff825cb92 100644 --- a/src/core/lib/iomgr/tcp_posix.h +++ b/src/core/lib/iomgr/tcp_posix.h @@ -31,7 +31,10 @@ #include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" + #include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/buffer_list.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/ev_posix.h" diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 8ddf684fea..824db07fbf 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -226,7 +226,7 @@ static void on_read(void* arg, grpc_error* err) { gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str); } - grpc_fd* fdobj = grpc_fd_create(fd, name, false); + grpc_fd* fdobj = grpc_fd_create(fd, name, true); read_notifier_pollset = sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add( @@ -362,7 +362,7 @@ static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) { listener->sibling = sp; sp->server = listener->server; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name, false); + sp->emfd = grpc_fd_create(fd, name, true); memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address)); sp->port = port; sp->port_index = listener->port_index; diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index b9f8145572..9595c028ce 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -105,7 +105,7 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, int fd, s->tail = sp; sp->server = s; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name, false); + sp->emfd = grpc_fd_create(fd, name, true); memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); sp->port = port; sp->port_index = port_index; diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc index b3cb442f18..64c4a56ae9 100644 --- a/src/core/lib/iomgr/tcp_windows.cc +++ b/src/core/lib/iomgr/tcp_windows.cc @@ -296,7 +296,7 @@ static void on_write(void* tcpp, grpc_error* error) { /* Initiates a write. */ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, void* arg) { grpc_tcp* tcp = (grpc_tcp*)ep; grpc_winsocket* socket = tcp->socket; grpc_winsocket_callback_info* info = &socket->write_info; diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc index 4294162af7..008d37119a 100644 --- a/src/core/lib/iomgr/timer_generic.cc +++ b/src/core/lib/iomgr/timer_generic.cc @@ -291,7 +291,7 @@ static void timer_list_init() { static void timer_list_shutdown() { size_t i; run_some_expired_timers( - GPR_ATM_MAX, nullptr, + GRPC_MILLIS_INF_FUTURE, nullptr, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown")); for (i = 0; i < g_num_shards; i++) { timer_shard* shard = &g_shards[i]; @@ -714,9 +714,10 @@ static grpc_timer_check_result timer_check(grpc_millis* next) { #if GPR_ARCH_64 gpr_log(GPR_INFO, "TIMER CHECK BEGIN: now=%" PRId64 " next=%s tls_min=%" PRId64 - " glob_min=%" PRIdPTR, + " glob_min=%" PRId64, now, next_str, min_timer, - gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer))); + static_cast<grpc_millis>(gpr_atm_no_barrier_load( + (gpr_atm*)(&g_shared_mutables.min_timer)))); #else gpr_log(GPR_INFO, "TIMER CHECK BEGIN: now=%" PRId64 " next=%s min=%" PRId64, now, next_str, min_timer); diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index bdb2d0e764..3dd7cab855 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -152,7 +152,7 @@ GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd, grpc_sockaddr_to_string(&addr_str, addr, 1); gpr_asprintf(&name, "udp-server-listener:%s", addr_str); gpr_free(addr_str); - emfd_ = grpc_fd_create(fd, name, false); + emfd_ = grpc_fd_create(fd, name, true); memcpy(&addr_, addr, sizeof(grpc_resolved_address)); GPR_ASSERT(emfd_); gpr_free(name); diff --git a/src/core/lib/security/security_connector/security_connector.cc b/src/core/lib/security/security_connector/security_connector.cc index 04b4c87c71..6246613e7b 100644 --- a/src/core/lib/security/security_connector/security_connector.cc +++ b/src/core/lib/security/security_connector/security_connector.cc @@ -59,8 +59,8 @@ static const char* installed_roots_path = /** Environment variable used as a flag to enable/disable loading system root certificates from the OS trust store. */ -#ifndef GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR -#define GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR "GRPC_USE_SYSTEM_SSL_ROOTS" +#ifndef GRPC_NOT_USE_SYSTEM_SSL_ROOTS_ENV_VAR +#define GRPC_NOT_USE_SYSTEM_SSL_ROOTS_ENV_VAR "GRPC_NOT_USE_SYSTEM_SSL_ROOTS" #endif #ifndef TSI_OPENSSL_ALPN_SUPPORT @@ -1192,10 +1192,10 @@ const char* DefaultSslRootStore::GetPemRootCerts() { grpc_slice DefaultSslRootStore::ComputePemRootCerts() { grpc_slice result = grpc_empty_slice(); - char* use_system_roots_env_value = - gpr_getenv(GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR); - const bool use_system_roots = gpr_is_true(use_system_roots_env_value); - gpr_free(use_system_roots_env_value); + char* not_use_system_roots_env_value = + gpr_getenv(GRPC_NOT_USE_SYSTEM_SSL_ROOTS_ENV_VAR); + const bool not_use_system_roots = gpr_is_true(not_use_system_roots_env_value); + gpr_free(not_use_system_roots_env_value); // First try to load the roots from the environment. char* default_root_certs_path = gpr_getenv(GRPC_DEFAULT_SSL_ROOTS_FILE_PATH_ENV_VAR); @@ -1218,7 +1218,7 @@ grpc_slice DefaultSslRootStore::ComputePemRootCerts() { gpr_free(pem_root_certs); } // Try loading roots from OS trust store if flag is enabled. - if (GRPC_SLICE_IS_EMPTY(result) && use_system_roots) { + if (GRPC_SLICE_IS_EMPTY(result) && !not_use_system_roots) { result = LoadSystemRootCerts(); } // Fallback to roots manually shipped with gRPC. diff --git a/src/core/lib/security/transport/secure_endpoint.cc b/src/core/lib/security/transport/secure_endpoint.cc index 840b2e73bc..f40f969bb7 100644 --- a/src/core/lib/security/transport/secure_endpoint.cc +++ b/src/core/lib/security/transport/secure_endpoint.cc @@ -254,7 +254,7 @@ static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur, } static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, void* arg) { GPR_TIMER_SCOPE("secure_endpoint.endpoint_write", 0); unsigned i; @@ -342,7 +342,7 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, return; } - grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb); + grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, arg); } static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error* why) { diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc index aff723ed04..d76d582638 100644 --- a/src/core/lib/security/transport/security_handshaker.cc +++ b/src/core/lib/security/transport/security_handshaker.cc @@ -259,7 +259,7 @@ static grpc_error* on_handshake_next_done_locked( grpc_slice_buffer_reset_and_unref_internal(&h->outgoing); grpc_slice_buffer_add(&h->outgoing, to_send); grpc_endpoint_write(h->args->endpoint, &h->outgoing, - &h->on_handshake_data_sent_to_peer); + &h->on_handshake_data_sent_to_peer, nullptr); } else if (handshaker_result == nullptr) { // There is nothing to send, but need to read from peer. grpc_endpoint_read(h->args->endpoint, h->args->read_buffer, diff --git a/src/core/lib/security/transport/server_auth_filter.cc b/src/core/lib/security/transport/server_auth_filter.cc index 19cbb03b63..552e70130a 100644 --- a/src/core/lib/security/transport/server_auth_filter.cc +++ b/src/core/lib/security/transport/server_auth_filter.cc @@ -41,6 +41,9 @@ struct call_data { grpc_transport_stream_op_batch* recv_initial_metadata_batch; grpc_closure* original_recv_initial_metadata_ready; grpc_closure recv_initial_metadata_ready; + grpc_error* error; + grpc_closure recv_trailing_metadata_ready; + grpc_closure* original_recv_trailing_metadata_ready; grpc_metadata_array md; const grpc_metadata* consumed_md; size_t num_consumed_md; @@ -111,6 +114,7 @@ static void on_md_processing_done_inner(grpc_call_element* elem, batch->payload->recv_initial_metadata.recv_initial_metadata, remove_consumed_md, elem, "Response metadata filtering error"); } + calld->error = GRPC_ERROR_REF(error); GRPC_CLOSURE_SCHED(calld->original_recv_initial_metadata_ready, error); } @@ -184,6 +188,13 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { GRPC_ERROR_REF(error)); } +static void recv_trailing_metadata_ready(void* user_data, grpc_error* err) { + grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error)); + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); +} + static void auth_start_transport_stream_op_batch( grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { call_data* calld = static_cast<call_data*>(elem->call_data); @@ -195,6 +206,12 @@ static void auth_start_transport_stream_op_batch( batch->payload->recv_initial_metadata.recv_initial_metadata_ready = &calld->recv_initial_metadata_ready; } + if (batch->recv_trailing_metadata) { + calld->original_recv_trailing_metadata_ready = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready; + } grpc_call_next_op(elem, batch); } @@ -208,6 +225,9 @@ static grpc_error* init_call_elem(grpc_call_element* elem, GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); // Create server security context. Set its auth context from channel // data and save it in the call context. grpc_server_security_context* server_ctx = @@ -227,7 +247,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem, /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, - grpc_closure* ignored) {} + grpc_closure* ignored) { + call_data* calld = static_cast<call_data*>(elem->call_data); + GRPC_ERROR_UNREF(calld->error); +} /* Constructor for channel_data */ static grpc_error* init_channel_elem(grpc_channel_element* elem, diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 2923a86646..11b438f5dc 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -48,6 +48,7 @@ #include "src/core/lib/surface/call_test_only.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/completion_queue.h" +#include "src/core/lib/surface/server.h" #include "src/core/lib/surface/validate_metadata.h" #include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/metadata.h" @@ -71,46 +72,6 @@ // Used to create arena for the first call. #define ESTIMATED_MDELEM_COUNT 16 -/* Status data for a request can come from several sources; this - enumerates them all, and acts as a priority sorting for which - status to return to the application - earlier entries override - later ones */ -typedef enum { - /* Status came from the application layer overriding whatever - the wire says */ - STATUS_FROM_API_OVERRIDE = 0, - /* Status came from 'the wire' - or somewhere below the surface - layer */ - STATUS_FROM_WIRE, - /* Status was created by some internal channel stack operation: must come via - add_batch_error */ - STATUS_FROM_CORE, - /* Status was created by some surface error */ - STATUS_FROM_SURFACE, - /* Status came from the server sending status */ - STATUS_FROM_SERVER_STATUS, - STATUS_SOURCE_COUNT -} status_source; - -typedef struct { - bool is_set; - grpc_error* error; -} received_status; - -static gpr_atm pack_received_status(received_status r) { - return r.is_set ? (1 | (gpr_atm)r.error) : 0; -} - -static received_status unpack_received_status(gpr_atm atm) { - if ((atm & 1) == 0) { - return {false, GRPC_ERROR_NONE}; - } else { - return {true, (grpc_error*)(atm & ~static_cast<gpr_atm>(1))}; - } -} - -#define MAX_ERRORS_PER_BATCH 4 - typedef struct batch_control { grpc_call* call; /* Share memory for cq_completion and notify_tag as they are never needed @@ -135,10 +96,7 @@ typedef struct batch_control { grpc_closure start_batch; grpc_closure finish_batch; gpr_refcount steps_to_complete; - - grpc_error* errors[MAX_ERRORS_PER_BATCH]; - gpr_atm num_errors; - + gpr_atm batch_error; grpc_transport_stream_op_batch op; } batch_control; @@ -201,9 +159,6 @@ struct grpc_call { // A char* indicating the peer name. gpr_atm peer_string; - /* Packed received call statuses from various sources */ - gpr_atm status[STATUS_SOURCE_COUNT]; - /* Call data useful used for reporting. Only valid after the call has * completed */ grpc_call_final_info final_info; @@ -236,6 +191,7 @@ struct grpc_call { grpc_closure receiving_initial_metadata_ready; grpc_closure receiving_trailing_metadata_ready; uint32_t test_only_last_message_flags; + gpr_atm cancelled; grpc_closure release_call; @@ -247,8 +203,11 @@ struct grpc_call { } client; struct { int* cancelled; + // backpointer to owning server if this is a server side call. + grpc_server* server; } server; } final_op; + grpc_error* status_error; /* recv_state can contain one of the following values: RECV_NONE : : no initial metadata and messages received @@ -286,23 +245,15 @@ grpc_core::TraceFlag grpc_compression_trace(false, "compression"); static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op, grpc_closure* start_batch_closure); -static void cancel_with_status(grpc_call* c, status_source source, - grpc_status_code status, + +static void cancel_with_status(grpc_call* c, grpc_status_code status, const char* description); -static void cancel_with_error(grpc_call* c, status_source source, - grpc_error* error); +static void cancel_with_error(grpc_call* c, grpc_error* error); static void destroy_call(void* call_stack, grpc_error* error); static void receiving_slice_ready(void* bctlp, grpc_error* error); -static void get_final_status( - grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data), - void* set_value_user_data, grpc_slice* details, const char** error_string); -static void set_status_value_directly(grpc_status_code status, void* dest); -static void set_status_from_error(grpc_call* call, status_source source, - grpc_error* error); +static void set_final_status(grpc_call* call, grpc_error* error); static void process_data_after_md(batch_control* bctl); static void post_batch_completion(batch_control* bctl); -static void add_batch_error(batch_control* bctl, grpc_error* error, - bool has_cancelled); static void add_init_error(grpc_error** composite, grpc_error* new_err) { if (new_err == GRPC_ERROR_NONE) return; @@ -353,6 +304,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, gpr_arena_alloc(arena, GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) + channel_stack->call_stack_size)); gpr_ref_init(&call->ext_ref, 1); + gpr_atm_no_barrier_store(&call->cancelled, 0); call->arena = arena; grpc_call_combiner_init(&call->call_combiner); *out_call = call; @@ -362,14 +314,10 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_MESSAGE_COMPRESS_NONE); call->is_client = args->server_transport_data == nullptr; - if (call->is_client) { - GRPC_STATS_INC_CLIENT_CALLS_CREATED(); - } else { - GRPC_STATS_INC_SERVER_CALLS_CREATED(); - } call->stream_op_payload.context = call->context; grpc_slice path = grpc_empty_slice(); if (call->is_client) { + GRPC_STATS_INC_CLIENT_CALLS_CREATED(); GPR_ASSERT(args->add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT); for (i = 0; i < args->add_initial_metadata_count; i++) { @@ -383,6 +331,8 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, call->send_extra_metadata_count = static_cast<int>(args->add_initial_metadata_count); } else { + GRPC_STATS_INC_SERVER_CALLS_CREATED(); + call->final_op.server.server = args->server; GPR_ASSERT(args->add_initial_metadata_count == 0); call->send_extra_metadata_count = 0; } @@ -464,10 +414,10 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, gpr_mu_unlock(&pc->child_list_mu); } if (error != GRPC_ERROR_NONE) { - cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); + cancel_with_error(call, GRPC_ERROR_REF(error)); } if (immediately_cancel) { - cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); + cancel_with_error(call, GRPC_ERROR_CANCELLED); } if (args->cq != nullptr) { GPR_ASSERT(args->pollset_set_alternative == nullptr && @@ -486,10 +436,18 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, &call->pollent); } - grpc_core::channelz::ChannelNode* channelz_channel = - grpc_channel_get_channelz_node(call->channel); - if (channelz_channel != nullptr) { - channelz_channel->RecordCallStarted(); + if (call->is_client) { + grpc_core::channelz::ChannelNode* channelz_channel = + grpc_channel_get_channelz_node(call->channel); + if (channelz_channel != nullptr) { + channelz_channel->RecordCallStarted(); + } + } else { + grpc_core::channelz::ServerNode* channelz_server = + grpc_server_get_channelz_node(call->final_op.server.server); + if (channelz_server != nullptr) { + channelz_server->RecordCallStarted(); + } } grpc_slice_unref_internal(path); @@ -561,16 +519,13 @@ static void destroy_call(void* call, grpc_error* error) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } - get_final_status(c, set_status_value_directly, &c->final_info.final_status, - nullptr, &(c->final_info.error_string)); + grpc_error_get_status(c->status_error, c->send_deadline, + &c->final_info.final_status, nullptr, nullptr, + &(c->final_info.error_string)); + GRPC_ERROR_UNREF(c->status_error); c->final_info.stats.latency = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - GRPC_ERROR_UNREF( - unpack_received_status(gpr_atm_acq_load(&c->status[i])).error); - } - grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info, GRPC_CLOSURE_INIT(&c->release_call, release_call, c, grpc_schedule_on_exec_ctx)); @@ -608,7 +563,7 @@ void grpc_call_unref(grpc_call* c) { bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 && gpr_atm_acq_load(&c->received_final_op_atm) == 0; if (cancel) { - cancel_with_error(c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); + cancel_with_error(c, GRPC_ERROR_CANCELLED); } else { // Unset the call combiner cancellation closure. This has the // effect of scheduling the previously set cancellation closure, if @@ -626,8 +581,7 @@ grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) { GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved)); GPR_ASSERT(!reserved); grpc_core::ExecCtx exec_ctx; - cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); - + cancel_with_error(call, GRPC_ERROR_CANCELLED); return GRPC_CALL_OK; } @@ -681,8 +635,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call* c, "c=%p, status=%d, description=%s, reserved=%p)", 4, (c, (int)status, description, reserved)); GPR_ASSERT(reserved == nullptr); - cancel_with_status(c, STATUS_FROM_API_OVERRIDE, status, description); - + cancel_with_status(c, status, description); return GRPC_CALL_OK; } @@ -702,15 +655,17 @@ static void done_termination(void* arg, grpc_error* error) { gpr_free(state); } -static void cancel_with_error(grpc_call* c, status_source source, - grpc_error* error) { +static void cancel_with_error(grpc_call* c, grpc_error* error) { + if (!gpr_atm_rel_cas(&c->cancelled, 0, 1)) { + GRPC_ERROR_UNREF(error); + return; + } GRPC_CALL_INTERNAL_REF(c, "termination"); // Inform the call combiner of the cancellation, so that it can cancel // any in-flight asynchronous actions that may be holding the call // combiner. This ensures that the cancel_stream batch can be sent // down the filter stack in a timely manner. grpc_call_combiner_cancel(&c->call_combiner, GRPC_ERROR_REF(error)); - set_status_from_error(c, source, GRPC_ERROR_REF(error)); cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state))); state->call = c; GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state, @@ -733,90 +688,45 @@ static grpc_error* error_from_status(grpc_status_code status, GRPC_ERROR_INT_GRPC_STATUS, status); } -static void cancel_with_status(grpc_call* c, status_source source, - grpc_status_code status, +static void cancel_with_status(grpc_call* c, grpc_status_code status, const char* description) { - cancel_with_error(c, source, error_from_status(status, description)); -} - -/******************************************************************************* - * FINAL STATUS CODE MANIPULATION - */ - -static bool get_final_status_from( - grpc_call* call, grpc_error* error, bool allow_ok_status, - void (*set_value)(grpc_status_code code, void* user_data), - void* set_value_user_data, grpc_slice* details, const char** error_string) { - grpc_status_code code; - grpc_slice slice = grpc_empty_slice(); - grpc_error_get_status(error, call->send_deadline, &code, &slice, nullptr, - error_string); - if (code == GRPC_STATUS_OK && !allow_ok_status) { - return false; - } - - set_value(code, set_value_user_data); - if (details != nullptr) { - *details = grpc_slice_ref_internal(slice); - } - return true; + cancel_with_error(c, error_from_status(status, description)); } -static void get_final_status( - grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data), - void* set_value_user_data, grpc_slice* details, const char** error_string) { - int i; - received_status status[STATUS_SOURCE_COUNT]; - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i])); - } +static void set_final_status(grpc_call* call, grpc_error* error) { if (grpc_call_error_trace.enabled()) { - gpr_log(GPR_INFO, "get_final_status %s", call->is_client ? "CLI" : "SVR"); - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (status[i].is_set) { - gpr_log(GPR_INFO, " %d: %s", i, grpc_error_string(status[i].error)); - } - } + gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR"); + gpr_log(GPR_DEBUG, "%s", grpc_error_string(error)); } - /* first search through ignoring "OK" statuses: if something went wrong, - * ensure we report it */ - for (int allow_ok_status = 0; allow_ok_status < 2; allow_ok_status++) { - /* search for the best status we can present: ideally the error we use has a - clearly defined grpc-status, and we'll prefer that. */ - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (status[i].is_set && - grpc_error_has_clear_grpc_status(status[i].error)) { - if (get_final_status_from(call, status[i].error, allow_ok_status != 0, - set_value, set_value_user_data, details, - error_string)) { - return; - } + if (call->is_client) { + grpc_error_get_status(error, call->send_deadline, + call->final_op.client.status, + call->final_op.client.status_details, nullptr, + call->final_op.client.error_string); + // explicitly take a ref + grpc_slice_ref_internal(*call->final_op.client.status_details); + call->status_error = error; + grpc_core::channelz::ChannelNode* channelz_channel = + grpc_channel_get_channelz_node(call->channel); + if (channelz_channel != nullptr) { + if (*call->final_op.client.status != GRPC_STATUS_OK) { + channelz_channel->RecordCallFailed(); + } else { + channelz_channel->RecordCallSucceeded(); } } - /* If no clearly defined status exists, search for 'anything' */ - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (status[i].is_set) { - if (get_final_status_from(call, status[i].error, allow_ok_status != 0, - set_value, set_value_user_data, details, - error_string)) { - return; - } + } else { + *call->final_op.server.cancelled = + error != GRPC_ERROR_NONE || call->status_error != GRPC_ERROR_NONE; + grpc_core::channelz::ServerNode* channelz_server = + grpc_server_get_channelz_node(call->final_op.server.server); + if (channelz_server != nullptr) { + if (*call->final_op.server.cancelled) { + channelz_server->RecordCallFailed(); + } else { + channelz_server->RecordCallSucceeded(); } } - } - /* If nothing exists, set some default */ - if (call->is_client) { - set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); - } else { - set_value(GRPC_STATUS_OK, set_value_user_data); - } -} - -static void set_status_from_error(grpc_call* call, status_source source, - grpc_error* error) { - if (!gpr_atm_rel_cas(&call->status[source], - pack_received_status({false, GRPC_ERROR_NONE}), - pack_received_status({true, error}))) { GRPC_ERROR_UNREF(error); } } @@ -1035,6 +945,7 @@ static grpc_stream_compression_algorithm decode_stream_compression( static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b, int is_trailing) { if (b->list.count == 0) return; + if (!call->is_client && is_trailing) return; if (is_trailing && call->buffered_metadata[1] == nullptr) return; GPR_TIMER_SCOPE("publish_app_metadata", 0); grpc_metadata_array* dest; @@ -1088,9 +999,12 @@ static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) { publish_app_metadata(call, b, false); } -static void recv_trailing_filter(void* args, grpc_metadata_batch* b) { +static void recv_trailing_filter(void* args, grpc_metadata_batch* b, + grpc_error* batch_error) { grpc_call* call = static_cast<grpc_call*>(args); - if (b->idx.named.grpc_status != nullptr) { + if (batch_error != GRPC_ERROR_NONE) { + set_final_status(call, batch_error); + } else if (b->idx.named.grpc_status != nullptr) { grpc_status_code status_code = grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md); grpc_error* error = GRPC_ERROR_NONE; @@ -1108,8 +1022,18 @@ static void recv_trailing_filter(void* args, grpc_metadata_batch* b) { error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, grpc_empty_slice()); } - set_status_from_error(call, STATUS_FROM_WIRE, error); + set_final_status(call, GRPC_ERROR_REF(error)); grpc_metadata_batch_remove(b, b->idx.named.grpc_status); + GRPC_ERROR_UNREF(error); + } else if (!call->is_client) { + set_final_status(call, GRPC_ERROR_NONE); + } else { + gpr_log(GPR_DEBUG, + "Received trailing metadata with no error and no status"); + set_final_status( + call, grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN)); } publish_app_metadata(call, b, true); } @@ -1124,14 +1048,6 @@ grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) { * BATCH API IMPLEMENTATION */ -static void set_status_value_directly(grpc_status_code status, void* dest) { - *static_cast<grpc_status_code*>(dest) = status; -} - -static void set_cancelled_value(grpc_status_code status, void* dest) { - *static_cast<int*>(dest) = (status != GRPC_STATUS_OK); -} - static bool are_write_flags_valid(uint32_t flags) { /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */ const uint32_t allowed_write_positions = @@ -1199,31 +1115,18 @@ static void finish_batch_completion(void* user_data, GRPC_CALL_INTERNAL_UNREF(call, "completion"); } -static grpc_error* consolidate_batch_errors(batch_control* bctl) { - size_t n = static_cast<size_t>(gpr_atm_acq_load(&bctl->num_errors)); - if (n == 0) { - return GRPC_ERROR_NONE; - } else if (n == 1) { - /* Skip creating a composite error in the case that only one error was - logged */ - grpc_error* e = bctl->errors[0]; - bctl->errors[0] = nullptr; - return e; - } else { - grpc_error* error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Call batch failed", bctl->errors, n); - for (size_t i = 0; i < n; i++) { - GRPC_ERROR_UNREF(bctl->errors[i]); - bctl->errors[i] = nullptr; - } - return error; - } +static void reset_batch_errors(batch_control* bctl) { + GRPC_ERROR_UNREF( + reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error))); + gpr_atm_rel_store(&bctl->batch_error, + reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE)); } static void post_batch_completion(batch_control* bctl) { grpc_call* next_child_call; grpc_call* call = bctl->call; - grpc_error* error = consolidate_batch_errors(bctl); + grpc_error* error = GRPC_ERROR_REF( + reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error))); if (bctl->op.send_initial_metadata) { grpc_metadata_batch_destroy( @@ -1249,8 +1152,7 @@ static void post_batch_completion(batch_control* bctl) { next_child_call = child->child->sibling_next; if (child->cancellation_is_inherited) { GRPC_CALL_INTERNAL_REF(child, "propagate_cancel"); - cancel_with_error(child, STATUS_FROM_API_OVERRIDE, - GRPC_ERROR_CANCELLED); + cancel_with_error(child, GRPC_ERROR_CANCELLED); GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel"); } child = next_child_call; @@ -1258,24 +1160,6 @@ static void post_batch_completion(batch_control* bctl) { } gpr_mu_unlock(&pc->child_list_mu); } - if (call->is_client) { - get_final_status(call, set_status_value_directly, - call->final_op.client.status, - call->final_op.client.status_details, - call->final_op.client.error_string); - } else { - get_final_status(call, set_cancelled_value, - call->final_op.server.cancelled, nullptr, nullptr); - } - grpc_core::channelz::ChannelNode* channelz_channel = - grpc_channel_get_channelz_node(call->channel); - if (channelz_channel != nullptr) { - if (*call->final_op.client.status != GRPC_STATUS_OK) { - channelz_channel->RecordCallFailed(); - } else { - channelz_channel->RecordCallSucceeded(); - } - } GRPC_ERROR_UNREF(error); error = GRPC_ERROR_NONE; } @@ -1284,9 +1168,10 @@ static void post_batch_completion(batch_control* bctl) { grpc_byte_buffer_destroy(*call->receiving_buffer); *call->receiving_buffer = nullptr; } + reset_batch_errors(bctl); if (bctl->completion_data.notify_tag.is_closure) { - /* unrefs bctl->error */ + /* unrefs error */ bctl->call = nullptr; /* This closure may be meant to be run within some combiner. Since we aren't * running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead @@ -1296,7 +1181,7 @@ static void post_batch_completion(batch_control* bctl) { error); GRPC_CALL_INTERNAL_UNREF(call, "completion"); } else { - /* unrefs bctl->error */ + /* unrefs error */ grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error, finish_batch_completion, bctl, &bctl->completion_data.cq_completion); @@ -1405,8 +1290,12 @@ static void receiving_stream_ready(void* bctlp, grpc_error* error) { grpc_call* call = bctl->call; if (error != GRPC_ERROR_NONE) { call->receiving_stream.reset(); - add_batch_error(bctl, GRPC_ERROR_REF(error), true); - cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); + if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) == + GRPC_ERROR_NONE) { + gpr_atm_rel_store(&bctl->batch_error, + reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error))); + } + cancel_with_error(call, GRPC_ERROR_REF(error)); } /* If recv_state is RECV_NONE, we will save the batch_control * object with rel_cas, and will not use it after the cas. Its corresponding @@ -1442,8 +1331,7 @@ static void validate_filtered_metadata(batch_control* bctl) { call->incoming_stream_compression_algorithm, call->incoming_message_compression_algorithm); gpr_log(GPR_ERROR, "%s", error_msg); - cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_INTERNAL, - error_msg); + cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg); gpr_free(error_msg); } else if ( grpc_compression_algorithm_from_message_stream_compression_algorithm( @@ -1455,8 +1343,7 @@ static void validate_filtered_metadata(batch_control* bctl) { "compression (%d).", call->incoming_stream_compression_algorithm, call->incoming_message_compression_algorithm); - cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_INTERNAL, - error_msg); + cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg); gpr_free(error_msg); } else { char* error_msg = nullptr; @@ -1466,8 +1353,7 @@ static void validate_filtered_metadata(batch_control* bctl) { gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.", compression_algorithm); gpr_log(GPR_ERROR, "%s", error_msg); - cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED, - error_msg); + cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg); } else if (grpc_compression_options_is_algorithm_enabled( &compression_options, compression_algorithm) == 0) { /* check if algorithm is supported by current channel config */ @@ -1476,8 +1362,7 @@ static void validate_filtered_metadata(batch_control* bctl) { gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.", algo_name); gpr_log(GPR_ERROR, "%s", error_msg); - cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED, - error_msg); + cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg); } gpr_free(error_msg); @@ -1495,23 +1380,12 @@ static void validate_filtered_metadata(batch_control* bctl) { } } -static void add_batch_error(batch_control* bctl, grpc_error* error, - bool has_cancelled) { - if (error == GRPC_ERROR_NONE) return; - int idx = static_cast<int>(gpr_atm_full_fetch_add(&bctl->num_errors, 1)); - if (idx == 0 && !has_cancelled) { - cancel_with_error(bctl->call, STATUS_FROM_CORE, GRPC_ERROR_REF(error)); - } - bctl->errors[idx] = error; -} - static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready"); - add_batch_error(bctl, GRPC_ERROR_REF(error), false); if (error == GRPC_ERROR_NONE) { grpc_metadata_batch* md = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; @@ -1524,6 +1398,13 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) { call->send_deadline = md->deadline; } + } else { + if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) == + GRPC_ERROR_NONE) { + gpr_atm_rel_store(&bctl->batch_error, + reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error))); + } + cancel_with_error(call, GRPC_ERROR_REF(error)); } grpc_closure* saved_rsr_closure = nullptr; @@ -1561,10 +1442,9 @@ static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready"); - add_batch_error(bctl, GRPC_ERROR_REF(error), false); grpc_metadata_batch* md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - recv_trailing_filter(call, md); + recv_trailing_filter(call, md, GRPC_ERROR_REF(error)); finish_batch_step(bctl); } @@ -1572,7 +1452,14 @@ static void finish_batch(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete"); - add_batch_error(bctl, GRPC_ERROR_REF(error), false); + if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) == + GRPC_ERROR_NONE) { + gpr_atm_rel_store(&bctl->batch_error, + reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error))); + } + if (error != GRPC_ERROR_NONE) { + cancel_with_error(call, GRPC_ERROR_REF(error)); + } finish_batch_step(bctl); } @@ -1774,28 +1661,32 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->send_extra_metadata_count = 1; call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem( call->channel, op->data.send_status_from_server.status); - { - grpc_error* override_error = GRPC_ERROR_NONE; - if (op->data.send_status_from_server.status != GRPC_STATUS_OK) { - override_error = - error_from_status(op->data.send_status_from_server.status, - "Returned non-ok status"); - } - if (op->data.send_status_from_server.status_details != nullptr) { - call->send_extra_metadata[1].md = grpc_mdelem_from_slices( - GRPC_MDSTR_GRPC_MESSAGE, - grpc_slice_ref_internal( - *op->data.send_status_from_server.status_details)); - call->send_extra_metadata_count++; + grpc_error* status_error = + op->data.send_status_from_server.status == GRPC_STATUS_OK + ? GRPC_ERROR_NONE + : grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Server returned error"), + GRPC_ERROR_INT_GRPC_STATUS, + static_cast<intptr_t>( + op->data.send_status_from_server.status)); + if (op->data.send_status_from_server.status_details != nullptr) { + call->send_extra_metadata[1].md = grpc_mdelem_from_slices( + GRPC_MDSTR_GRPC_MESSAGE, + grpc_slice_ref_internal( + *op->data.send_status_from_server.status_details)); + call->send_extra_metadata_count++; + if (status_error != GRPC_ERROR_NONE) { char* msg = grpc_slice_to_c_string( GRPC_MDVALUE(call->send_extra_metadata[1].md)); - override_error = - grpc_error_set_str(override_error, GRPC_ERROR_STR_GRPC_MESSAGE, + status_error = + grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE, grpc_slice_from_copied_string(msg)); gpr_free(msg); } - set_status_from_error(call, STATUS_FROM_API_OVERRIDE, override_error); } + + call->status_error = status_error; if (!prepare_application_metadata( call, static_cast<int>( diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index b3b06059d4..b34260505a 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -33,6 +33,7 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call* call, int success, typedef struct grpc_call_create_args { grpc_channel* channel; + grpc_server* server; grpc_call* parent; uint32_t propagation_mask; diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index 82635d3c21..054fe105c3 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -100,7 +100,6 @@ grpc_channel* grpc_channel_create_with_builder( return channel; } - memset(channel, 0, sizeof(*channel)); channel->target = target; channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type); size_t channel_tracer_max_nodes = 0; // default to off @@ -166,11 +165,12 @@ grpc_channel* grpc_channel_create_with_builder( } grpc_channel_args_destroy(args); - if (channelz_enabled) { - bool is_top_level_channel = channel->is_client && !internal_channel; + // we only need to do the channelz bookkeeping for clients here. The channelz + // bookkeeping for server channels occurs in src/core/lib/surface/server.cc + if (channelz_enabled && channel->is_client) { channel->channelz_channel = channel_node_create_func( - channel, channel_tracer_max_nodes, is_top_level_channel); - channel->channelz_channel->trace()->AddTraceEvent( + channel, channel_tracer_max_nodes, !internal_channel); + channel->channelz_channel->AddTraceEvent( grpc_core::channelz::ChannelTrace::Severity::Info, grpc_slice_from_static_string("Channel created")); } @@ -428,6 +428,9 @@ void grpc_channel_internal_unref(grpc_channel* c REF_ARG) { static void destroy_channel(void* arg, grpc_error* error) { grpc_channel* channel = static_cast<grpc_channel*>(arg); if (channel->channelz_channel != nullptr) { + channel->channelz_channel->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Channel destroyed")); channel->channelz_channel->MarkChannelDestroyed(); channel->channelz_channel.reset(); } diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 0769d9e4f6..c2cf450e94 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -1364,9 +1364,11 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) { } cqd->shutdown_called = true; if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + gpr_mu_unlock(cq->mu); cq_finish_shutdown_callback(cq); + } else { + gpr_mu_unlock(cq->mu); } - gpr_mu_unlock(cq->mu); GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); } diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index cb34def740..5fa58ffdec 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -149,6 +149,9 @@ struct call_data { grpc_closure server_on_recv_initial_metadata; grpc_closure kill_zombie_closure; grpc_closure* on_done_recv_initial_metadata; + grpc_closure recv_trailing_metadata_ready; + grpc_error* error; + grpc_closure* original_recv_trailing_metadata_ready; grpc_closure publish; @@ -219,6 +222,8 @@ struct grpc_server { /** when did we print the last shutdown progress message */ gpr_timespec last_shutdown_message_time; + + grpc_core::RefCountedPtr<grpc_core::channelz::ServerNode> channelz_server; }; #define SERVER_FROM_CALL_ELEM(elem) \ @@ -364,6 +369,7 @@ static void server_ref(grpc_server* server) { static void server_delete(grpc_server* server) { registered_method* rm; size_t i; + server->channelz_server.reset(); grpc_channel_args_destroy(server->channel_args); gpr_mu_destroy(&server->mu_global); gpr_mu_destroy(&server->mu_call); @@ -730,6 +736,14 @@ static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) { GRPC_CLOSURE_RUN(calld->on_done_recv_initial_metadata, error); } +static void server_recv_trailing_metadata_ready(void* user_data, + grpc_error* err) { + grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error)); + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); +} + static void server_mutate_op(grpc_call_element* elem, grpc_transport_stream_op_batch* op) { call_data* calld = static_cast<call_data*>(elem->call_data); @@ -745,6 +759,12 @@ static void server_mutate_op(grpc_call_element* elem, op->payload->recv_initial_metadata.recv_flags = &calld->recv_initial_metadata_flags; } + if (op->recv_trailing_metadata) { + calld->original_recv_trailing_metadata_ready = + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready; + } } static void server_start_transport_stream_op_batch( @@ -779,6 +799,7 @@ static void accept_stream(void* cd, grpc_transport* transport, args.channel = chand->channel; args.server_transport_data = transport_server_data; args.send_deadline = GRPC_MILLIS_INF_FUTURE; + args.server = chand->server; grpc_call* call; grpc_error* error = grpc_call_create(&args, &call); grpc_call_element* elem = @@ -828,7 +849,9 @@ static grpc_error* init_call_elem(grpc_call_element* elem, GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata, server_on_recv_initial_metadata, elem, grpc_schedule_on_exec_ctx); - + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + server_recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); server_ref(chand->server); return GRPC_ERROR_NONE; } @@ -840,7 +863,7 @@ static void destroy_call_elem(grpc_call_element* elem, call_data* calld = static_cast<call_data*>(elem->call_data); GPR_ASSERT(calld->state != PENDING); - + GRPC_ERROR_UNREF(calld->error); if (calld->host_set) { grpc_slice_unref_internal(calld->host); } @@ -941,6 +964,7 @@ void grpc_server_register_completion_queue(grpc_server* server, } grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) { + grpc_core::ExecCtx exec_ctx; GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved)); grpc_server* server = @@ -957,6 +981,20 @@ grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) { server->channel_args = grpc_channel_args_copy(args); + const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ); + if (grpc_channel_arg_get_bool(arg, false)) { + arg = grpc_channel_args_find(args, + GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE); + size_t trace_events_per_node = + grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX}); + server->channelz_server = + grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>( + trace_events_per_node); + server->channelz_server->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Server created")); + } + return server; } @@ -1459,3 +1497,11 @@ int grpc_server_has_open_connections(grpc_server* server) { gpr_mu_unlock(&server->mu_global); return r; } + +grpc_core::channelz::ServerNode* grpc_server_get_channelz_node( + grpc_server* server) { + if (server == nullptr) { + return nullptr; + } + return server->channelz_server.get(); +} diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index c617cc223e..0196743ff9 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -23,6 +23,7 @@ #include <grpc/grpc.h> #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channelz.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/transport/transport.h" @@ -46,6 +47,9 @@ void grpc_server_setup_transport(grpc_server* server, grpc_transport* transport, grpc_pollset* accepting_pollset, const grpc_channel_args* args); +grpc_core::channelz::ServerNode* grpc_server_get_channelz_node( + grpc_server* server); + const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server); int grpc_server_has_open_connections(grpc_server* server); diff --git a/src/core/lib/surface/version.cc b/src/core/lib/surface/version.cc index e92fe2c5a1..a44f9acdc3 100644 --- a/src/core/lib/surface/version.cc +++ b/src/core/lib/surface/version.cc @@ -25,4 +25,4 @@ const char* grpc_version_string(void) { return "6.0.0-dev"; } -const char* grpc_g_stands_for(void) { return "glider"; } +const char* grpc_g_stands_for(void) { return "gao"; } diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc index b5268add0d..17e8026096 100644 --- a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc +++ b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc @@ -24,6 +24,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include "src/core/lib/slice/slice_internal.h" #include "src/core/tsi/alts/handshaker/alts_handshaker_service_api.h" const int kHandshakerClientOpNum = 4; @@ -109,7 +110,7 @@ static grpc_byte_buffer* get_serialized_start_client(alts_tsi_event* event) { if (ok) { buffer = grpc_raw_byte_buffer_create(&slice, 1 /* number of slices */); } - grpc_slice_unref(slice); + grpc_slice_unref_internal(slice); gpr_free(target_name); grpc_gcp_handshaker_req_destroy(req); return buffer; @@ -157,7 +158,7 @@ static grpc_byte_buffer* get_serialized_start_server( if (ok) { buffer = grpc_raw_byte_buffer_create(&req_slice, 1 /* number of slices */); } - grpc_slice_unref(req_slice); + grpc_slice_unref_internal(req_slice); grpc_gcp_handshaker_req_destroy(req); return buffer; } @@ -195,7 +196,7 @@ static grpc_byte_buffer* get_serialized_next(grpc_slice* bytes_received) { if (ok) { buffer = grpc_raw_byte_buffer_create(&req_slice, 1 /* number of slices */); } - grpc_slice_unref(req_slice); + grpc_slice_unref_internal(req_slice); grpc_gcp_handshaker_req_destroy(req); return buffer; } @@ -258,7 +259,7 @@ alts_handshaker_client* alts_grpc_handshaker_client_create( grpc_slice_from_static_string(ALTS_SERVICE_METHOD), &slice, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); client->base.vtable = &vtable; - grpc_slice_unref(slice); + grpc_slice_unref_internal(slice); return &client->base; } diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc b/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc index e0e4184686..d63d3538c5 100644 --- a/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc +++ b/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc @@ -20,6 +20,8 @@ #include "src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.h" +#include "src/core/lib/slice/slice_internal.h" + void add_repeated_field(repeated_field** head, const void* data) { repeated_field* field = static_cast<repeated_field*>(gpr_zalloc(sizeof(*field))); @@ -67,7 +69,7 @@ grpc_slice* create_slice(const char* data, size_t size) { void destroy_slice(grpc_slice* slice) { if (slice != nullptr) { - grpc_slice_unref(*slice); + grpc_slice_unref_internal(*slice); gpr_free(slice); } } diff --git a/src/core/tsi/alts/handshaker/alts_tsi_event.cc b/src/core/tsi/alts/handshaker/alts_tsi_event.cc index ec0bf12b95..cb36d5ebd1 100644 --- a/src/core/tsi/alts/handshaker/alts_tsi_event.cc +++ b/src/core/tsi/alts/handshaker/alts_tsi_event.cc @@ -24,6 +24,8 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include "src/core/lib/slice/slice_internal.h" + tsi_result alts_tsi_event_create(alts_tsi_handshaker* handshaker, tsi_handshaker_on_next_done_cb cb, void* user_data, @@ -66,8 +68,8 @@ void alts_tsi_event_destroy(alts_tsi_event* event) { grpc_byte_buffer_destroy(event->recv_buffer); grpc_metadata_array_destroy(&event->initial_metadata); grpc_metadata_array_destroy(&event->trailing_metadata); - grpc_slice_unref(event->details); - grpc_slice_unref(event->target_name); + grpc_slice_unref_internal(event->details); + grpc_slice_unref_internal(event->target_name); grpc_alts_credentials_options_destroy(event->options); gpr_free(event); } diff --git a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc index 1df1021bb1..34608a3de1 100644 --- a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc +++ b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc @@ -31,6 +31,7 @@ #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/tsi/alts/frame_protector/alts_frame_protector.h" #include "src/core/tsi/alts/handshaker/alts_handshaker_client.h" #include "src/core/tsi/alts/handshaker/alts_tsi_utils.h" @@ -182,7 +183,7 @@ static void handshaker_result_destroy(tsi_handshaker_result* self) { gpr_free(result->peer_identity); gpr_free(result->key_data); gpr_free(result->unused_bytes); - grpc_slice_unref(result->rpc_versions); + grpc_slice_unref_internal(result->rpc_versions); gpr_free(result); } @@ -269,12 +270,12 @@ static tsi_result handshaker_next( handshaker->has_sent_start_message = true; } else { if (!GRPC_SLICE_IS_EMPTY(handshaker->recv_bytes)) { - grpc_slice_unref(handshaker->recv_bytes); + grpc_slice_unref_internal(handshaker->recv_bytes); } handshaker->recv_bytes = grpc_slice_ref(slice); ok = alts_handshaker_client_next(handshaker->client, event, &slice); } - grpc_slice_unref(slice); + grpc_slice_unref_internal(slice); if (ok != TSI_OK) { gpr_log(GPR_ERROR, "Failed to schedule ALTS handshaker requests"); return ok; @@ -299,8 +300,8 @@ static void handshaker_destroy(tsi_handshaker* self) { alts_tsi_handshaker* handshaker = reinterpret_cast<alts_tsi_handshaker*>(self); alts_handshaker_client_destroy(handshaker->client); - grpc_slice_unref(handshaker->recv_bytes); - grpc_slice_unref(handshaker->target_name); + grpc_slice_unref_internal(handshaker->recv_bytes); + grpc_slice_unref_internal(handshaker->target_name); grpc_alts_credentials_options_destroy(handshaker->options); gpr_free(handshaker->buffer); gpr_free(handshaker); diff --git a/src/core/tsi/alts/handshaker/alts_tsi_utils.cc b/src/core/tsi/alts/handshaker/alts_tsi_utils.cc index d9b5e6c945..1747f1ad04 100644 --- a/src/core/tsi/alts/handshaker/alts_tsi_utils.cc +++ b/src/core/tsi/alts/handshaker/alts_tsi_utils.cc @@ -22,6 +22,8 @@ #include <grpc/byte_buffer_reader.h> +#include "src/core/lib/slice/slice_internal.h" + tsi_result alts_tsi_utils_convert_to_tsi_result(grpc_status_code code) { switch (code) { case GRPC_STATUS_OK: @@ -47,7 +49,7 @@ grpc_gcp_handshaker_resp* alts_tsi_utils_deserialize_response( grpc_slice slice = grpc_byte_buffer_reader_readall(&bbr); grpc_gcp_handshaker_resp* resp = grpc_gcp_handshaker_resp_create(); bool ok = grpc_gcp_handshaker_resp_decode(slice, resp); - grpc_slice_unref(slice); + grpc_slice_unref_internal(slice); grpc_byte_buffer_reader_destroy(&bbr); if (!ok) { grpc_gcp_handshaker_resp_destroy(resp); diff --git a/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc b/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc index d4fd88d1e2..e7890903d5 100644 --- a/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc +++ b/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc @@ -61,7 +61,7 @@ static tsi_result alts_grpc_privacy_integrity_protect( if (status != GRPC_STATUS_OK) { gpr_log(GPR_ERROR, "Failed to protect, %s", error_details); gpr_free(error_details); - grpc_slice_unref(protected_slice); + grpc_slice_unref_internal(protected_slice); return TSI_INTERNAL_ERROR; } grpc_slice_buffer_add(protected_slices, protected_slice); @@ -106,7 +106,7 @@ static tsi_result alts_grpc_privacy_integrity_unprotect( if (status != GRPC_STATUS_OK) { gpr_log(GPR_ERROR, "Failed to unprotect, %s", error_details); gpr_free(error_details); - grpc_slice_unref(unprotected_slice); + grpc_slice_unref_internal(unprotected_slice); return TSI_INTERNAL_ERROR; } grpc_slice_buffer_reset_and_unref_internal(&rp->header_sb); diff --git a/src/core/tsi/alts_transport_security.cc b/src/core/tsi/alts_transport_security.cc index 2fd408103b..dac23bbf7a 100644 --- a/src/core/tsi/alts_transport_security.cc +++ b/src/core/tsi/alts_transport_security.cc @@ -45,7 +45,9 @@ void grpc_tsi_alts_signal_for_cq_destroy() { } void grpc_tsi_alts_init() { - memset(&g_alts_resource, 0, sizeof(alts_shared_resource)); + g_alts_resource.channel = nullptr; + g_alts_resource.cq = nullptr; + g_alts_resource.is_cq_drained = false; gpr_mu_init(&g_alts_resource.mu); gpr_cv_init(&g_alts_resource.cv); } diff --git a/src/core/tsi/ssl/session_cache/ssl_session_cache.cc b/src/core/tsi/ssl/session_cache/ssl_session_cache.cc index ce74fde343..f9184bcc34 100644 --- a/src/core/tsi/ssl/session_cache/ssl_session_cache.cc +++ b/src/core/tsi/ssl/session_cache/ssl_session_cache.cc @@ -19,6 +19,7 @@ #include <grpc/support/port_platform.h> #include "src/core/lib/gprpp/mutex_lock.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/tsi/ssl/session_cache/ssl_session.h" #include "src/core/tsi/ssl/session_cache/ssl_session_cache.h" @@ -53,7 +54,7 @@ class SslSessionLRUCache::Node { SetSession(std::move(session)); } - ~Node() { grpc_slice_unref(key_); } + ~Node() { grpc_slice_unref_internal(key_); } // Not copyable nor movable. Node(const Node&) = delete; |