diff options
Diffstat (limited to 'src')
106 files changed, 2438 insertions, 1641 deletions
diff --git a/src/compiler/ruby_generator.cc b/src/compiler/ruby_generator.cc index c7af9c38fa..e39d8be5d4 100644 --- a/src/compiler/ruby_generator.cc +++ b/src/compiler/ruby_generator.cc @@ -160,12 +160,20 @@ grpc::string GetServices(const FileDescriptor* file) { return output; } + std::string package_name; + + if (file->options().has_ruby_package()) { + package_name = file->options().ruby_package(); + } else { + package_name = file->package(); + } + // Write out a file header. std::map<grpc::string, grpc::string> header_comment_vars = ListToDict({ "file.name", file->name(), "file.package", - file->package(), + package_name, }); out.Print("# Generated by the protocol buffer compiler. DO NOT EDIT!\n"); out.Print(header_comment_vars, @@ -190,7 +198,7 @@ grpc::string GetServices(const FileDescriptor* file) { // Write out services within the modules out.Print("\n"); - std::vector<grpc::string> modules = Split(file->package(), '.'); + std::vector<grpc::string> modules = Split(package_name, '.'); for (size_t i = 0; i < modules.size(); ++i) { std::map<grpc::string, grpc::string> module_vars = ListToDict({ "module.name", diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index d2bf4f388d..388736b60a 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -457,7 +457,6 @@ get_service_config_from_resolver_result_locked(channel_data* chand) { grpc_uri* uri = grpc_uri_parse(server_uri, true); GPR_ASSERT(uri->path[0] != '\0'); service_config_parsing_state parsing_state; - memset(&parsing_state, 0, sizeof(parsing_state)); parsing_state.server_name = uri->path[0] == '/' ? uri->path + 1 : uri->path; service_config->ParseGlobalParams(parse_retry_throttle_params, @@ -934,6 +933,11 @@ typedef struct client_channel_call_data { grpc_closure pick_closure; grpc_closure pick_cancel_closure; + // state needed to support channelz interception of recv trailing metadata. + grpc_closure recv_trailing_metadata_ready_channelz; + grpc_closure* original_recv_trailing_metadata; + grpc_metadata_batch* recv_trailing_metadata; + grpc_polling_entity* pollent; bool pollent_added_to_interested_parties; @@ -995,6 +999,8 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem); static void on_complete(void* arg, grpc_error* error); static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored); static void start_pick_locked(void* arg, grpc_error* ignored); +static void maybe_intercept_recv_trailing_metadata_for_channelz( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch); // // send op data caching @@ -1293,6 +1299,7 @@ static void pending_batches_resume(grpc_call_element* elem) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { + maybe_intercept_recv_trailing_metadata_for_channelz(elem, batch); batch->handler_private.extra_arg = calld->subchannel_call; GRPC_CLOSURE_INIT(&batch->handler_private.closure, resume_pending_batch_in_call_combiner, batch, @@ -1778,23 +1785,22 @@ static void recv_message_ready(void* arg, grpc_error* error) { // recv_trailing_metadata handling // -// Sets *status and *server_pushback_md based on batch_data and error. -static void get_call_status(subchannel_batch_data* batch_data, - grpc_error* error, grpc_status_code* status, +// Sets *status and *server_pushback_md based on md_batch and error. +// Only sets *server_pushback_md if server_pushback_md != nullptr. +static void get_call_status(grpc_call_element* elem, + grpc_metadata_batch* md_batch, grpc_error* error, + grpc_status_code* status, grpc_mdelem** server_pushback_md) { - grpc_call_element* elem = batch_data->elem; call_data* calld = static_cast<call_data*>(elem->call_data); if (error != GRPC_ERROR_NONE) { grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr, nullptr); } else { - grpc_metadata_batch* md_batch = - batch_data->batch.payload->recv_trailing_metadata - .recv_trailing_metadata; GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); *status = grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md); - if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { + if (server_pushback_md != nullptr && + md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; } } @@ -1967,8 +1973,19 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { // Get the call's status and check for server pushback metadata. grpc_status_code status = GRPC_STATUS_OK; grpc_mdelem* server_pushback_md = nullptr; - get_call_status(batch_data, GRPC_ERROR_REF(error), &status, + grpc_metadata_batch* md_batch = + batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata; + get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, &server_pushback_md); + grpc_core::channelz::SubchannelNode* channelz_subchannel = + calld->pick.connected_subchannel->channelz_subchannel(); + if (channelz_subchannel != nullptr) { + if (status == GRPC_STATUS_OK) { + channelz_subchannel->RecordCallSucceeded(); + } else { + channelz_subchannel->RecordCallFailed(); + } + } if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand, calld, grpc_status_code_to_string(status)); @@ -2573,6 +2590,69 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { } // +// Channelz +// + +static void recv_trailing_metadata_ready_channelz(void* arg, + grpc_error* error) { + grpc_call_element* elem = static_cast<grpc_call_element*>(arg); + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, " + "error=%s", + chand, calld, grpc_error_string(error)); + } + GPR_ASSERT(calld->recv_trailing_metadata != nullptr); + grpc_status_code status = GRPC_STATUS_OK; + grpc_metadata_batch* md_batch = calld->recv_trailing_metadata; + get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr); + grpc_core::channelz::SubchannelNode* channelz_subchannel = + calld->pick.connected_subchannel->channelz_subchannel(); + GPR_ASSERT(channelz_subchannel != nullptr); + if (status == GRPC_STATUS_OK) { + channelz_subchannel->RecordCallSucceeded(); + } else { + channelz_subchannel->RecordCallFailed(); + } + calld->recv_trailing_metadata = nullptr; + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error); +} + +// If channelz is enabled, intercept recv_trailing so that we may check the +// status and associate it to a subchannel. +// Returns true if callback was intercepted, false otherwise. +static void maybe_intercept_recv_trailing_metadata_for_channelz( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + call_data* calld = static_cast<call_data*>(elem->call_data); + // only intercept payloads with recv trailing. + if (!batch->recv_trailing_metadata) { + return; + } + // only add interceptor is channelz is enabled. + if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) { + return; + } + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "calld=%p batch=%p: intercepting recv trailing for channelz", calld, + batch); + } + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz, + recv_trailing_metadata_ready_channelz, elem, + grpc_schedule_on_exec_ctx); + // save some state needed for the interception callback. + GPR_ASSERT(calld->recv_trailing_metadata == nullptr); + calld->recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata; + calld->original_recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready_channelz; +} + +// // LB pick // @@ -2601,6 +2681,11 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { new_error = grpc_error_add_child(new_error, error); pending_batches_fail(elem, new_error, true /* yield_call_combiner */); } else { + grpc_core::channelz::SubchannelNode* channelz_subchannel = + calld->pick.connected_subchannel->channelz_subchannel(); + if (channelz_subchannel != nullptr) { + channelz_subchannel->RecordCallStarted(); + } if (parent_data_size > 0) { subchannel_call_retry_state* retry_state = static_cast<subchannel_call_retry_state*>( diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc index 86c765df52..7e8f59bcd3 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.cc +++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc @@ -20,10 +20,13 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/client_channel_channelz.h" +#include "src/core/lib/channel/channelz_registry.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/connectivity_state.h" +#include <grpc/support/string_util.h> + namespace grpc_core { namespace channelz { namespace { @@ -109,5 +112,62 @@ RefCountedPtr<ChannelNode> ClientChannelNode::MakeClientChannelNode( is_top_level_channel); } +SubchannelNode::SubchannelNode(grpc_subchannel* subchannel, + size_t channel_tracer_max_nodes) + : BaseNode(EntityType::kSubchannel), + subchannel_(subchannel), + target_( + UniquePtr<char>(gpr_strdup(grpc_subchannel_get_target(subchannel_)))), + trace_(channel_tracer_max_nodes) {} + +SubchannelNode::~SubchannelNode() {} + +void SubchannelNode::PopulateConnectivityState(grpc_json* json) { + grpc_connectivity_state state; + if (subchannel_ == nullptr) { + state = GRPC_CHANNEL_SHUTDOWN; + } else { + state = grpc_subchannel_check_connectivity(subchannel_, nullptr); + } + json = grpc_json_create_child(nullptr, json, "state", nullptr, + GRPC_JSON_OBJECT, false); + grpc_json_create_child(nullptr, json, "state", + grpc_connectivity_state_name(state), GRPC_JSON_STRING, + false); +} + +grpc_json* SubchannelNode::RenderJson() { + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* json_iterator = nullptr; + json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr, + GRPC_JSON_OBJECT, false); + json = json_iterator; + json_iterator = nullptr; + json_iterator = grpc_json_add_number_string_child(json, json_iterator, + "subchannelId", uuid()); + // reset json iterators to top level object + json = top_level_json; + json_iterator = nullptr; + // create and fill the data child. + grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr, + GRPC_JSON_OBJECT, false); + json = data; + json_iterator = nullptr; + PopulateConnectivityState(json); + GPR_ASSERT(target_.get() != nullptr); + grpc_json_create_child(nullptr, json, "target", target_.get(), + GRPC_JSON_STRING, false); + // fill in the channel trace if applicable + grpc_json* trace_json = trace_.RenderJson(); + if (trace_json != nullptr) { + trace_json->key = "trace"; // this object is named trace in channelz.proto + grpc_json_link_child(json, trace_json, nullptr); + } + // ask CallCountingHelper to populate trace and call count data. + call_counter_.PopulateCallCounts(json); + return top_level_json; +} + } // namespace channelz } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.h b/src/core/ext/filters/client_channel/client_channel_channelz.h index 6f27b5c8b7..8ce331e529 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.h +++ b/src/core/ext/filters/client_channel/client_channel_channelz.h @@ -23,9 +23,12 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channel_trace.h" #include "src/core/lib/channel/channelz.h" #include "src/core/lib/gprpp/inlined_vector.h" +typedef struct grpc_subchannel grpc_subchannel; + namespace grpc_core { // TODO(ncteisen), this only contains the uuids of the children for now, @@ -43,28 +46,59 @@ class ClientChannelNode : public ChannelNode { grpc_channel* channel, size_t channel_tracer_max_nodes, bool is_top_level_channel); - // Override this functionality since client_channels have a notion of - // channel connectivity. - void PopulateConnectivityState(grpc_json* json) override; + ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel); + virtual ~ClientChannelNode() {} - // Override this functionality since client_channels have subchannels + // Overriding template methods from ChannelNode to render information that + // only ClientChannelNode knows about. + void PopulateConnectivityState(grpc_json* json) override; void PopulateChildRefs(grpc_json* json) override; // Helper to create a channel arg to ensure this type of ChannelNode is // created. static grpc_arg CreateChannelArg(); - protected: - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW - ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, - bool is_top_level_channel); - virtual ~ClientChannelNode() {} - private: grpc_channel_element* client_channel_; }; +// Handles channelz bookkeeping for sockets +class SubchannelNode : public BaseNode { + public: + SubchannelNode(grpc_subchannel* subchannel, size_t channel_tracer_max_nodes); + ~SubchannelNode() override; + + void MarkSubchannelDestroyed() { + GPR_ASSERT(subchannel_ != nullptr); + subchannel_ = nullptr; + } + + grpc_json* RenderJson() override; + + // proxy methods to composed classes. + void AddTraceEvent(ChannelTrace::Severity severity, grpc_slice data) { + trace_.AddTraceEvent(severity, data); + } + void AddTraceEventWithReference(ChannelTrace::Severity severity, + grpc_slice data, + RefCountedPtr<BaseNode> referenced_channel) { + trace_.AddTraceEventWithReference(severity, data, + std::move(referenced_channel)); + } + void RecordCallStarted() { call_counter_.RecordCallStarted(); } + void RecordCallFailed() { call_counter_.RecordCallFailed(); } + void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); } + + private: + grpc_subchannel* subchannel_; + UniquePtr<char> target_; + CallCountingHelper call_counter_; + ChannelTrace trace_; + + void PopulateConnectivityState(grpc_json* json); +}; + } // namespace channelz } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 25b0149393..1ee1925a25 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1265,7 +1265,7 @@ void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels, grpc_core::channelz::ChannelNode* channel_node = grpc_channel_get_channelz_node(lb_channel_); if (channel_node != nullptr) { - child_channels->push_back(channel_node->channel_uuid()); + child_channels->push_back(channel_node->uuid()); } } } diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 602d6e92f9..ed8cc60ea1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -71,11 +71,12 @@ class PickFirst : public LoadBalancingPolicy { : public SubchannelData<PickFirstSubchannelList, PickFirstSubchannelData> { public: - PickFirstSubchannelData(PickFirstSubchannelList* subchannel_list, - const grpc_lb_user_data_vtable* user_data_vtable, - const grpc_lb_address& address, - grpc_subchannel* subchannel, - grpc_combiner* combiner) + PickFirstSubchannelData( + SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>* + subchannel_list, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, grpc_subchannel* subchannel, + grpc_combiner* combiner) : SubchannelData(subchannel_list, user_data_vtable, address, subchannel, combiner) {} diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 4195c1e9d1..8dd5820bae 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -89,11 +89,12 @@ class RoundRobin : public LoadBalancingPolicy { : public SubchannelData<RoundRobinSubchannelList, RoundRobinSubchannelData> { public: - RoundRobinSubchannelData(RoundRobinSubchannelList* subchannel_list, - const grpc_lb_user_data_vtable* user_data_vtable, - const grpc_lb_address& address, - grpc_subchannel* subchannel, - grpc_combiner* combiner) + RoundRobinSubchannelData( + SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>* + subchannel_list, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, grpc_subchannel* subchannel, + grpc_combiner* combiner) : SubchannelData(subchannel_list, user_data_vtable, address, subchannel, combiner), user_data_vtable_(user_data_vtable), diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 91ddaec8b8..5e8682e056 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -65,6 +65,10 @@ class MySubchannelList namespace grpc_core { +// Forward declaration. +template <typename SubchannelListType, typename SubchannelDataType> +class SubchannelList; + // Stores data for a particular subchannel in a subchannel list. // Callers must create a subclass that implements the // ProcessConnectivityChangeLocked() method. @@ -72,7 +76,9 @@ template <typename SubchannelListType, typename SubchannelDataType> class SubchannelData { public: // Returns a pointer to the subchannel list containing this object. - SubchannelListType* subchannel_list() const { return subchannel_list_; } + SubchannelListType* subchannel_list() const { + return static_cast<SubchannelListType*>(subchannel_list_); + } // Returns the index into the subchannel list of this object. size_t Index() const { @@ -133,10 +139,11 @@ class SubchannelData { GRPC_ABSTRACT_BASE_CLASS protected: - SubchannelData(SubchannelListType* subchannel_list, - const grpc_lb_user_data_vtable* user_data_vtable, - const grpc_lb_address& address, grpc_subchannel* subchannel, - grpc_combiner* combiner); + SubchannelData( + SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, grpc_subchannel* subchannel, + grpc_combiner* combiner); virtual ~SubchannelData(); @@ -161,7 +168,7 @@ class SubchannelData { static void OnConnectivityChangedLocked(void* arg, grpc_error* error); // Backpointer to owning subchannel list. Not owned. - SubchannelListType* subchannel_list_; + SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list_; // The subchannel and connected subchannel. grpc_subchannel* subchannel_; @@ -200,7 +207,7 @@ class SubchannelList grpc_core::channelz::SubchannelNode* subchannel_node = grpc_subchannel_get_channelz_node(subchannels_[i].subchannel()); if (subchannel_node != nullptr) { - refs_list->push_back(subchannel_node->subchannel_uuid()); + refs_list->push_back(subchannel_node->uuid()); } } } @@ -268,7 +275,7 @@ class SubchannelList template <typename SubchannelListType, typename SubchannelDataType> SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData( - SubchannelListType* subchannel_list, + SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list, const grpc_lb_user_data_vtable* user_data_vtable, const grpc_lb_address& address, grpc_subchannel* subchannel, grpc_combiner* combiner) @@ -532,8 +539,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( address_uri); gpr_free(address_uri); } - subchannels_.emplace_back(static_cast<SubchannelListType*>(this), - addresses->user_data_vtable, + subchannels_.emplace_back(this, addresses->user_data_vtable, addresses->addresses[i], subchannel, combiner); } } diff --git a/src/core/ext/filters/client_channel/parse_address.cc b/src/core/ext/filters/client_channel/parse_address.cc index b3900114ad..707beb8876 100644 --- a/src/core/ext/filters/client_channel/parse_address.cc +++ b/src/core/ext/filters/client_channel/parse_address.cc @@ -125,27 +125,41 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr, char* host_end = static_cast<char*>(gpr_memrchr(host, '%', strlen(host))); if (host_end != nullptr) { GPR_ASSERT(host_end >= host); - char host_without_scope[GRPC_INET6_ADDRSTRLEN]; + char host_without_scope[GRPC_INET6_ADDRSTRLEN + 1]; size_t host_without_scope_len = static_cast<size_t>(host_end - host); uint32_t sin6_scope_id = 0; + if (host_without_scope_len > GRPC_INET6_ADDRSTRLEN) { + if (log_errors) { + gpr_log( + GPR_ERROR, + "invalid ipv6 address length %zu. Length cannot be greater than " + "GRPC_INET6_ADDRSTRLEN i.e %d)", + host_without_scope_len, GRPC_INET6_ADDRSTRLEN); + } + goto done; + } strncpy(host_without_scope, host, host_without_scope_len); host_without_scope[host_without_scope_len] = '\0'; if (grpc_inet_pton(GRPC_AF_INET6, host_without_scope, &in6->sin6_addr) == 0) { - gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host_without_scope); + if (log_errors) { + gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host_without_scope); + } goto done; } if (gpr_parse_bytes_to_uint32(host_end + 1, strlen(host) - host_without_scope_len - 1, &sin6_scope_id) == 0) { - gpr_log(GPR_ERROR, "invalid ipv6 scope id: '%s'", host_end + 1); + if (log_errors) { + gpr_log(GPR_ERROR, "invalid ipv6 scope id: '%s'", host_end + 1); + } goto done; } // Handle "sin6_scope_id" being type "u_long". See grpc issue #10027. in6->sin6_scope_id = sin6_scope_id; } else { if (grpc_inet_pton(GRPC_AF_INET6, host, &in6->sin6_addr) == 0) { - gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host); + if (log_errors) gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host); goto done; } } @@ -190,3 +204,12 @@ bool grpc_parse_uri(const grpc_uri* uri, grpc_resolved_address* resolved_addr) { gpr_log(GPR_ERROR, "Can't parse scheme '%s'", uri->scheme); return false; } + +uint16_t grpc_strhtons(const char* port) { + if (strcmp(port, "http") == 0) { + return htons(80); + } else if (strcmp(port, "https") == 0) { + return htons(443); + } + return htons(static_cast<unsigned short>(atoi(port))); +} diff --git a/src/core/ext/filters/client_channel/parse_address.h b/src/core/ext/filters/client_channel/parse_address.h index 9a88b66edc..c2af0e6c49 100644 --- a/src/core/ext/filters/client_channel/parse_address.h +++ b/src/core/ext/filters/client_channel/parse_address.h @@ -47,4 +47,7 @@ bool grpc_parse_ipv4_hostport(const char* hostport, grpc_resolved_address* addr, bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr, bool log_errors); +/* Converts named or numeric port to a uint16 suitable for use in a sockaddr. */ +uint16_t grpc_strhtons(const char* port); + #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_PARSE_ADDRESS_H */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 485998f5e4..4c795c34c8 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -87,15 +87,6 @@ typedef struct grpc_ares_hostbyname_request { static void do_basic_init(void) { gpr_mu_init(&g_init_mu); } -static uint16_t strhtons(const char* port) { - if (strcmp(port, "http") == 0) { - return htons(80); - } else if (strcmp(port, "https") == 0) { - return htons(443); - } - return htons(static_cast<unsigned short>(atoi(port))); -} - static void log_address_sorting_list(grpc_lb_addresses* lb_addrs, const char* input_output_str) { for (size_t i = 0; i < lb_addrs->num_addresses; i++) { @@ -139,12 +130,6 @@ void grpc_cares_wrapper_address_sorting_sort(grpc_lb_addresses* lb_addrs) { } } -/* Allow tests to access grpc_ares_wrapper_address_sorting_sort */ -void grpc_cares_wrapper_test_only_address_sorting_sort( - grpc_lb_addresses* lb_addrs) { - grpc_cares_wrapper_address_sorting_sort(lb_addrs); -} - static void grpc_ares_request_ref_locked(grpc_ares_request* r) { r->pending_queries++; } @@ -371,7 +356,8 @@ done: grpc_ares_request_unref_locked(r); } -static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( +static grpc_ares_request* +grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, @@ -454,12 +440,12 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( } r->pending_queries = 1; if (grpc_ares_query_ipv6()) { - hr = create_hostbyname_request_locked(r, host, strhtons(port), + hr = create_hostbyname_request_locked(r, host, grpc_strhtons(port), false /* is_balancer */); ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_locked, hr); } - hr = create_hostbyname_request_locked(r, host, strhtons(port), + hr = create_hostbyname_request_locked(r, host, grpc_strhtons(port), false /* is_balancer */); ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_locked, hr); @@ -494,6 +480,79 @@ error_cleanup: return nullptr; } +static bool inner_resolve_as_ip_literal_locked(const char* name, + const char* default_port, + grpc_lb_addresses** addrs, + char** host, char** port, + char** hostport) { + gpr_split_host_port(name, host, port); + if (*host == nullptr) { + gpr_log(GPR_ERROR, + "Failed to parse %s to host:port while attempting to resolve as ip " + "literal.", + name); + return false; + } + if (*port == nullptr) { + if (default_port == nullptr) { + gpr_log(GPR_ERROR, + "No port or default port for %s while attempting to resolve as " + "ip literal.", + name); + return false; + } + *port = gpr_strdup(default_port); + } + grpc_resolved_address addr; + GPR_ASSERT(gpr_join_host_port(hostport, *host, atoi(*port))); + if (grpc_parse_ipv4_hostport(*hostport, &addr, false /* log errors */) || + grpc_parse_ipv6_hostport(*hostport, &addr, false /* log errors */)) { + GPR_ASSERT(*addrs == nullptr); + *addrs = grpc_lb_addresses_create(1, nullptr); + grpc_lb_addresses_set_address( + *addrs, 0, addr.addr, addr.len, false /* is_balancer */, + nullptr /* balancer_name */, nullptr /* user_data */); + return true; + } + return false; +} + +static bool resolve_as_ip_literal_locked(const char* name, + const char* default_port, + grpc_lb_addresses** addrs) { + char* host = nullptr; + char* port = nullptr; + char* hostport = nullptr; + bool out = inner_resolve_as_ip_literal_locked(name, default_port, addrs, + &host, &port, &hostport); + gpr_free(host); + gpr_free(port); + gpr_free(hostport); + return out; +} + +static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( + const char* dns_server, const char* name, const char* default_port, + grpc_pollset_set* interested_parties, grpc_closure* on_done, + grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, + grpc_combiner* combiner) { + // Early out if the target is an ipv4 or ipv6 literal. + if (resolve_as_ip_literal_locked(name, default_port, addrs)) { + GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE); + return nullptr; + } + // Early out if the target is localhost and we're on Windows. + if (grpc_ares_maybe_resolve_localhost_manually_locked(name, default_port, + addrs)) { + GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE); + return nullptr; + } + // Look up name using c-ares lib. + return grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( + dns_server, name, default_port, interested_parties, on_done, addrs, + check_grpclb, service_config_json, combiner); +} + grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, @@ -502,7 +561,9 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)( void grpc_cancel_ares_request(grpc_ares_request* r) { if (grpc_dns_lookup_ares_locked == grpc_dns_lookup_ares_locked_impl) { - grpc_ares_ev_driver_shutdown_locked(r->ev_driver); + if (r != nullptr) { + grpc_ares_ev_driver_shutdown_locked(r->ev_driver); + } } } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index ca5779e1d7..1bc457d4cf 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -81,9 +81,15 @@ void grpc_ares_complete_request_locked(grpc_ares_request* request); /* E.g., return false if ipv6 is known to not be available. */ bool grpc_ares_query_ipv6(); -/* Exposed only for testing */ -void grpc_cares_wrapper_test_only_address_sorting_sort( - grpc_lb_addresses* lb_addrs); +/* Maybe (depending on the current platform) checks if "name" matches + * "localhost" and if so fills in addrs with the correct sockaddr structures. + * Returns a bool indicating whether or not such an action was performed. + * See https://github.com/grpc/grpc/issues/15158. */ +bool grpc_ares_maybe_resolve_localhost_manually_locked( + const char* name, const char* default_port, grpc_lb_addresses** addrs); + +/* Sorts destinations in lb_addrs according to RFC 6724. */ +void grpc_cares_wrapper_address_sorting_sort(grpc_lb_addresses* lb_addrs); #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H \ */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc index 23c0fec74f..639eec2323 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc @@ -26,4 +26,9 @@ bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); } +bool grpc_ares_maybe_resolve_localhost_manually_locked( + const char* name, const char* default_port, grpc_lb_addresses** addrs) { + return false; +} + #endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc index ee827e284e..7e34784691 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc @@ -21,9 +21,79 @@ #include "src/core/lib/iomgr/port.h" #if GRPC_ARES == 1 && defined(GPR_WINDOWS) +#include <grpc/support/string_util.h> + +#include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" +#include "src/core/lib/gpr/host_port.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/socket_windows.h" bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); } +static bool inner_maybe_resolve_localhost_manually_locked( + const char* name, const char* default_port, grpc_lb_addresses** addrs, + char** host, char** port) { + gpr_split_host_port(name, host, port); + if (*host == nullptr) { + gpr_log(GPR_ERROR, + "Failed to parse %s into host:port during Windows localhost " + "resolution check.", + name); + return false; + } + if (*port == nullptr) { + if (default_port == nullptr) { + gpr_log(GPR_ERROR, + "No port or default port for %s during Windows localhost " + "resolution check.", + name); + return false; + } + *port = gpr_strdup(default_port); + } + if (gpr_stricmp(*host, "localhost") == 0) { + GPR_ASSERT(*addrs == nullptr); + *addrs = grpc_lb_addresses_create(2, nullptr); + uint16_t numeric_port = grpc_strhtons(*port); + // Append the ipv6 loopback address. + struct sockaddr_in6 ipv6_loopback_addr; + memset(&ipv6_loopback_addr, 0, sizeof(ipv6_loopback_addr)); + ((char*)&ipv6_loopback_addr.sin6_addr)[15] = 1; + ipv6_loopback_addr.sin6_family = AF_INET6; + ipv6_loopback_addr.sin6_port = numeric_port; + grpc_lb_addresses_set_address( + *addrs, 0, &ipv6_loopback_addr, sizeof(ipv6_loopback_addr), + false /* is_balancer */, nullptr /* balancer_name */, + nullptr /* user_data */); + // Append the ipv4 loopback address. + struct sockaddr_in ipv4_loopback_addr; + memset(&ipv4_loopback_addr, 0, sizeof(ipv4_loopback_addr)); + ((char*)&ipv4_loopback_addr.sin_addr)[0] = 0x7f; + ((char*)&ipv4_loopback_addr.sin_addr)[3] = 0x01; + ipv4_loopback_addr.sin_family = AF_INET; + ipv4_loopback_addr.sin_port = numeric_port; + grpc_lb_addresses_set_address( + *addrs, 1, &ipv4_loopback_addr, sizeof(ipv4_loopback_addr), + false /* is_balancer */, nullptr /* balancer_name */, + nullptr /* user_data */); + // Let the address sorter figure out which one should be tried first. + grpc_cares_wrapper_address_sorting_sort(*addrs); + return true; + } + return false; +} + +bool grpc_ares_maybe_resolve_localhost_manually_locked( + const char* name, const char* default_port, grpc_lb_addresses** addrs) { + char* host = nullptr; + char* port = nullptr; + bool out = inner_maybe_resolve_localhost_manually_locked(name, default_port, + addrs, &host, &port); + gpr_free(host); + gpr_free(port); + return out; +} + #endif /* GRPC_ARES == 1 && defined(GPR_WINDOWS) */ diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 0e40f42e18..57d0b3759f 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -183,7 +183,13 @@ static void connection_destroy(void* arg, grpc_error* error) { static void subchannel_destroy(void* arg, grpc_error* error) { grpc_subchannel* c = static_cast<grpc_subchannel*>(arg); - c->channelz_subchannel.reset(); + if (c->channelz_subchannel != nullptr) { + c->channelz_subchannel->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Subchannel destroyed")); + c->channelz_subchannel->MarkSubchannelDestroyed(); + c->channelz_subchannel.reset(); + } gpr_free((void*)c->filters); grpc_channel_args_destroy(c->args); grpc_connectivity_state_destroy(&c->state_tracker); @@ -383,9 +389,18 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, const grpc_arg* arg = grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ); bool channelz_enabled = grpc_channel_arg_get_bool(arg, false); + arg = grpc_channel_args_find(c->args, + GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE); + const grpc_integer_options options = {0, 0, INT_MAX}; + size_t channel_tracer_max_nodes = + (size_t)grpc_channel_arg_get_integer(arg, options); if (channelz_enabled) { c->channelz_subchannel = - grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>(); + grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>( + c, channel_tracer_max_nodes); + c->channelz_subchannel->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Subchannel created")); } return grpc_subchannel_index_register(key, c); @@ -625,8 +640,8 @@ static bool publish_transport_locked(grpc_subchannel* c) { } /* publish */ - c->connected_subchannel.reset( - grpc_core::New<grpc_core::ConnectedSubchannel>(stk)); + c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>( + stk, c->channelz_subchannel.get())); gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", c->connected_subchannel.get(), c); @@ -770,6 +785,14 @@ void grpc_get_subchannel_address_arg(const grpc_channel_args* args, } } +const char* grpc_subchannel_get_target(grpc_subchannel* subchannel) { + const grpc_arg* addr_arg = + grpc_channel_args_find(subchannel->args, GRPC_ARG_SUBCHANNEL_ADDRESS); + const char* addr_str = grpc_channel_arg_get_string(addr_arg); + GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. + return addr_str; +} + const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) { const grpc_arg* addr_arg = grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS); @@ -786,9 +809,12 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { namespace grpc_core { -ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack) +ConnectedSubchannel::ConnectedSubchannel( + grpc_channel_stack* channel_stack, + channelz::SubchannelNode* channelz_subchannel) : RefCountedWithTracing<ConnectedSubchannel>(&grpc_trace_stream_refcount), - channel_stack_(channel_stack) {} + channel_stack_(channel_stack), + channelz_subchannel_(channelz_subchannel) {} ConnectedSubchannel::~ConnectedSubchannel() { GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index a135035d62..84febb5204 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -85,7 +85,8 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> { size_t parent_data_size; }; - explicit ConnectedSubchannel(grpc_channel_stack* channel_stack); + explicit ConnectedSubchannel(grpc_channel_stack* channel_stack, + channelz::SubchannelNode* channelz_subchannel); ~ConnectedSubchannel(); grpc_channel_stack* channel_stack() { return channel_stack_; } @@ -94,9 +95,15 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> { grpc_closure* closure); void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call); + channelz::SubchannelNode* channelz_subchannel() { + return channelz_subchannel_; + } private: grpc_channel_stack* channel_stack_; + // backpointer to the channelz node in this connected subchannel's + // owning subchannel. + channelz::SubchannelNode* channelz_subchannel_; }; } // namespace grpc_core @@ -184,6 +191,8 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, void grpc_get_subchannel_address_arg(const grpc_channel_args* args, grpc_resolved_address* addr); +const char* grpc_subchannel_get_target(grpc_subchannel* subchannel); + /// Returns the URI string for the address to connect to. const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args); diff --git a/src/core/ext/filters/client_channel/subchannel_index.cc b/src/core/ext/filters/client_channel/subchannel_index.cc index cb02b1a748..1c23a6c4be 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.cc +++ b/src/core/ext/filters/client_channel/subchannel_index.cc @@ -73,7 +73,8 @@ static grpc_subchannel_key* subchannel_key_copy(grpc_subchannel_key* k) { int grpc_subchannel_key_compare(const grpc_subchannel_key* a, const grpc_subchannel_key* b) { - if (g_force_creation) return false; + // To pretend the keys are different, return a non-zero value. + if (GPR_UNLIKELY(g_force_creation)) return 1; int c = GPR_ICMP(a->args.filter_count, b->args.filter_count); if (c != 0) return c; if (a->args.filter_count > 0) { diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h index a7dae9d47d..c135613d26 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.h +++ b/src/core/ext/filters/client_channel/subchannel_index.h @@ -65,13 +65,10 @@ void grpc_subchannel_index_ref(void); void grpc_subchannel_index_unref(void); /** \em TEST ONLY. - * If \a force_creation is true, all key comparisons will be false, resulting in + * If \a force_creation is true, all keys are regarded different, resulting in * new subchannels always being created. Otherwise, the keys will be compared as * usual. * - * This function is *not* threadsafe on purpose: it should *only* be used in - * test code. - * * Tests using this function \em MUST run tests with and without \a * force_creation set. */ void grpc_subchannel_index_test_only_set_force_creation(bool force_creation); diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index 1678051beb..91fa163fec 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -51,6 +51,7 @@ struct call_data { grpc_linked_mdelem user_agent; // State for handling recv_initial_metadata ops. grpc_metadata_batch* recv_initial_metadata; + grpc_error* recv_initial_metadata_error; grpc_closure* original_recv_initial_metadata_ready; grpc_closure recv_initial_metadata_ready; // State for handling recv_trailing_metadata ops. @@ -78,7 +79,12 @@ struct channel_data { static grpc_error* client_filter_incoming_metadata(grpc_call_element* elem, grpc_metadata_batch* b) { if (b->idx.named.status != nullptr) { - if (grpc_mdelem_eq(b->idx.named.status->md, GRPC_MDELEM_STATUS_200)) { + /* If both gRPC status and HTTP status are provided in the response, we + * should prefer the gRPC status code, as mentioned in + * https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md. + */ + if (b->idx.named.grpc_status != nullptr || + grpc_mdelem_eq(b->idx.named.status->md, GRPC_MDELEM_STATUS_200)) { grpc_metadata_batch_remove(b, b->idx.named.status); } else { char* val = grpc_dump_slice(GRPC_MDVALUE(b->idx.named.status->md), @@ -147,6 +153,7 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) { call_data* calld = static_cast<call_data*>(elem->call_data); if (error == GRPC_ERROR_NONE) { error = client_filter_incoming_metadata(elem, calld->recv_initial_metadata); + calld->recv_initial_metadata_error = GRPC_ERROR_REF(error); } else { GRPC_ERROR_REF(error); } @@ -162,6 +169,8 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { } else { GRPC_ERROR_REF(error); } + error = grpc_error_add_child( + error, GRPC_ERROR_REF(calld->recv_initial_metadata_error)); GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); } @@ -434,7 +443,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem, /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, - grpc_closure* ignored) {} + grpc_closure* ignored) { + call_data* calld = static_cast<call_data*>(elem->call_data); + GRPC_ERROR_UNREF(calld->recv_initial_metadata_error); +} static grpc_mdelem scheme_from_args(const grpc_channel_args* args) { unsigned i; diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 3919447f26..1b3426b120 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -23,6 +23,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <string.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/b64.h" @@ -50,6 +51,7 @@ struct call_data { // State for intercepting recv_initial_metadata. grpc_closure recv_initial_metadata_ready; + grpc_error* recv_initial_metadata_ready_error; grpc_closure* original_recv_initial_metadata_ready; grpc_metadata_batch* recv_initial_metadata; uint32_t* recv_initial_metadata_flags; @@ -60,6 +62,13 @@ struct call_data { grpc_closure recv_message_ready; grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; bool seen_recv_message_ready; + + grpc_closure recv_trailing_metadata_ready; + grpc_closure* original_recv_trailing_metadata_ready; +}; + +struct channel_data { + bool surface_user_agent; }; } // namespace @@ -258,6 +267,11 @@ static grpc_error* hs_filter_incoming_metadata(grpc_call_element* elem, GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":authority"))); } + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + if (!chand->surface_user_agent && b->idx.named.user_agent != nullptr) { + grpc_metadata_batch_remove(b, b->idx.named.user_agent); + } + return error; } @@ -267,6 +281,7 @@ static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err) { calld->seen_recv_initial_metadata_ready = true; if (err == GRPC_ERROR_NONE) { err = hs_filter_incoming_metadata(elem, calld->recv_initial_metadata); + calld->recv_initial_metadata_ready_error = GRPC_ERROR_REF(err); if (calld->seen_recv_message_ready) { // We've already seen the recv_message callback, but we previously // deferred it, so we need to return it here. @@ -313,6 +328,15 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) { } } +static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) { + grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + err = grpc_error_add_child( + GRPC_ERROR_REF(err), + GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error)); + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); +} + static grpc_error* hs_mutate_op(grpc_call_element* elem, grpc_transport_stream_op_batch* op) { /* grab pointers to our data from the call element */ @@ -357,6 +381,13 @@ static grpc_error* hs_mutate_op(grpc_call_element* elem, op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } + if (op->recv_trailing_metadata) { + calld->original_recv_trailing_metadata_ready = + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready; + } + if (op->send_trailing_metadata) { grpc_error* error = hs_filter_outgoing_metadata( elem, op->payload->send_trailing_metadata.send_trailing_metadata); @@ -389,6 +420,9 @@ static grpc_error* hs_init_call_elem(grpc_call_element* elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->recv_message_ready, hs_recv_message_ready, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + hs_recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } @@ -397,6 +431,7 @@ static void hs_destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = static_cast<call_data*>(elem->call_data); + GRPC_ERROR_UNREF(calld->recv_initial_metadata_ready_error); if (calld->have_read_stream) { calld->read_stream->Orphan(); } @@ -405,7 +440,12 @@ static void hs_destroy_call_elem(grpc_call_element* elem, /* Constructor for channel_data */ static grpc_error* hs_init_channel_elem(grpc_channel_element* elem, grpc_channel_element_args* args) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); GPR_ASSERT(!args->is_last); + chand->surface_user_agent = grpc_channel_arg_get_bool( + grpc_channel_args_find(args->channel_args, + const_cast<char*>(GRPC_ARG_SURFACE_USER_AGENT)), + true); return GRPC_ERROR_NONE; } @@ -419,7 +459,7 @@ const grpc_channel_filter grpc_http_server_filter = { hs_init_call_elem, grpc_call_stack_ignore_set_pollset_or_pollset_set, hs_destroy_call_elem, - 0, + sizeof(channel_data), hs_init_channel_elem, hs_destroy_channel_elem, grpc_channel_next_get_info, diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc index 1fe8288bd0..431472609e 100644 --- a/src/core/ext/filters/max_age/max_age_filter.cc +++ b/src/core/ext/filters/max_age/max_age_filter.cc @@ -429,8 +429,7 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem, ? GRPC_MILLIS_INF_FUTURE : DEFAULT_MAX_CONNECTION_IDLE_MS; chand->idle_state = MAX_IDLE_STATE_INIT; - gpr_atm_no_barrier_store(&chand->last_enter_idle_time_millis, - GRPC_MILLIS_INF_PAST); + gpr_atm_no_barrier_store(&chand->last_enter_idle_time_millis, GPR_ATM_MIN); for (size_t i = 0; i < args->channel_args->num_args; ++i) { if (0 == strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_CONNECTION_AGE_MS)) { diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index c7fc3f2e62..c17df86f3d 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -99,10 +99,15 @@ struct call_data { // recv_message_ready up-call on transport_stream_op, and remember to // call our next_recv_message_ready member after handling it. grpc_closure recv_message_ready; + grpc_closure recv_trailing_metadata_ready; + // The error caused by a message that is too large, or GRPC_ERROR_NONE + grpc_error* error; // Used by recv_message_ready. grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; // Original recv_message_ready callback, invoked after our own. grpc_closure* next_recv_message_ready; + // Original recv_trailing_metadata callback, invoked after our own. + grpc_closure* original_recv_trailing_metadata_ready; }; struct channel_data { @@ -130,12 +135,13 @@ static void recv_message_ready(void* user_data, grpc_error* error) { grpc_error* new_error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED); + GRPC_ERROR_UNREF(calld->error); if (error == GRPC_ERROR_NONE) { error = new_error; } else { error = grpc_error_add_child(error, new_error); - GRPC_ERROR_UNREF(new_error); } + calld->error = GRPC_ERROR_REF(error); gpr_free(message_string); } else { GRPC_ERROR_REF(error); @@ -144,6 +150,17 @@ static void recv_message_ready(void* user_data, grpc_error* error) { GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error); } +// Callback invoked on completion of recv_trailing_metadata +// Notifies the recv_trailing_metadata batch of any message size failures +static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { + grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + error = + grpc_error_add_child(GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->error)); + // Invoke the next callback. + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); +} + // Start transport stream op. static void start_transport_stream_op_batch( grpc_call_element* elem, grpc_transport_stream_op_batch* op) { @@ -172,6 +189,13 @@ static void start_transport_stream_op_batch( calld->recv_message = op->payload->recv_message.recv_message; op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } + // Inject callback for receiving trailing metadata. + if (op->recv_trailing_metadata) { + calld->original_recv_trailing_metadata_ready = + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready; + } // Chain to the next filter. grpc_call_next_op(elem, op); } @@ -183,8 +207,13 @@ static grpc_error* init_call_elem(grpc_call_element* elem, call_data* calld = static_cast<call_data*>(elem->call_data); calld->call_combiner = args->call_combiner; calld->next_recv_message_ready = nullptr; + calld->original_recv_trailing_metadata_ready = nullptr; + calld->error = GRPC_ERROR_NONE; GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); // Get max sizes from channel data, then merge in per-method config values. // Note: Per-method config is only available on the client, so we // apply the max request size to the send limit and the max response @@ -213,7 +242,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem, // Destructor for call_data. static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, - grpc_closure* ignored) {} + grpc_closure* ignored) { + call_data* calld = (call_data*)elem->call_data; + GRPC_ERROR_UNREF(calld->error); +} static int default_size(const grpc_channel_args* args, int without_minimal_stack) { diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 027a57d606..26cad2cc9a 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -230,35 +230,165 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); } static const grpc_transport_vtable* get_vtable(void); -static void init_transport(grpc_chttp2_transport* t, - const grpc_channel_args* channel_args, - grpc_endpoint* ep, bool is_client) { +/* Returns whether bdp is enabled */ +static bool read_channel_args(grpc_chttp2_transport* t, + const grpc_channel_args* channel_args, + bool is_client) { + bool enable_bdp = true; size_t i; int j; - GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == - GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); - - t->base.vtable = get_vtable(); - t->ep = ep; - /* one ref is for destroy */ - gpr_ref_init(&t->refs, 1); - t->combiner = grpc_combiner_create(); - t->peer_string = grpc_endpoint_get_peer(ep); - t->endpoint_reading = 1; - t->next_stream_id = is_client ? 1 : 2; - t->is_client = is_client; - t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; - t->is_first_frame = true; - grpc_connectivity_state_init( - &t->channel_callback.state_tracker, GRPC_CHANNEL_READY, - is_client ? "client_transport" : "server_transport"); - - grpc_slice_buffer_init(&t->qbuf); - - grpc_slice_buffer_init(&t->outbuf); - grpc_chttp2_hpack_compressor_init(&t->hpack_compressor); + for (i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) { + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { + if ((t->next_stream_id & 1) != (value & 1)) { + gpr_log(GPR_ERROR, "%s: low bit must be %d on %s", + GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1, + is_client ? "client" : "server"); + } else { + t->next_stream_id = static_cast<uint32_t>(value); + } + } + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) { + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { + grpc_chttp2_hpack_compressor_set_max_usable_size( + &t->hpack_compressor, static_cast<uint32_t>(value)); + } + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) { + t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer( + &channel_args->args[i], + {g_default_max_pings_without_data, 0, INT_MAX}); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MAX_PING_STRIKES)) { + t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer( + &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX}); + } else if (0 == + strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) { + t->ping_policy.min_sent_ping_interval_without_data = + grpc_channel_arg_get_integer( + &channel_args->args[i], + grpc_integer_options{ + g_default_min_sent_ping_interval_without_data_ms, 0, + INT_MAX}); + } else if (0 == + strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) { + t->ping_policy.min_recv_ping_interval_without_data = + grpc_channel_arg_get_integer( + &channel_args->args[i], + grpc_integer_options{ + g_default_min_recv_ping_interval_without_data_ms, 0, + INT_MAX}); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) { + t->write_buffer_size = static_cast<uint32_t>(grpc_channel_arg_get_integer( + &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE})); + } else if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { + enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true); + } else if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) { + const int value = grpc_channel_arg_get_integer( + &channel_args->args[i], + grpc_integer_options{t->is_client + ? g_default_client_keepalive_time_ms + : g_default_server_keepalive_time_ms, + 1, INT_MAX}); + t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) { + const int value = grpc_channel_arg_get_integer( + &channel_args->args[i], + grpc_integer_options{t->is_client + ? g_default_client_keepalive_timeout_ms + : g_default_server_keepalive_timeout_ms, + 0, INT_MAX}); + t->keepalive_timeout = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { + t->keepalive_permit_without_calls = static_cast<uint32_t>( + grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1})); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_OPTIMIZATION_TARGET)) { + if (channel_args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "%s should be a string", + GRPC_ARG_OPTIMIZATION_TARGET); + } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + } else if (0 == + strcmp(channel_args->args[i].value.string, "throughput")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT; + } else { + gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'", + GRPC_ARG_OPTIMIZATION_TARGET, + channel_args->args[i].value.string); + } + } else { + static const struct { + const char* channel_arg_name; + grpc_chttp2_setting_id setting_id; + grpc_integer_options integer_options; + bool availability[2] /* server, client */; + } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS, + GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, + {-1, 0, INT32_MAX}, + {true, false}}, + {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER, + GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, + {-1, 0, INT32_MAX}, + {true, true}}, + {GRPC_ARG_MAX_METADATA_SIZE, + GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + {-1, 0, INT32_MAX}, + {true, true}}, + {GRPC_ARG_HTTP2_MAX_FRAME_SIZE, + GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, + {-1, 16384, 16777215}, + {true, true}}, + {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY, + GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, + {1, 0, 1}, + {true, true}}, + {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES, + GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, + {-1, 5, INT32_MAX}, + {true, true}}}; + for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) { + if (0 == strcmp(channel_args->args[i].key, + settings_map[j].channel_arg_name)) { + if (!settings_map[j].availability[is_client]) { + gpr_log(GPR_DEBUG, "%s is not available on %s", + settings_map[j].channel_arg_name, + is_client ? "clients" : "servers"); + } else { + int value = grpc_channel_arg_get_integer( + &channel_args->args[i], settings_map[j].integer_options); + if (value >= 0) { + queue_setting_update(t, settings_map[j].setting_id, + static_cast<uint32_t>(value)); + } + } + break; + } + } + } + } + return enable_bdp; +} +static void init_transport_closures(grpc_chttp2_transport* t) { GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t, @@ -286,6 +416,79 @@ static void init_transport(grpc_chttp2_transport* t, GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked, keepalive_watchdog_fired_locked, t, grpc_combiner_scheduler(t->combiner)); +} + +static void init_transport_keepalive_settings(grpc_chttp2_transport* t) { + if (t->is_client) { + t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_client_keepalive_time_ms; + t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_client_keepalive_timeout_ms; + t->keepalive_permit_without_calls = + g_default_client_keepalive_permit_without_calls; + } else { + t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_server_keepalive_time_ms; + t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_server_keepalive_timeout_ms; + t->keepalive_permit_without_calls = + g_default_server_keepalive_permit_without_calls; + } +} + +static void configure_transport_ping_policy(grpc_chttp2_transport* t) { + t->ping_policy.max_pings_without_data = g_default_max_pings_without_data; + t->ping_policy.min_sent_ping_interval_without_data = + g_default_min_sent_ping_interval_without_data_ms; + t->ping_policy.max_ping_strikes = g_default_max_ping_strikes; + t->ping_policy.min_recv_ping_interval_without_data = + g_default_min_recv_ping_interval_without_data_ms; +} + +static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) { + if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init(&t->keepalive_ping_timer, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + &t->init_keepalive_ping_locked); + } else { + /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no + inflight keeaplive timers */ + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; + } +} + +static void init_transport(grpc_chttp2_transport* t, + const grpc_channel_args* channel_args, + grpc_endpoint* ep, bool is_client) { + GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == + GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); + + t->base.vtable = get_vtable(); + t->ep = ep; + /* one ref is for destroy */ + gpr_ref_init(&t->refs, 1); + t->combiner = grpc_combiner_create(); + t->peer_string = grpc_endpoint_get_peer(ep); + t->endpoint_reading = 1; + t->next_stream_id = is_client ? 1 : 2; + t->is_client = is_client; + t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; + t->is_first_frame = true; + grpc_connectivity_state_init( + &t->channel_callback.state_tracker, GRPC_CHANNEL_READY, + is_client ? "client_transport" : "server_transport"); + + grpc_slice_buffer_init(&t->qbuf); + grpc_slice_buffer_init(&t->outbuf); + grpc_chttp2_hpack_compressor_init(&t->hpack_compressor); + + init_transport_closures(t); t->goaway_error = GRPC_ERROR_NONE; grpc_chttp2_goaway_parser_init(&t->goaway_parser); @@ -301,6 +504,8 @@ static void init_transport(grpc_chttp2_transport* t, grpc_chttp2_stream_map_init(&t->stream_map, 8); /* copy in initial settings to all setting sets */ + size_t i; + int j; for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) { for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) { t->settings[j][i] = grpc_chttp2_settings_parameters[i].default_value; @@ -328,191 +533,14 @@ static void init_transport(grpc_chttp2_transport* t, queue_setting_update(t, GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1); - t->ping_policy.max_pings_without_data = g_default_max_pings_without_data; - t->ping_policy.min_sent_ping_interval_without_data = - g_default_min_sent_ping_interval_without_data_ms; - t->ping_policy.max_ping_strikes = g_default_max_ping_strikes; - t->ping_policy.min_recv_ping_interval_without_data = - g_default_min_recv_ping_interval_without_data_ms; - - /* Keepalive setting */ - if (t->is_client) { - t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX - ? GRPC_MILLIS_INF_FUTURE - : g_default_client_keepalive_time_ms; - t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX - ? GRPC_MILLIS_INF_FUTURE - : g_default_client_keepalive_timeout_ms; - t->keepalive_permit_without_calls = - g_default_client_keepalive_permit_without_calls; - } else { - t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX - ? GRPC_MILLIS_INF_FUTURE - : g_default_server_keepalive_time_ms; - t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX - ? GRPC_MILLIS_INF_FUTURE - : g_default_server_keepalive_timeout_ms; - t->keepalive_permit_without_calls = - g_default_server_keepalive_permit_without_calls; - } + configure_transport_ping_policy(t); + init_transport_keepalive_settings(t); t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; bool enable_bdp = true; - if (channel_args) { - for (i = 0; i < channel_args->num_args; i++) { - if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) { - const grpc_integer_options options = {-1, 0, INT_MAX}; - const int value = - grpc_channel_arg_get_integer(&channel_args->args[i], options); - if (value >= 0) { - if ((t->next_stream_id & 1) != (value & 1)) { - gpr_log(GPR_ERROR, "%s: low bit must be %d on %s", - GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, - t->next_stream_id & 1, is_client ? "client" : "server"); - } else { - t->next_stream_id = static_cast<uint32_t>(value); - } - } - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) { - const grpc_integer_options options = {-1, 0, INT_MAX}; - const int value = - grpc_channel_arg_get_integer(&channel_args->args[i], options); - if (value >= 0) { - grpc_chttp2_hpack_compressor_set_max_usable_size( - &t->hpack_compressor, static_cast<uint32_t>(value)); - } - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) { - t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer( - &channel_args->args[i], - {g_default_max_pings_without_data, 0, INT_MAX}); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_MAX_PING_STRIKES)) { - t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer( - &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX}); - } else if (0 == - strcmp( - channel_args->args[i].key, - GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) { - t->ping_policy.min_sent_ping_interval_without_data = - grpc_channel_arg_get_integer( - &channel_args->args[i], - grpc_integer_options{ - g_default_min_sent_ping_interval_without_data_ms, 0, - INT_MAX}); - } else if (0 == - strcmp( - channel_args->args[i].key, - GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) { - t->ping_policy.min_recv_ping_interval_without_data = - grpc_channel_arg_get_integer( - &channel_args->args[i], - grpc_integer_options{ - g_default_min_recv_ping_interval_without_data_ms, 0, - INT_MAX}); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) { - t->write_buffer_size = - static_cast<uint32_t>(grpc_channel_arg_get_integer( - &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE})); - } else if (0 == - strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { - enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_KEEPALIVE_TIME_MS)) { - const int value = grpc_channel_arg_get_integer( - &channel_args->args[i], - grpc_integer_options{t->is_client - ? g_default_client_keepalive_time_ms - : g_default_server_keepalive_time_ms, - 1, INT_MAX}); - t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) { - const int value = grpc_channel_arg_get_integer( - &channel_args->args[i], - grpc_integer_options{t->is_client - ? g_default_client_keepalive_timeout_ms - : g_default_server_keepalive_timeout_ms, - 0, INT_MAX}); - t->keepalive_timeout = - value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { - t->keepalive_permit_without_calls = static_cast<uint32_t>( - grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1})); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_OPTIMIZATION_TARGET)) { - if (channel_args->args[i].type != GRPC_ARG_STRING) { - gpr_log(GPR_ERROR, "%s should be a string", - GRPC_ARG_OPTIMIZATION_TARGET); - } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) { - t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; - } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) { - t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; - } else if (0 == - strcmp(channel_args->args[i].value.string, "throughput")) { - t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT; - } else { - gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'", - GRPC_ARG_OPTIMIZATION_TARGET, - channel_args->args[i].value.string); - } - } else { - static const struct { - const char* channel_arg_name; - grpc_chttp2_setting_id setting_id; - grpc_integer_options integer_options; - bool availability[2] /* server, client */; - } settings_map[] = { - {GRPC_ARG_MAX_CONCURRENT_STREAMS, - GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, - {-1, 0, INT32_MAX}, - {true, false}}, - {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER, - GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, - {-1, 0, INT32_MAX}, - {true, true}}, - {GRPC_ARG_MAX_METADATA_SIZE, - GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, - {-1, 0, INT32_MAX}, - {true, true}}, - {GRPC_ARG_HTTP2_MAX_FRAME_SIZE, - GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, - {-1, 16384, 16777215}, - {true, true}}, - {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY, - GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, - {1, 0, 1}, - {true, true}}, - {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES, - GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, - {-1, 5, INT32_MAX}, - {true, true}}}; - for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) { - if (0 == strcmp(channel_args->args[i].key, - settings_map[j].channel_arg_name)) { - if (!settings_map[j].availability[is_client]) { - gpr_log(GPR_DEBUG, "%s is not available on %s", - settings_map[j].channel_arg_name, - is_client ? "clients" : "servers"); - } else { - int value = grpc_channel_arg_get_integer( - &channel_args->args[i], settings_map[j].integer_options); - if (value >= 0) { - queue_setting_update(t, settings_map[j].setting_id, - static_cast<uint32_t>(value)); - } - } - break; - } - } - } - } + enable_bdp = read_channel_args(t, channel_args, is_client); } if (g_flow_control_enabled) { @@ -531,23 +559,11 @@ static void init_transport(grpc_chttp2_transport* t, t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST; t->ping_recv_state.ping_strikes = 0; - /* Start keepalive pings */ - if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) { - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; - GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping_locked); - } else { - /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no - inflight keeaplive timers */ - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; - } + init_keepalive_pings_if_enabled(t); if (enable_bdp) { GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); schedule_bdp_ping_locked(t); - grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t, nullptr); } @@ -2887,17 +2903,20 @@ bool Chttp2IncomingByteStream::Next(size_t max_size_hint, } } +void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() { + if (!stream_->stream_decompression_ctx) { + stream_->stream_decompression_ctx = grpc_stream_compression_context_create( + stream_->stream_decompression_method); + } +} + grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) { GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0); grpc_error* error; if (stream_->unprocessed_incoming_frames_buffer.length > 0) { if (!stream_->unprocessed_incoming_frames_decompressed) { bool end_of_context; - if (!stream_->stream_decompression_ctx) { - stream_->stream_decompression_ctx = - grpc_stream_compression_context_create( - stream_->stream_decompression_method); - } + MaybeCreateStreamDecompressionCtx(); if (!grpc_stream_decompress(stream_->stream_decompression_ctx, &stream_->unprocessed_incoming_frames_buffer, &stream_->decompressed_data_buffer, nullptr, diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index ca6e715978..6b5309bab4 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -246,6 +246,8 @@ class Chttp2IncomingByteStream : public ByteStream { static void NextLocked(void* arg, grpc_error* error_ignored); static void OrphanLocked(void* arg, grpc_error* error_ignored); + void MaybeCreateStreamDecompressionCtx(); + grpc_chttp2_transport* transport_; // Immutable. grpc_chttp2_stream* stream_; // Immutable. diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 4a252d972d..81e2634e3a 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -1287,7 +1287,7 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { grpc_error* error = GRPC_ERROR_NONE; if (stream_state->state_op_done[OP_CANCEL_ERROR]) { error = GRPC_ERROR_REF(stream_state->cancel_error); - } else if (stream_state->state_op_done[OP_FAILED]) { + } else if (stream_state->state_callback_received[OP_FAILED]) { error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."); } else if (oas->s->state.rs.trailing_metadata_valid) { grpc_chttp2_incoming_metadata_buffer_publish( diff --git a/src/core/lib/channel/channel_trace.cc b/src/core/lib/channel/channel_trace.cc index b3443310ac..cfb2faba51 100644 --- a/src/core/lib/channel/channel_trace.cc +++ b/src/core/lib/channel/channel_trace.cc @@ -41,16 +41,14 @@ namespace grpc_core { namespace channelz { -ChannelTrace::TraceEvent::TraceEvent( - Severity severity, grpc_slice data, - RefCountedPtr<ChannelNode> referenced_channel, ReferencedType type) +ChannelTrace::TraceEvent::TraceEvent(Severity severity, grpc_slice data, + RefCountedPtr<BaseNode> referenced_entity) : severity_(severity), data_(data), timestamp_(grpc_millis_to_timespec(grpc_core::ExecCtx::Get()->Now(), GPR_CLOCK_REALTIME)), next_(nullptr), - referenced_channel_(std::move(referenced_channel)), - referenced_type_(type) {} + referenced_entity_(std::move(referenced_entity)) {} ChannelTrace::TraceEvent::TraceEvent(Severity severity, grpc_slice data) : severity_(severity), @@ -110,23 +108,13 @@ void ChannelTrace::AddTraceEvent(Severity severity, grpc_slice data) { AddTraceEventHelper(New<TraceEvent>(severity, data)); } -void ChannelTrace::AddTraceEventReferencingChannel( - Severity severity, grpc_slice data, - RefCountedPtr<ChannelNode> referenced_channel) { - if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0 - // create and fill up the new event - AddTraceEventHelper(New<TraceEvent>( - severity, data, std::move(referenced_channel), ReferencedType::Channel)); -} - -void ChannelTrace::AddTraceEventReferencingSubchannel( +void ChannelTrace::AddTraceEventWithReference( Severity severity, grpc_slice data, - RefCountedPtr<ChannelNode> referenced_subchannel) { + RefCountedPtr<BaseNode> referenced_entity) { if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0 // create and fill up the new event - AddTraceEventHelper(New<TraceEvent>(severity, data, - std::move(referenced_subchannel), - ReferencedType::Subchannel)); + AddTraceEventHelper( + New<TraceEvent>(severity, data, std::move(referenced_entity))); } namespace { @@ -157,19 +145,18 @@ void ChannelTrace::TraceEvent::RenderTraceEvent(grpc_json* json) const { json_iterator = grpc_json_create_child(json_iterator, json, "timestamp", gpr_format_timespec(timestamp_), GRPC_JSON_STRING, true); - if (referenced_channel_ != nullptr) { + if (referenced_entity_ != nullptr) { + const bool is_channel = + (referenced_entity_->type() == BaseNode::EntityType::kTopLevelChannel || + referenced_entity_->type() == BaseNode::EntityType::kInternalChannel); char* uuid_str; - gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_channel_->channel_uuid()); + gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_entity_->uuid()); grpc_json* child_ref = grpc_json_create_child( - json_iterator, json, - (referenced_type_ == ReferencedType::Channel) ? "channelRef" - : "subchannelRef", + json_iterator, json, is_channel ? "channelRef" : "subchannelRef", nullptr, GRPC_JSON_OBJECT, false); json_iterator = grpc_json_create_child( - nullptr, child_ref, - (referenced_type_ == ReferencedType::Channel) ? "channelId" - : "subchannelId", - uuid_str, GRPC_JSON_STRING, true); + nullptr, child_ref, is_channel ? "channelId" : "subchannelId", uuid_str, + GRPC_JSON_STRING, true); json_iterator = child_ref; } } @@ -178,24 +165,26 @@ grpc_json* ChannelTrace::RenderJson() const { if (!max_list_size_) return nullptr; // tracing is disabled if max_events == 0 grpc_json* json = grpc_json_create(GRPC_JSON_OBJECT); - char* num_events_logged_str; - gpr_asprintf(&num_events_logged_str, "%" PRId64, num_events_logged_); grpc_json* json_iterator = nullptr; - json_iterator = - grpc_json_create_child(json_iterator, json, "numEventsLogged", - num_events_logged_str, GRPC_JSON_STRING, true); + if (num_events_logged_ > 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "numEventsLogged", num_events_logged_); + } json_iterator = grpc_json_create_child( json_iterator, json, "creationTimestamp", gpr_format_timespec(time_created_), GRPC_JSON_STRING, true); - grpc_json* events = grpc_json_create_child(json_iterator, json, "events", - nullptr, GRPC_JSON_ARRAY, false); - json_iterator = nullptr; - TraceEvent* it = head_trace_; - while (it != nullptr) { - json_iterator = grpc_json_create_child(json_iterator, events, nullptr, - nullptr, GRPC_JSON_OBJECT, false); - it->RenderTraceEvent(json_iterator); - it = it->next(); + // only add in the event list if it is non-empty. + if (num_events_logged_ > 0) { + grpc_json* events = grpc_json_create_child(json_iterator, json, "events", + nullptr, GRPC_JSON_ARRAY, false); + json_iterator = nullptr; + TraceEvent* it = head_trace_; + while (it != nullptr) { + json_iterator = grpc_json_create_child(json_iterator, events, nullptr, + nullptr, GRPC_JSON_OBJECT, false); + it->RenderTraceEvent(json_iterator); + it = it->next(); + } } return json; } diff --git a/src/core/lib/channel/channel_trace.h b/src/core/lib/channel/channel_trace.h index 596af7402f..94fea20b45 100644 --- a/src/core/lib/channel/channel_trace.h +++ b/src/core/lib/channel/channel_trace.h @@ -30,7 +30,7 @@ namespace grpc_core { namespace channelz { -class ChannelNode; +class BaseNode; // Object used to hold live data for a channel. This data is exposed via the // channelz service: @@ -55,35 +55,28 @@ class ChannelTrace { void AddTraceEvent(Severity severity, grpc_slice data); // Adds a new trace event to the tracing object. This trace event refers to a - // an event on a child of the channel. For example, if this channel has - // created a new subchannel, then it would record that with a TraceEvent - // referencing the new subchannel. + // an event that concerns a different channelz entity. For example, if this + // channel has created a new subchannel, then it would record that with + // a TraceEvent referencing the new subchannel. // // TODO(ncteisen): as this call is used more and more throughout the gRPC // stack, determine if it makes more sense to accept a char* instead of a // slice. - void AddTraceEventReferencingChannel( - Severity severity, grpc_slice data, - RefCountedPtr<ChannelNode> referenced_channel); - void AddTraceEventReferencingSubchannel( - Severity severity, grpc_slice data, - RefCountedPtr<ChannelNode> referenced_subchannel); + void AddTraceEventWithReference(Severity severity, grpc_slice data, + RefCountedPtr<BaseNode> referenced_entity); // Creates and returns the raw grpc_json object, so a parent channelz // object may incorporate the json before rendering. grpc_json* RenderJson() const; private: - // Types of objects that can be references by trace events. - enum class ReferencedType { Channel, Subchannel }; // Private class to encapsulate all the data and bookkeeping needed for a // a trace event. class TraceEvent { public: - // Constructor for a TraceEvent that references a different channel. + // Constructor for a TraceEvent that references a channel. TraceEvent(Severity severity, grpc_slice data, - RefCountedPtr<ChannelNode> referenced_channel, - ReferencedType type); + RefCountedPtr<BaseNode> referenced_entity_); // Constructor for a TraceEvent that does not reverence a different // channel. @@ -105,10 +98,7 @@ class ChannelTrace { gpr_timespec timestamp_; TraceEvent* next_; // the tracer object for the (sub)channel that this trace event refers to. - RefCountedPtr<ChannelNode> referenced_channel_; - // the type that the referenced tracer points to. Unused if this trace - // does not point to any channel or subchannel - ReferencedType referenced_type_; + RefCountedPtr<BaseNode> referenced_entity_; }; // TraceEvent // Internal helper to add and link in a trace event diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc index 9d6002ed8a..375cf25cc6 100644 --- a/src/core/lib/channel/channelz.cc +++ b/src/core/lib/channel/channelz.cc @@ -41,33 +41,62 @@ namespace grpc_core { namespace channelz { -ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, - bool is_top_level_channel) - : channel_(channel), - target_(nullptr), - channel_uuid_(-1), - is_top_level_channel_(is_top_level_channel) { - trace_.Init(channel_tracer_max_nodes); - target_ = UniquePtr<char>(grpc_channel_get_target(channel_)); - channel_uuid_ = ChannelzRegistry::RegisterChannelNode(this); +BaseNode::BaseNode(EntityType type) + : type_(type), uuid_(ChannelzRegistry::Register(this)) {} + +BaseNode::~BaseNode() { ChannelzRegistry::Unregister(uuid_); } + +char* BaseNode::RenderJsonString() { + grpc_json* json = RenderJson(); + GPR_ASSERT(json != nullptr); + char* json_str = grpc_json_dump_to_string(json, 0); + grpc_json_destroy(json); + return json_str; +} + +CallCountingHelper::CallCountingHelper() { gpr_atm_no_barrier_store(&last_call_started_millis_, (gpr_atm)ExecCtx::Get()->Now()); } -ChannelNode::~ChannelNode() { - trace_.Destroy(); - ChannelzRegistry::UnregisterChannelNode(channel_uuid_); -} +CallCountingHelper::~CallCountingHelper() {} -void ChannelNode::RecordCallStarted() { +void CallCountingHelper::RecordCallStarted() { gpr_atm_no_barrier_fetch_add(&calls_started_, (gpr_atm)1); gpr_atm_no_barrier_store(&last_call_started_millis_, (gpr_atm)ExecCtx::Get()->Now()); } -void ChannelNode::PopulateConnectivityState(grpc_json* json) {} +void CallCountingHelper::PopulateCallCounts(grpc_json* json) { + grpc_json* json_iterator = nullptr; + if (calls_started_ != 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "callsStarted", calls_started_); + } + if (calls_succeeded_ != 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "callsSucceeded", calls_succeeded_); + } + if (calls_failed_) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "callsFailed", calls_failed_); + } + gpr_timespec ts = + grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME); + json_iterator = + grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp", + gpr_format_timespec(ts), GRPC_JSON_STRING, true); +} + +ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel) + : BaseNode(is_top_level_channel ? EntityType::kTopLevelChannel + : EntityType::kInternalChannel), + channel_(channel), + target_(UniquePtr<char>(grpc_channel_get_target(channel_))), + trace_(channel_tracer_max_nodes) {} -void ChannelNode::PopulateChildRefs(grpc_json* json) {} +ChannelNode::~ChannelNode() {} grpc_json* ChannelNode::RenderJson() { // We need to track these three json objects to build our object @@ -80,7 +109,7 @@ grpc_json* ChannelNode::RenderJson() { json = json_iterator; json_iterator = nullptr; json_iterator = grpc_json_add_number_string_child(json, json_iterator, - "channelId", channel_uuid_); + "channelId", uuid()); // reset json iterators to top level object json = top_level_json; json_iterator = nullptr; @@ -89,51 +118,28 @@ grpc_json* ChannelNode::RenderJson() { GRPC_JSON_OBJECT, false); json = data; json_iterator = nullptr; + // template method. Child classes may override this to add their specific + // functionality. PopulateConnectivityState(json); + // populate the target. GPR_ASSERT(target_.get() != nullptr); - json_iterator = grpc_json_create_child( - json_iterator, json, "target", target_.get(), GRPC_JSON_STRING, false); + grpc_json_create_child(nullptr, json, "target", target_.get(), + GRPC_JSON_STRING, false); // fill in the channel trace if applicable - grpc_json* trace = trace_->RenderJson(); - if (trace != nullptr) { - // we manually link up and fill the child since it was created for us in - // ChannelTrace::RenderJson - trace->key = "trace"; // this object is named trace in channelz.proto - json_iterator = grpc_json_link_child(json, trace, json_iterator); - } - // reset the parent to be the data object. - json = data; - json_iterator = nullptr; - if (calls_started_ != 0) { - json_iterator = grpc_json_add_number_string_child( - json, json_iterator, "callsStarted", calls_started_); + grpc_json* trace_json = trace_.RenderJson(); + if (trace_json != nullptr) { + trace_json->key = "trace"; // this object is named trace in channelz.proto + grpc_json_link_child(json, trace_json, nullptr); } - if (calls_succeeded_ != 0) { - json_iterator = grpc_json_add_number_string_child( - json, json_iterator, "callsSucceeded", calls_succeeded_); - } - if (calls_failed_) { - json_iterator = grpc_json_add_number_string_child( - json, json_iterator, "callsFailed", calls_failed_); - } - gpr_timespec ts = - grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME); - json_iterator = - grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp", - gpr_format_timespec(ts), GRPC_JSON_STRING, true); + // ask CallCountingHelper to populate trace and call count data. + call_counter_.PopulateCallCounts(json); json = top_level_json; - json_iterator = nullptr; + // template method. Child classes may override this to add their specific + // functionality. PopulateChildRefs(json); return top_level_json; } -char* ChannelNode::RenderJsonString() { - grpc_json* json = RenderJson(); - char* json_str = grpc_json_dump_to_string(json, 0); - grpc_json_destroy(json); - return json_str; -} - RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode( grpc_channel* channel, size_t channel_tracer_max_nodes, bool is_top_level_channel) { @@ -141,12 +147,41 @@ RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode( channel, channel_tracer_max_nodes, is_top_level_channel); } -SubchannelNode::SubchannelNode() { - subchannel_uuid_ = ChannelzRegistry::RegisterSubchannelNode(this); -} +ServerNode::ServerNode(size_t channel_tracer_max_nodes) + : BaseNode(EntityType::kServer), trace_(channel_tracer_max_nodes) {} + +ServerNode::~ServerNode() {} -SubchannelNode::~SubchannelNode() { - ChannelzRegistry::UnregisterSubchannelNode(subchannel_uuid_); +grpc_json* ServerNode::RenderJson() { + // We need to track these three json objects to build our object + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* json_iterator = nullptr; + // create and fill the ref child + json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr, + GRPC_JSON_OBJECT, false); + json = json_iterator; + json_iterator = nullptr; + json_iterator = grpc_json_add_number_string_child(json, json_iterator, + "serverId", uuid()); + // reset json iterators to top level object + json = top_level_json; + json_iterator = nullptr; + // create and fill the data child. + grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr, + GRPC_JSON_OBJECT, false); + json = data; + json_iterator = nullptr; + // fill in the channel trace if applicable + grpc_json* trace_json = trace_.RenderJson(); + if (trace_json != nullptr) { + trace_json->key = "trace"; // this object is named trace in channelz.proto + grpc_json_link_child(json, trace_json, nullptr); + } + // ask CallCountingHelper to populate trace and call count data. + call_counter_.PopulateCallCounts(json); + json = top_level_json; + return top_level_json; } } // namespace channelz diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h index 07eb73d626..9be256147b 100644 --- a/src/core/lib/channel/channelz.h +++ b/src/core/lib/channel/channelz.h @@ -43,14 +43,52 @@ namespace grpc_core { namespace channelz { namespace testing { +class CallCountingHelperPeer; class ChannelNodePeer; -} +} // namespace testing -class ChannelNode : public RefCounted<ChannelNode> { +// base class for all channelz entities +class BaseNode : public RefCounted<BaseNode> { public: - static RefCountedPtr<ChannelNode> MakeChannelNode( - grpc_channel* channel, size_t channel_tracer_max_nodes, - bool is_top_level_channel); + // There are only four high level channelz entities. However, to support + // GetTopChannelsRequest, we split the Channel entity into two different + // types. All children of BaseNode must be one of these types. + enum class EntityType { + kTopLevelChannel, + kInternalChannel, + kSubchannel, + kServer, + kSocket, + }; + + explicit BaseNode(EntityType type); + virtual ~BaseNode(); + + // All children must implement this function. + virtual grpc_json* RenderJson() GRPC_ABSTRACT; + + // Renders the json and returns allocated string that must be freed by the + // caller. + char* RenderJsonString(); + + EntityType type() const { return type_; } + intptr_t uuid() const { return uuid_; } + + private: + const EntityType type_; + const intptr_t uuid_; +}; + +// This class is a helper class for channelz entities that deal with Channels, +// Subchannels, and Servers, since those have similar proto definitions. +// This class has the ability to: +// - track calls_{started,succeeded,failed} +// - track last_call_started_timestamp +// - perform rendering of the above items +class CallCountingHelper { + public: + CallCountingHelper(); + ~CallCountingHelper(); void RecordCallStarted(); void RecordCallFailed() { @@ -60,17 +98,46 @@ class ChannelNode : public RefCounted<ChannelNode> { gpr_atm_no_barrier_fetch_add(&calls_succeeded_, (gpr_atm(1))); } - grpc_json* RenderJson(); - char* RenderJsonString(); + // Common rendering of the call count data and last_call_started_timestamp. + void PopulateCallCounts(grpc_json* json); - // helper for getting and populating connectivity state. It is virtual - // because it allows the client_channel specific code to live in ext/ - // instead of lib/ - virtual void PopulateConnectivityState(grpc_json* json); + private: + // testing peer friend. + friend class testing::CallCountingHelperPeer; - virtual void PopulateChildRefs(grpc_json* json); + gpr_atm calls_started_ = 0; + gpr_atm calls_succeeded_ = 0; + gpr_atm calls_failed_ = 0; + gpr_atm last_call_started_millis_ = 0; +}; - ChannelTrace* trace() { return trace_.get(); } +// Handles channelz bookkeeping for channels +class ChannelNode : public BaseNode { + public: + static RefCountedPtr<ChannelNode> MakeChannelNode( + grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel); + + ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel); + ~ChannelNode() override; + + grpc_json* RenderJson() override; + + // template methods. RenderJSON uses these methods to render its JSON + // representation. These are virtual so that children classes may provide + // their specific mechanism for populating these parts of the channelz + // object. + // + // ChannelNode does not have a notion of connectivity state or child refs, + // so it leaves these implementations blank. + // + // This is utilizing the template method design pattern. + // + // TODO(ncteisen): remove these template methods in favor of manual traversal + // and mutation of the grpc_json object. + virtual void PopulateConnectivityState(grpc_json* json) {} + virtual void PopulateChildRefs(grpc_json* json) {} void MarkChannelDestroyed() { GPR_ASSERT(channel_ != nullptr); @@ -79,47 +146,62 @@ class ChannelNode : public RefCounted<ChannelNode> { bool ChannelIsDestroyed() { return channel_ == nullptr; } - intptr_t channel_uuid() { return channel_uuid_; } - bool is_top_level_channel() { return is_top_level_channel_; } - - protected: - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW - ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, - bool is_top_level_channel); - virtual ~ChannelNode(); + // proxy methods to composed classes. + void AddTraceEvent(ChannelTrace::Severity severity, grpc_slice data) { + trace_.AddTraceEvent(severity, data); + } + void AddTraceEventWithReference(ChannelTrace::Severity severity, + grpc_slice data, + RefCountedPtr<BaseNode> referenced_channel) { + trace_.AddTraceEventWithReference(severity, data, + std::move(referenced_channel)); + } + void RecordCallStarted() { call_counter_.RecordCallStarted(); } + void RecordCallFailed() { call_counter_.RecordCallFailed(); } + void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); } private: - // testing peer friend. + // to allow the channel trace test to access trace_. friend class testing::ChannelNodePeer; - grpc_channel* channel_ = nullptr; UniquePtr<char> target_; - gpr_atm calls_started_ = 0; - gpr_atm calls_succeeded_ = 0; - gpr_atm calls_failed_ = 0; - gpr_atm last_call_started_millis_ = 0; - intptr_t channel_uuid_; - bool is_top_level_channel_ = true; - ManualConstructor<ChannelTrace> trace_; + CallCountingHelper call_counter_; + ChannelTrace trace_; }; -// Placeholds channelz class for subchannels. All this can do now is track its -// uuid (this information is needed by the parent channelz class). -// TODO(ncteisen): build this out to support the GetSubchannel channelz request. -class SubchannelNode : public RefCounted<SubchannelNode> { +// Handles channelz bookkeeping for servers +class ServerNode : public BaseNode { public: - SubchannelNode(); - virtual ~SubchannelNode(); + explicit ServerNode(size_t channel_tracer_max_nodes); + ~ServerNode() override; - intptr_t subchannel_uuid() { return subchannel_uuid_; } + grpc_json* RenderJson() override; - protected: - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW + // proxy methods to composed classes. + void AddTraceEvent(ChannelTrace::Severity severity, grpc_slice data) { + trace_.AddTraceEvent(severity, data); + } + void AddTraceEventWithReference(ChannelTrace::Severity severity, + grpc_slice data, + RefCountedPtr<BaseNode> referenced_channel) { + trace_.AddTraceEventWithReference(severity, data, + std::move(referenced_channel)); + } + void RecordCallStarted() { call_counter_.RecordCallStarted(); } + void RecordCallFailed() { call_counter_.RecordCallFailed(); } + void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); } private: - intptr_t subchannel_uuid_; + CallCountingHelper call_counter_; + ChannelTrace trace_; +}; + +// Handles channelz bookkeeping for sockets +// TODO(ncteisen): implement in subsequent PR. +class SocketNode : public BaseNode { + public: + SocketNode() : BaseNode(EntityType::kSocket) {} + ~SocketNode() override {} }; // Creation functions diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc index f79d2f0c17..adc7b6ba44 100644 --- a/src/core/lib/channel/channelz_registry.cc +++ b/src/core/lib/channel/channelz_registry.cc @@ -53,54 +53,46 @@ ChannelzRegistry::ChannelzRegistry() { gpr_mu_init(&mu_); } ChannelzRegistry::~ChannelzRegistry() { gpr_mu_destroy(&mu_); } -intptr_t ChannelzRegistry::InternalRegisterEntry(const RegistryEntry& entry) { +intptr_t ChannelzRegistry::InternalRegister(BaseNode* node) { MutexLock lock(&mu_); - entities_.push_back(entry); + entities_.push_back(node); intptr_t uuid = entities_.size(); return uuid; } -void ChannelzRegistry::InternalUnregisterEntry(intptr_t uuid, EntityType type) { +void ChannelzRegistry::InternalUnregister(intptr_t uuid) { GPR_ASSERT(uuid >= 1); MutexLock lock(&mu_); GPR_ASSERT(static_cast<size_t>(uuid) <= entities_.size()); - GPR_ASSERT(entities_[uuid - 1].type == type); - entities_[uuid - 1].object = nullptr; - entities_[uuid - 1].type = EntityType::kUnset; + entities_[uuid - 1] = nullptr; } -void* ChannelzRegistry::InternalGetEntry(intptr_t uuid, EntityType type) { +BaseNode* ChannelzRegistry::InternalGet(intptr_t uuid) { MutexLock lock(&mu_); if (uuid < 1 || uuid > static_cast<intptr_t>(entities_.size())) { return nullptr; } - if (entities_[uuid - 1].type == type) { - return entities_[uuid - 1].object; - } else { - return nullptr; - } + return entities_[uuid - 1]; } char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) { grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); grpc_json* json = top_level_json; grpc_json* json_iterator = nullptr; - InlinedVector<ChannelNode*, 10> top_level_channels; + InlinedVector<BaseNode*, 10> top_level_channels; // uuids index into entities one-off (idx 0 is really uuid 1, since 0 is // reserved). However, we want to support requests coming in with // start_channel_id=0, which signifies "give me everything." Hence this // funky looking line below. size_t start_idx = start_channel_id == 0 ? 0 : start_channel_id - 1; for (size_t i = start_idx; i < entities_.size(); ++i) { - if (entities_[i].type == EntityType::kChannelNode) { - ChannelNode* channel_node = - static_cast<ChannelNode*>(entities_[i].object); - if (channel_node->is_top_level_channel()) { - top_level_channels.push_back(channel_node); - } + if (entities_[i] != nullptr && + entities_[i]->type() == + grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel) { + top_level_channels.push_back(entities_[i]); } } - if (top_level_channels.size() > 0) { + if (!top_level_channels.empty()) { // create list of channels grpc_json* array_parent = grpc_json_create_child( nullptr, json, "channel", nullptr, GRPC_JSON_ARRAY, false); @@ -120,6 +112,42 @@ char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) { return json_str; } +char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) { + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* json_iterator = nullptr; + InlinedVector<BaseNode*, 10> servers; + // uuids index into entities one-off (idx 0 is really uuid 1, since 0 is + // reserved). However, we want to support requests coming in with + // start_server_id=0, which signifies "give me everything." + size_t start_idx = start_server_id == 0 ? 0 : start_server_id - 1; + for (size_t i = start_idx; i < entities_.size(); ++i) { + if (entities_[i] != nullptr && + entities_[i]->type() == + grpc_core::channelz::BaseNode::EntityType::kServer) { + servers.push_back(entities_[i]); + } + } + if (!servers.empty()) { + // create list of servers + grpc_json* array_parent = grpc_json_create_child( + nullptr, json, "server", nullptr, GRPC_JSON_ARRAY, false); + for (size_t i = 0; i < servers.size(); ++i) { + grpc_json* server_json = servers[i]->RenderJson(); + json_iterator = + grpc_json_link_child(array_parent, server_json, json_iterator); + } + } + // For now we do not have any pagination rules. In the future we could + // pick a constant for max_channels_sent for a GetServers request. + // Tracking: https://github.com/grpc/grpc/issues/16019. + json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr, + GRPC_JSON_TRUE, false); + char* json_str = grpc_json_dump_to_string(top_level_json, 0); + grpc_json_destroy(top_level_json); + return json_str; +} + } // namespace channelz } // namespace grpc_core @@ -128,10 +156,18 @@ char* grpc_channelz_get_top_channels(intptr_t start_channel_id) { start_channel_id); } +char* grpc_channelz_get_servers(intptr_t start_server_id) { + return grpc_core::channelz::ChannelzRegistry::GetServers(start_server_id); +} + char* grpc_channelz_get_channel(intptr_t channel_id) { - grpc_core::channelz::ChannelNode* channel_node = - grpc_core::channelz::ChannelzRegistry::GetChannelNode(channel_id); - if (channel_node == nullptr) { + grpc_core::channelz::BaseNode* channel_node = + grpc_core::channelz::ChannelzRegistry::Get(channel_id); + if (channel_node == nullptr || + (channel_node->type() != + grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel && + channel_node->type() != + grpc_core::channelz::BaseNode::EntityType::kInternalChannel)) { return nullptr; } grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); @@ -143,3 +179,21 @@ char* grpc_channelz_get_channel(intptr_t channel_id) { grpc_json_destroy(top_level_json); return json_str; } + +char* grpc_channelz_get_subchannel(intptr_t subchannel_id) { + grpc_core::channelz::BaseNode* subchannel_node = + grpc_core::channelz::ChannelzRegistry::Get(subchannel_id); + if (subchannel_node == nullptr || + subchannel_node->type() != + grpc_core::channelz::BaseNode::EntityType::kSubchannel) { + return nullptr; + } + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* subchannel_json = subchannel_node->RenderJson(); + subchannel_json->key = "subchannel"; + grpc_json_link_child(json, subchannel_json, nullptr); + char* json_str = grpc_json_dump_to_string(top_level_json, 0); + grpc_json_destroy(top_level_json); + return json_str; +} diff --git a/src/core/lib/channel/channelz_registry.h b/src/core/lib/channel/channelz_registry.h index 5d7c936726..d0d660600d 100644 --- a/src/core/lib/channel/channelz_registry.h +++ b/src/core/lib/channel/channelz_registry.h @@ -40,32 +40,11 @@ class ChannelzRegistry { // To be called in grpc_shutdown(); static void Shutdown(); - // Register/Unregister/Get for ChannelNode - static intptr_t RegisterChannelNode(ChannelNode* channel_node) { - RegistryEntry entry(channel_node, EntityType::kChannelNode); - return Default()->InternalRegisterEntry(entry); - } - static void UnregisterChannelNode(intptr_t uuid) { - Default()->InternalUnregisterEntry(uuid, EntityType::kChannelNode); - } - static ChannelNode* GetChannelNode(intptr_t uuid) { - void* gotten = Default()->InternalGetEntry(uuid, EntityType::kChannelNode); - return gotten == nullptr ? nullptr : static_cast<ChannelNode*>(gotten); - } - - // Register/Unregister/Get for SubchannelNode - static intptr_t RegisterSubchannelNode(SubchannelNode* channel_node) { - RegistryEntry entry(channel_node, EntityType::kSubchannelNode); - return Default()->InternalRegisterEntry(entry); - } - static void UnregisterSubchannelNode(intptr_t uuid) { - Default()->InternalUnregisterEntry(uuid, EntityType::kSubchannelNode); - } - static SubchannelNode* GetSubchannelNode(intptr_t uuid) { - void* gotten = - Default()->InternalGetEntry(uuid, EntityType::kSubchannelNode); - return gotten == nullptr ? nullptr : static_cast<SubchannelNode*>(gotten); + static intptr_t Register(BaseNode* node) { + return Default()->InternalRegister(node); } + static void Unregister(intptr_t uuid) { Default()->InternalUnregister(uuid); } + static BaseNode* Get(intptr_t uuid) { return Default()->InternalGet(uuid); } // Returns the allocated JSON string that represents the proto // GetTopChannelsResponse as per channelz.proto. @@ -73,20 +52,13 @@ class ChannelzRegistry { return Default()->InternalGetTopChannels(start_channel_id); } - private: - enum class EntityType { - kChannelNode, - kSubchannelNode, - kUnset, - }; - - struct RegistryEntry { - RegistryEntry(void* object_in, EntityType type_in) - : object(object_in), type(type_in) {} - void* object; - EntityType type; - }; + // Returns the allocated JSON string that represents the proto + // GetServersResponse as per channelz.proto. + static char* GetServers(intptr_t start_server_id) { + return Default()->InternalGetServers(start_server_id); + } + private: GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE @@ -97,21 +69,22 @@ class ChannelzRegistry { static ChannelzRegistry* Default(); // globally registers an Entry. Returns its unique uuid - intptr_t InternalRegisterEntry(const RegistryEntry& entry); + intptr_t InternalRegister(BaseNode* node); // globally unregisters the object that is associated to uuid. Also does // sanity check that an object doesn't try to unregister the wrong type. - void InternalUnregisterEntry(intptr_t uuid, EntityType type); + void InternalUnregister(intptr_t uuid); // if object with uuid has previously been registered as the correct type, // returns the void* associated with that uuid. Else returns nullptr. - void* InternalGetEntry(intptr_t uuid, EntityType type); + BaseNode* InternalGet(intptr_t uuid); char* InternalGetTopChannels(intptr_t start_channel_id); + char* InternalGetServers(intptr_t start_server_id); // protects entities_ and uuid_ gpr_mu mu_; - InlinedVector<RegistryEntry, 20> entities_; + InlinedVector<BaseNode*, 20> entities_; }; } // namespace channelz diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc index 90ed34da11..13bc69ffb6 100644 --- a/src/core/lib/iomgr/error.cc +++ b/src/core/lib/iomgr/error.cc @@ -513,9 +513,24 @@ bool grpc_error_get_str(grpc_error* err, grpc_error_strs which, grpc_error* grpc_error_add_child(grpc_error* src, grpc_error* child) { GPR_TIMER_SCOPE("grpc_error_add_child", 0); - grpc_error* new_err = copy_error_and_unref(src); - internal_add_error(&new_err, child); - return new_err; + if (src != GRPC_ERROR_NONE) { + if (child == GRPC_ERROR_NONE) { + /* \a child is empty. Simply return the ref to \a src */ + return src; + } else if (child != src) { + grpc_error* new_err = copy_error_and_unref(src); + internal_add_error(&new_err, child); + return new_err; + } else { + /* \a src and \a child are the same. Drop one of the references and return + * the other */ + GRPC_ERROR_UNREF(child); + return src; + } + } else { + /* \a src is empty. Simply return the ref to \a child */ + return child; + } } static const char* no_error_string = "\"No Error\""; diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 27c4d22fd1..49f4029bc2 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -185,8 +185,16 @@ bool grpc_error_get_str(grpc_error* error, grpc_error_strs which, /// error occurring. Allows root causing high level errors from lower level /// errors that contributed to them. The src error takes ownership of the /// child error. +/// +/// Edge Conditions - +/// 1) If either of \a src or \a child is GRPC_ERROR_NONE, returns a reference +/// to the other argument. 2) If both \a src and \a child are GRPC_ERROR_NONE, +/// returns GRPC_ERROR_NONE. 3) If \a src and \a child point to the same error, +/// returns a single reference. (Note that, 2 references should have been +/// received to the error in this case.) grpc_error* grpc_error_add_child(grpc_error* src, grpc_error* child) GRPC_MUST_USE_RESULT; + grpc_error* grpc_os_error(const char* file, int line, int err, const char* call_name) GRPC_MUST_USE_RESULT; diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index abf96662f5..3d459059d7 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -82,6 +82,11 @@ #define GRPC_LINUX_SOCKETUTILS 1 #endif #endif +#ifdef LINUX_VERSION_CODE +#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 37) +#define GRPC_HAVE_TCP_USER_TIMEOUT +#endif /* LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 37) */ +#endif /* LINUX_VERSION_CODE */ #ifndef __GLIBC__ #define GRPC_LINUX_EPOLL 1 #define GRPC_LINUX_EPOLL_CREATE1 1 diff --git a/src/core/lib/iomgr/socket_utils_common_posix.cc b/src/core/lib/iomgr/socket_utils_common_posix.cc index c4b991c94d..50674b0845 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.cc +++ b/src/core/lib/iomgr/socket_utils_common_posix.cc @@ -41,6 +41,7 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/sockaddr.h" @@ -222,6 +223,95 @@ grpc_error* grpc_set_socket_low_latency(int fd, int low_latency) { return GRPC_ERROR_NONE; } +/* The default values for TCP_USER_TIMEOUT are currently configured to be in + * line with the default values of KEEPALIVE_TIMEOUT as proposed in + * https://github.com/grpc/proposal/blob/master/A18-tcp-user-timeout.md */ +#define DEFAULT_CLIENT_TCP_USER_TIMEOUT_MS 20000 /* 20 seconds */ +#define DEFAULT_SERVER_TCP_USER_TIMEOUT_MS 20000 /* 20 seconds */ + +static int g_default_client_tcp_user_timeout_ms = + DEFAULT_CLIENT_TCP_USER_TIMEOUT_MS; +static int g_default_server_tcp_user_timeout_ms = + DEFAULT_SERVER_TCP_USER_TIMEOUT_MS; +static bool g_default_client_tcp_user_timeout_enabled = false; +static bool g_default_server_tcp_user_timeout_enabled = true; + +void config_default_tcp_user_timeout(bool enable, int timeout, bool is_client) { + if (is_client) { + g_default_client_tcp_user_timeout_enabled = enable; + if (timeout > 0) { + g_default_client_tcp_user_timeout_ms = timeout; + } + } else { + g_default_server_tcp_user_timeout_enabled = enable; + if (timeout > 0) { + g_default_server_tcp_user_timeout_ms = timeout; + } + } +} + +/* Set TCP_USER_TIMEOUT */ +grpc_error* grpc_set_socket_tcp_user_timeout( + int fd, const grpc_channel_args* channel_args, bool is_client) { +#ifdef GRPC_HAVE_TCP_USER_TIMEOUT + bool enable; + int timeout; + if (is_client) { + enable = g_default_client_tcp_user_timeout_enabled; + timeout = g_default_client_tcp_user_timeout_ms; + } else { + enable = g_default_server_tcp_user_timeout_enabled; + timeout = g_default_server_tcp_user_timeout_ms; + } + if (channel_args) { + for (unsigned int i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) { + const int value = grpc_channel_arg_get_integer( + &channel_args->args[i], grpc_integer_options{0, 1, INT_MAX}); + /* Continue using default if value is 0 */ + if (value == 0) { + continue; + } + /* Disable if value is INT_MAX */ + enable = value != INT_MAX; + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) { + const int value = grpc_channel_arg_get_integer( + &channel_args->args[i], grpc_integer_options{0, 1, INT_MAX}); + /* Continue using default if value is 0 */ + if (value == 0) { + continue; + } + timeout = value; + } + } + } + if (enable) { + extern grpc_core::TraceFlag grpc_tcp_trace; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, "Enabling TCP_USER_TIMEOUT with a timeout of %d ms", + timeout); + } + int newval; + socklen_t len = sizeof(newval); + if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout, + sizeof(timeout))) { + return GRPC_OS_ERROR(errno, "setsockopt(TCP_USER_TIMEOUT)"); + } + if (0 != getsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len)) { + return GRPC_OS_ERROR(errno, "getsockopt(TCP_USER_TIMEOUT)"); + } + if (newval != timeout) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to set TCP_USER_TIMEOUT"); + } + } +#else + gpr_log(GPR_INFO, "TCP_USER_TIMEOUT not supported for this platform"); +#endif /* GRPC_HAVE_TCP_USER_TIMEOUT */ + return GRPC_ERROR_NONE; +} + /* set a socket using a grpc_socket_mutator */ grpc_error* grpc_set_socket_with_mutator(int fd, grpc_socket_mutator* mutator) { GPR_ASSERT(mutator); diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index b3fd58a530..71a304dc4e 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -53,6 +53,13 @@ grpc_error* grpc_set_socket_low_latency(int fd, int low_latency); /* set SO_REUSEPORT */ grpc_error* grpc_set_socket_reuse_port(int fd, int reuse); +/* Configure the default values for TCP_USER_TIMEOUT */ +void config_default_tcp_user_timeout(bool enable, int timeout, bool is_client); + +/* Set TCP_USER_TIMEOUT */ +grpc_error* grpc_set_socket_tcp_user_timeout( + int fd, const grpc_channel_args* channel_args, bool is_client); + /* Returns true if this system can create AF_INET6 sockets bound to ::1. The value is probed once, and cached for the life of the process. diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 9c989b7dfe..8553ed0db4 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -76,6 +76,9 @@ static grpc_error* prepare_socket(const grpc_resolved_address* addr, int fd, if (!grpc_is_unix_socket(addr)) { err = grpc_set_socket_low_latency(fd, 1); if (err != GRPC_ERROR_NONE) goto error; + err = grpc_set_socket_tcp_user_timeout(fd, channel_args, + true /* is_client */); + if (err != GRPC_ERROR_NONE) goto error; } err = grpc_set_socket_no_sigpipe_if_possible(fd); if (err != GRPC_ERROR_NONE) goto error; diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index 9595c028ce..8d8d3f4273 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -166,6 +166,9 @@ grpc_error* grpc_tcp_server_prepare_socket(grpc_tcp_server* s, int fd, if (err != GRPC_ERROR_NONE) goto error; err = grpc_set_socket_reuse_addr(fd, 1); if (err != GRPC_ERROR_NONE) goto error; + err = grpc_set_socket_tcp_user_timeout(fd, s->channel_args, + false /* is_client */); + if (err != GRPC_ERROR_NONE) goto error; } err = grpc_set_socket_no_sigpipe_if_possible(fd); if (err != GRPC_ERROR_NONE) goto error; diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc index 4294162af7..008d37119a 100644 --- a/src/core/lib/iomgr/timer_generic.cc +++ b/src/core/lib/iomgr/timer_generic.cc @@ -291,7 +291,7 @@ static void timer_list_init() { static void timer_list_shutdown() { size_t i; run_some_expired_timers( - GPR_ATM_MAX, nullptr, + GRPC_MILLIS_INF_FUTURE, nullptr, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown")); for (i = 0; i < g_num_shards; i++) { timer_shard* shard = &g_shards[i]; @@ -714,9 +714,10 @@ static grpc_timer_check_result timer_check(grpc_millis* next) { #if GPR_ARCH_64 gpr_log(GPR_INFO, "TIMER CHECK BEGIN: now=%" PRId64 " next=%s tls_min=%" PRId64 - " glob_min=%" PRIdPTR, + " glob_min=%" PRId64, now, next_str, min_timer, - gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer))); + static_cast<grpc_millis>(gpr_atm_no_barrier_load( + (gpr_atm*)(&g_shared_mutables.min_timer)))); #else gpr_log(GPR_INFO, "TIMER CHECK BEGIN: now=%" PRId64 " next=%s min=%" PRId64, now, next_str, min_timer); diff --git a/src/core/lib/security/security_connector/security_connector.cc b/src/core/lib/security/security_connector/security_connector.cc index 04b4c87c71..6246613e7b 100644 --- a/src/core/lib/security/security_connector/security_connector.cc +++ b/src/core/lib/security/security_connector/security_connector.cc @@ -59,8 +59,8 @@ static const char* installed_roots_path = /** Environment variable used as a flag to enable/disable loading system root certificates from the OS trust store. */ -#ifndef GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR -#define GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR "GRPC_USE_SYSTEM_SSL_ROOTS" +#ifndef GRPC_NOT_USE_SYSTEM_SSL_ROOTS_ENV_VAR +#define GRPC_NOT_USE_SYSTEM_SSL_ROOTS_ENV_VAR "GRPC_NOT_USE_SYSTEM_SSL_ROOTS" #endif #ifndef TSI_OPENSSL_ALPN_SUPPORT @@ -1192,10 +1192,10 @@ const char* DefaultSslRootStore::GetPemRootCerts() { grpc_slice DefaultSslRootStore::ComputePemRootCerts() { grpc_slice result = grpc_empty_slice(); - char* use_system_roots_env_value = - gpr_getenv(GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR); - const bool use_system_roots = gpr_is_true(use_system_roots_env_value); - gpr_free(use_system_roots_env_value); + char* not_use_system_roots_env_value = + gpr_getenv(GRPC_NOT_USE_SYSTEM_SSL_ROOTS_ENV_VAR); + const bool not_use_system_roots = gpr_is_true(not_use_system_roots_env_value); + gpr_free(not_use_system_roots_env_value); // First try to load the roots from the environment. char* default_root_certs_path = gpr_getenv(GRPC_DEFAULT_SSL_ROOTS_FILE_PATH_ENV_VAR); @@ -1218,7 +1218,7 @@ grpc_slice DefaultSslRootStore::ComputePemRootCerts() { gpr_free(pem_root_certs); } // Try loading roots from OS trust store if flag is enabled. - if (GRPC_SLICE_IS_EMPTY(result) && use_system_roots) { + if (GRPC_SLICE_IS_EMPTY(result) && !not_use_system_roots) { result = LoadSystemRootCerts(); } // Fallback to roots manually shipped with gRPC. diff --git a/src/core/lib/security/transport/server_auth_filter.cc b/src/core/lib/security/transport/server_auth_filter.cc index 19cbb03b63..552e70130a 100644 --- a/src/core/lib/security/transport/server_auth_filter.cc +++ b/src/core/lib/security/transport/server_auth_filter.cc @@ -41,6 +41,9 @@ struct call_data { grpc_transport_stream_op_batch* recv_initial_metadata_batch; grpc_closure* original_recv_initial_metadata_ready; grpc_closure recv_initial_metadata_ready; + grpc_error* error; + grpc_closure recv_trailing_metadata_ready; + grpc_closure* original_recv_trailing_metadata_ready; grpc_metadata_array md; const grpc_metadata* consumed_md; size_t num_consumed_md; @@ -111,6 +114,7 @@ static void on_md_processing_done_inner(grpc_call_element* elem, batch->payload->recv_initial_metadata.recv_initial_metadata, remove_consumed_md, elem, "Response metadata filtering error"); } + calld->error = GRPC_ERROR_REF(error); GRPC_CLOSURE_SCHED(calld->original_recv_initial_metadata_ready, error); } @@ -184,6 +188,13 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { GRPC_ERROR_REF(error)); } +static void recv_trailing_metadata_ready(void* user_data, grpc_error* err) { + grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error)); + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); +} + static void auth_start_transport_stream_op_batch( grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { call_data* calld = static_cast<call_data*>(elem->call_data); @@ -195,6 +206,12 @@ static void auth_start_transport_stream_op_batch( batch->payload->recv_initial_metadata.recv_initial_metadata_ready = &calld->recv_initial_metadata_ready; } + if (batch->recv_trailing_metadata) { + calld->original_recv_trailing_metadata_ready = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready; + } grpc_call_next_op(elem, batch); } @@ -208,6 +225,9 @@ static grpc_error* init_call_elem(grpc_call_element* elem, GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); // Create server security context. Set its auth context from channel // data and save it in the call context. grpc_server_security_context* server_ctx = @@ -227,7 +247,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem, /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, - grpc_closure* ignored) {} + grpc_closure* ignored) { + call_data* calld = static_cast<call_data*>(elem->call_data); + GRPC_ERROR_UNREF(calld->error); +} /* Constructor for channel_data */ static grpc_error* init_channel_elem(grpc_channel_element* elem, diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 2923a86646..a9349afa68 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -48,6 +48,7 @@ #include "src/core/lib/surface/call_test_only.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/completion_queue.h" +#include "src/core/lib/surface/server.h" #include "src/core/lib/surface/validate_metadata.h" #include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/metadata.h" @@ -71,46 +72,6 @@ // Used to create arena for the first call. #define ESTIMATED_MDELEM_COUNT 16 -/* Status data for a request can come from several sources; this - enumerates them all, and acts as a priority sorting for which - status to return to the application - earlier entries override - later ones */ -typedef enum { - /* Status came from the application layer overriding whatever - the wire says */ - STATUS_FROM_API_OVERRIDE = 0, - /* Status came from 'the wire' - or somewhere below the surface - layer */ - STATUS_FROM_WIRE, - /* Status was created by some internal channel stack operation: must come via - add_batch_error */ - STATUS_FROM_CORE, - /* Status was created by some surface error */ - STATUS_FROM_SURFACE, - /* Status came from the server sending status */ - STATUS_FROM_SERVER_STATUS, - STATUS_SOURCE_COUNT -} status_source; - -typedef struct { - bool is_set; - grpc_error* error; -} received_status; - -static gpr_atm pack_received_status(received_status r) { - return r.is_set ? (1 | (gpr_atm)r.error) : 0; -} - -static received_status unpack_received_status(gpr_atm atm) { - if ((atm & 1) == 0) { - return {false, GRPC_ERROR_NONE}; - } else { - return {true, (grpc_error*)(atm & ~static_cast<gpr_atm>(1))}; - } -} - -#define MAX_ERRORS_PER_BATCH 4 - typedef struct batch_control { grpc_call* call; /* Share memory for cq_completion and notify_tag as they are never needed @@ -135,10 +96,7 @@ typedef struct batch_control { grpc_closure start_batch; grpc_closure finish_batch; gpr_refcount steps_to_complete; - - grpc_error* errors[MAX_ERRORS_PER_BATCH]; - gpr_atm num_errors; - + gpr_atm batch_error; grpc_transport_stream_op_batch op; } batch_control; @@ -201,9 +159,6 @@ struct grpc_call { // A char* indicating the peer name. gpr_atm peer_string; - /* Packed received call statuses from various sources */ - gpr_atm status[STATUS_SOURCE_COUNT]; - /* Call data useful used for reporting. Only valid after the call has * completed */ grpc_call_final_info final_info; @@ -236,6 +191,7 @@ struct grpc_call { grpc_closure receiving_initial_metadata_ready; grpc_closure receiving_trailing_metadata_ready; uint32_t test_only_last_message_flags; + gpr_atm cancelled; grpc_closure release_call; @@ -247,8 +203,11 @@ struct grpc_call { } client; struct { int* cancelled; + // backpointer to owning server if this is a server side call. + grpc_server* server; } server; } final_op; + gpr_atm status_error; /* recv_state can contain one of the following values: RECV_NONE : : no initial metadata and messages received @@ -286,23 +245,15 @@ grpc_core::TraceFlag grpc_compression_trace(false, "compression"); static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op, grpc_closure* start_batch_closure); -static void cancel_with_status(grpc_call* c, status_source source, - grpc_status_code status, + +static void cancel_with_status(grpc_call* c, grpc_status_code status, const char* description); -static void cancel_with_error(grpc_call* c, status_source source, - grpc_error* error); +static void cancel_with_error(grpc_call* c, grpc_error* error); static void destroy_call(void* call_stack, grpc_error* error); static void receiving_slice_ready(void* bctlp, grpc_error* error); -static void get_final_status( - grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data), - void* set_value_user_data, grpc_slice* details, const char** error_string); -static void set_status_value_directly(grpc_status_code status, void* dest); -static void set_status_from_error(grpc_call* call, status_source source, - grpc_error* error); +static void set_final_status(grpc_call* call, grpc_error* error); static void process_data_after_md(batch_control* bctl); static void post_batch_completion(batch_control* bctl); -static void add_batch_error(batch_control* bctl, grpc_error* error, - bool has_cancelled); static void add_init_error(grpc_error** composite, grpc_error* new_err) { if (new_err == GRPC_ERROR_NONE) return; @@ -353,6 +304,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, gpr_arena_alloc(arena, GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) + channel_stack->call_stack_size)); gpr_ref_init(&call->ext_ref, 1); + gpr_atm_no_barrier_store(&call->cancelled, 0); call->arena = arena; grpc_call_combiner_init(&call->call_combiner); *out_call = call; @@ -362,14 +314,10 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_MESSAGE_COMPRESS_NONE); call->is_client = args->server_transport_data == nullptr; - if (call->is_client) { - GRPC_STATS_INC_CLIENT_CALLS_CREATED(); - } else { - GRPC_STATS_INC_SERVER_CALLS_CREATED(); - } call->stream_op_payload.context = call->context; grpc_slice path = grpc_empty_slice(); if (call->is_client) { + GRPC_STATS_INC_CLIENT_CALLS_CREATED(); GPR_ASSERT(args->add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT); for (i = 0; i < args->add_initial_metadata_count; i++) { @@ -383,6 +331,8 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, call->send_extra_metadata_count = static_cast<int>(args->add_initial_metadata_count); } else { + GRPC_STATS_INC_SERVER_CALLS_CREATED(); + call->final_op.server.server = args->server; GPR_ASSERT(args->add_initial_metadata_count == 0); call->send_extra_metadata_count = 0; } @@ -464,10 +414,10 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, gpr_mu_unlock(&pc->child_list_mu); } if (error != GRPC_ERROR_NONE) { - cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); + cancel_with_error(call, GRPC_ERROR_REF(error)); } if (immediately_cancel) { - cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); + cancel_with_error(call, GRPC_ERROR_CANCELLED); } if (args->cq != nullptr) { GPR_ASSERT(args->pollset_set_alternative == nullptr && @@ -486,10 +436,18 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, &call->pollent); } - grpc_core::channelz::ChannelNode* channelz_channel = - grpc_channel_get_channelz_node(call->channel); - if (channelz_channel != nullptr) { - channelz_channel->RecordCallStarted(); + if (call->is_client) { + grpc_core::channelz::ChannelNode* channelz_channel = + grpc_channel_get_channelz_node(call->channel); + if (channelz_channel != nullptr) { + channelz_channel->RecordCallStarted(); + } + } else { + grpc_core::channelz::ServerNode* channelz_server = + grpc_server_get_channelz_node(call->final_op.server.server); + if (channelz_server != nullptr) { + channelz_server->RecordCallStarted(); + } } grpc_slice_unref_internal(path); @@ -561,16 +519,15 @@ static void destroy_call(void* call, grpc_error* error) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } - get_final_status(c, set_status_value_directly, &c->final_info.final_status, - nullptr, &(c->final_info.error_string)); + grpc_error* status_error = + reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&c->status_error)); + grpc_error_get_status(status_error, c->send_deadline, + &c->final_info.final_status, nullptr, nullptr, + &(c->final_info.error_string)); + GRPC_ERROR_UNREF(status_error); c->final_info.stats.latency = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - GRPC_ERROR_UNREF( - unpack_received_status(gpr_atm_acq_load(&c->status[i])).error); - } - grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info, GRPC_CLOSURE_INIT(&c->release_call, release_call, c, grpc_schedule_on_exec_ctx)); @@ -608,7 +565,7 @@ void grpc_call_unref(grpc_call* c) { bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 && gpr_atm_acq_load(&c->received_final_op_atm) == 0; if (cancel) { - cancel_with_error(c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); + cancel_with_error(c, GRPC_ERROR_CANCELLED); } else { // Unset the call combiner cancellation closure. This has the // effect of scheduling the previously set cancellation closure, if @@ -626,8 +583,7 @@ grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) { GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved)); GPR_ASSERT(!reserved); grpc_core::ExecCtx exec_ctx; - cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); - + cancel_with_error(call, GRPC_ERROR_CANCELLED); return GRPC_CALL_OK; } @@ -681,8 +637,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call* c, "c=%p, status=%d, description=%s, reserved=%p)", 4, (c, (int)status, description, reserved)); GPR_ASSERT(reserved == nullptr); - cancel_with_status(c, STATUS_FROM_API_OVERRIDE, status, description); - + cancel_with_status(c, status, description); return GRPC_CALL_OK; } @@ -702,15 +657,17 @@ static void done_termination(void* arg, grpc_error* error) { gpr_free(state); } -static void cancel_with_error(grpc_call* c, status_source source, - grpc_error* error) { +static void cancel_with_error(grpc_call* c, grpc_error* error) { + if (!gpr_atm_rel_cas(&c->cancelled, 0, 1)) { + GRPC_ERROR_UNREF(error); + return; + } GRPC_CALL_INTERNAL_REF(c, "termination"); // Inform the call combiner of the cancellation, so that it can cancel // any in-flight asynchronous actions that may be holding the call // combiner. This ensures that the cancel_stream batch can be sent // down the filter stack in a timely manner. grpc_call_combiner_cancel(&c->call_combiner, GRPC_ERROR_REF(error)); - set_status_from_error(c, source, GRPC_ERROR_REF(error)); cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state))); state->call = c; GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state, @@ -733,90 +690,47 @@ static grpc_error* error_from_status(grpc_status_code status, GRPC_ERROR_INT_GRPC_STATUS, status); } -static void cancel_with_status(grpc_call* c, status_source source, - grpc_status_code status, +static void cancel_with_status(grpc_call* c, grpc_status_code status, const char* description) { - cancel_with_error(c, source, error_from_status(status, description)); -} - -/******************************************************************************* - * FINAL STATUS CODE MANIPULATION - */ - -static bool get_final_status_from( - grpc_call* call, grpc_error* error, bool allow_ok_status, - void (*set_value)(grpc_status_code code, void* user_data), - void* set_value_user_data, grpc_slice* details, const char** error_string) { - grpc_status_code code; - grpc_slice slice = grpc_empty_slice(); - grpc_error_get_status(error, call->send_deadline, &code, &slice, nullptr, - error_string); - if (code == GRPC_STATUS_OK && !allow_ok_status) { - return false; - } - - set_value(code, set_value_user_data); - if (details != nullptr) { - *details = grpc_slice_ref_internal(slice); - } - return true; + cancel_with_error(c, error_from_status(status, description)); } -static void get_final_status( - grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data), - void* set_value_user_data, grpc_slice* details, const char** error_string) { - int i; - received_status status[STATUS_SOURCE_COUNT]; - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i])); - } +static void set_final_status(grpc_call* call, grpc_error* error) { if (grpc_call_error_trace.enabled()) { - gpr_log(GPR_INFO, "get_final_status %s", call->is_client ? "CLI" : "SVR"); - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (status[i].is_set) { - gpr_log(GPR_INFO, " %d: %s", i, grpc_error_string(status[i].error)); - } - } + gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR"); + gpr_log(GPR_DEBUG, "%s", grpc_error_string(error)); } - /* first search through ignoring "OK" statuses: if something went wrong, - * ensure we report it */ - for (int allow_ok_status = 0; allow_ok_status < 2; allow_ok_status++) { - /* search for the best status we can present: ideally the error we use has a - clearly defined grpc-status, and we'll prefer that. */ - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (status[i].is_set && - grpc_error_has_clear_grpc_status(status[i].error)) { - if (get_final_status_from(call, status[i].error, allow_ok_status != 0, - set_value, set_value_user_data, details, - error_string)) { - return; - } + if (call->is_client) { + grpc_error_get_status(error, call->send_deadline, + call->final_op.client.status, + call->final_op.client.status_details, nullptr, + call->final_op.client.error_string); + // explicitly take a ref + grpc_slice_ref_internal(*call->final_op.client.status_details); + gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error)); + grpc_core::channelz::ChannelNode* channelz_channel = + grpc_channel_get_channelz_node(call->channel); + if (channelz_channel != nullptr) { + if (*call->final_op.client.status != GRPC_STATUS_OK) { + channelz_channel->RecordCallFailed(); + } else { + channelz_channel->RecordCallSucceeded(); } } - /* If no clearly defined status exists, search for 'anything' */ - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (status[i].is_set) { - if (get_final_status_from(call, status[i].error, allow_ok_status != 0, - set_value, set_value_user_data, details, - error_string)) { - return; - } + } else { + *call->final_op.server.cancelled = + error != GRPC_ERROR_NONE || + reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&call->status_error)) != + GRPC_ERROR_NONE; + grpc_core::channelz::ServerNode* channelz_server = + grpc_server_get_channelz_node(call->final_op.server.server); + if (channelz_server != nullptr) { + if (*call->final_op.server.cancelled) { + channelz_server->RecordCallFailed(); + } else { + channelz_server->RecordCallSucceeded(); } } - } - /* If nothing exists, set some default */ - if (call->is_client) { - set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); - } else { - set_value(GRPC_STATUS_OK, set_value_user_data); - } -} - -static void set_status_from_error(grpc_call* call, status_source source, - grpc_error* error) { - if (!gpr_atm_rel_cas(&call->status[source], - pack_received_status({false, GRPC_ERROR_NONE}), - pack_received_status({true, error}))) { GRPC_ERROR_UNREF(error); } } @@ -1035,6 +949,7 @@ static grpc_stream_compression_algorithm decode_stream_compression( static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b, int is_trailing) { if (b->list.count == 0) return; + if (!call->is_client && is_trailing) return; if (is_trailing && call->buffered_metadata[1] == nullptr) return; GPR_TIMER_SCOPE("publish_app_metadata", 0); grpc_metadata_array* dest; @@ -1088,9 +1003,12 @@ static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) { publish_app_metadata(call, b, false); } -static void recv_trailing_filter(void* args, grpc_metadata_batch* b) { +static void recv_trailing_filter(void* args, grpc_metadata_batch* b, + grpc_error* batch_error) { grpc_call* call = static_cast<grpc_call*>(args); - if (b->idx.named.grpc_status != nullptr) { + if (batch_error != GRPC_ERROR_NONE) { + set_final_status(call, batch_error); + } else if (b->idx.named.grpc_status != nullptr) { grpc_status_code status_code = grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md); grpc_error* error = GRPC_ERROR_NONE; @@ -1108,8 +1026,18 @@ static void recv_trailing_filter(void* args, grpc_metadata_batch* b) { error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, grpc_empty_slice()); } - set_status_from_error(call, STATUS_FROM_WIRE, error); + set_final_status(call, GRPC_ERROR_REF(error)); grpc_metadata_batch_remove(b, b->idx.named.grpc_status); + GRPC_ERROR_UNREF(error); + } else if (!call->is_client) { + set_final_status(call, GRPC_ERROR_NONE); + } else { + gpr_log(GPR_DEBUG, + "Received trailing metadata with no error and no status"); + set_final_status( + call, grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN)); } publish_app_metadata(call, b, true); } @@ -1124,14 +1052,6 @@ grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) { * BATCH API IMPLEMENTATION */ -static void set_status_value_directly(grpc_status_code status, void* dest) { - *static_cast<grpc_status_code*>(dest) = status; -} - -static void set_cancelled_value(grpc_status_code status, void* dest) { - *static_cast<int*>(dest) = (status != GRPC_STATUS_OK); -} - static bool are_write_flags_valid(uint32_t flags) { /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */ const uint32_t allowed_write_positions = @@ -1199,31 +1119,18 @@ static void finish_batch_completion(void* user_data, GRPC_CALL_INTERNAL_UNREF(call, "completion"); } -static grpc_error* consolidate_batch_errors(batch_control* bctl) { - size_t n = static_cast<size_t>(gpr_atm_acq_load(&bctl->num_errors)); - if (n == 0) { - return GRPC_ERROR_NONE; - } else if (n == 1) { - /* Skip creating a composite error in the case that only one error was - logged */ - grpc_error* e = bctl->errors[0]; - bctl->errors[0] = nullptr; - return e; - } else { - grpc_error* error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Call batch failed", bctl->errors, n); - for (size_t i = 0; i < n; i++) { - GRPC_ERROR_UNREF(bctl->errors[i]); - bctl->errors[i] = nullptr; - } - return error; - } +static void reset_batch_errors(batch_control* bctl) { + GRPC_ERROR_UNREF( + reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error))); + gpr_atm_rel_store(&bctl->batch_error, + reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE)); } static void post_batch_completion(batch_control* bctl) { grpc_call* next_child_call; grpc_call* call = bctl->call; - grpc_error* error = consolidate_batch_errors(bctl); + grpc_error* error = GRPC_ERROR_REF( + reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error))); if (bctl->op.send_initial_metadata) { grpc_metadata_batch_destroy( @@ -1249,8 +1156,7 @@ static void post_batch_completion(batch_control* bctl) { next_child_call = child->child->sibling_next; if (child->cancellation_is_inherited) { GRPC_CALL_INTERNAL_REF(child, "propagate_cancel"); - cancel_with_error(child, STATUS_FROM_API_OVERRIDE, - GRPC_ERROR_CANCELLED); + cancel_with_error(child, GRPC_ERROR_CANCELLED); GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel"); } child = next_child_call; @@ -1258,24 +1164,6 @@ static void post_batch_completion(batch_control* bctl) { } gpr_mu_unlock(&pc->child_list_mu); } - if (call->is_client) { - get_final_status(call, set_status_value_directly, - call->final_op.client.status, - call->final_op.client.status_details, - call->final_op.client.error_string); - } else { - get_final_status(call, set_cancelled_value, - call->final_op.server.cancelled, nullptr, nullptr); - } - grpc_core::channelz::ChannelNode* channelz_channel = - grpc_channel_get_channelz_node(call->channel); - if (channelz_channel != nullptr) { - if (*call->final_op.client.status != GRPC_STATUS_OK) { - channelz_channel->RecordCallFailed(); - } else { - channelz_channel->RecordCallSucceeded(); - } - } GRPC_ERROR_UNREF(error); error = GRPC_ERROR_NONE; } @@ -1284,9 +1172,10 @@ static void post_batch_completion(batch_control* bctl) { grpc_byte_buffer_destroy(*call->receiving_buffer); *call->receiving_buffer = nullptr; } + reset_batch_errors(bctl); if (bctl->completion_data.notify_tag.is_closure) { - /* unrefs bctl->error */ + /* unrefs error */ bctl->call = nullptr; /* This closure may be meant to be run within some combiner. Since we aren't * running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead @@ -1296,7 +1185,7 @@ static void post_batch_completion(batch_control* bctl) { error); GRPC_CALL_INTERNAL_UNREF(call, "completion"); } else { - /* unrefs bctl->error */ + /* unrefs error */ grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error, finish_batch_completion, bctl, &bctl->completion_data.cq_completion); @@ -1405,8 +1294,12 @@ static void receiving_stream_ready(void* bctlp, grpc_error* error) { grpc_call* call = bctl->call; if (error != GRPC_ERROR_NONE) { call->receiving_stream.reset(); - add_batch_error(bctl, GRPC_ERROR_REF(error), true); - cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); + if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) == + GRPC_ERROR_NONE) { + gpr_atm_rel_store(&bctl->batch_error, + reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error))); + } + cancel_with_error(call, GRPC_ERROR_REF(error)); } /* If recv_state is RECV_NONE, we will save the batch_control * object with rel_cas, and will not use it after the cas. Its corresponding @@ -1442,8 +1335,7 @@ static void validate_filtered_metadata(batch_control* bctl) { call->incoming_stream_compression_algorithm, call->incoming_message_compression_algorithm); gpr_log(GPR_ERROR, "%s", error_msg); - cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_INTERNAL, - error_msg); + cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg); gpr_free(error_msg); } else if ( grpc_compression_algorithm_from_message_stream_compression_algorithm( @@ -1455,8 +1347,7 @@ static void validate_filtered_metadata(batch_control* bctl) { "compression (%d).", call->incoming_stream_compression_algorithm, call->incoming_message_compression_algorithm); - cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_INTERNAL, - error_msg); + cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg); gpr_free(error_msg); } else { char* error_msg = nullptr; @@ -1466,8 +1357,7 @@ static void validate_filtered_metadata(batch_control* bctl) { gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.", compression_algorithm); gpr_log(GPR_ERROR, "%s", error_msg); - cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED, - error_msg); + cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg); } else if (grpc_compression_options_is_algorithm_enabled( &compression_options, compression_algorithm) == 0) { /* check if algorithm is supported by current channel config */ @@ -1476,8 +1366,7 @@ static void validate_filtered_metadata(batch_control* bctl) { gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.", algo_name); gpr_log(GPR_ERROR, "%s", error_msg); - cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED, - error_msg); + cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg); } gpr_free(error_msg); @@ -1495,23 +1384,12 @@ static void validate_filtered_metadata(batch_control* bctl) { } } -static void add_batch_error(batch_control* bctl, grpc_error* error, - bool has_cancelled) { - if (error == GRPC_ERROR_NONE) return; - int idx = static_cast<int>(gpr_atm_full_fetch_add(&bctl->num_errors, 1)); - if (idx == 0 && !has_cancelled) { - cancel_with_error(bctl->call, STATUS_FROM_CORE, GRPC_ERROR_REF(error)); - } - bctl->errors[idx] = error; -} - static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready"); - add_batch_error(bctl, GRPC_ERROR_REF(error), false); if (error == GRPC_ERROR_NONE) { grpc_metadata_batch* md = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; @@ -1524,6 +1402,13 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) { call->send_deadline = md->deadline; } + } else { + if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) == + GRPC_ERROR_NONE) { + gpr_atm_rel_store(&bctl->batch_error, + reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error))); + } + cancel_with_error(call, GRPC_ERROR_REF(error)); } grpc_closure* saved_rsr_closure = nullptr; @@ -1561,10 +1446,9 @@ static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready"); - add_batch_error(bctl, GRPC_ERROR_REF(error), false); grpc_metadata_batch* md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - recv_trailing_filter(call, md); + recv_trailing_filter(call, md, GRPC_ERROR_REF(error)); finish_batch_step(bctl); } @@ -1572,7 +1456,14 @@ static void finish_batch(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete"); - add_batch_error(bctl, GRPC_ERROR_REF(error), false); + if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) == + GRPC_ERROR_NONE) { + gpr_atm_rel_store(&bctl->batch_error, + reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error))); + } + if (error != GRPC_ERROR_NONE) { + cancel_with_error(call, GRPC_ERROR_REF(error)); + } finish_batch_step(bctl); } @@ -1774,28 +1665,33 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->send_extra_metadata_count = 1; call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem( call->channel, op->data.send_status_from_server.status); - { - grpc_error* override_error = GRPC_ERROR_NONE; - if (op->data.send_status_from_server.status != GRPC_STATUS_OK) { - override_error = - error_from_status(op->data.send_status_from_server.status, - "Returned non-ok status"); - } - if (op->data.send_status_from_server.status_details != nullptr) { - call->send_extra_metadata[1].md = grpc_mdelem_from_slices( - GRPC_MDSTR_GRPC_MESSAGE, - grpc_slice_ref_internal( - *op->data.send_status_from_server.status_details)); - call->send_extra_metadata_count++; + grpc_error* status_error = + op->data.send_status_from_server.status == GRPC_STATUS_OK + ? GRPC_ERROR_NONE + : grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Server returned error"), + GRPC_ERROR_INT_GRPC_STATUS, + static_cast<intptr_t>( + op->data.send_status_from_server.status)); + if (op->data.send_status_from_server.status_details != nullptr) { + call->send_extra_metadata[1].md = grpc_mdelem_from_slices( + GRPC_MDSTR_GRPC_MESSAGE, + grpc_slice_ref_internal( + *op->data.send_status_from_server.status_details)); + call->send_extra_metadata_count++; + if (status_error != GRPC_ERROR_NONE) { char* msg = grpc_slice_to_c_string( GRPC_MDVALUE(call->send_extra_metadata[1].md)); - override_error = - grpc_error_set_str(override_error, GRPC_ERROR_STR_GRPC_MESSAGE, + status_error = + grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE, grpc_slice_from_copied_string(msg)); gpr_free(msg); } - set_status_from_error(call, STATUS_FROM_API_OVERRIDE, override_error); } + + gpr_atm_rel_store(&call->status_error, + reinterpret_cast<gpr_atm>(status_error)); if (!prepare_application_metadata( call, static_cast<int>( diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index b3b06059d4..b34260505a 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -33,6 +33,7 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call* call, int success, typedef struct grpc_call_create_args { grpc_channel* channel; + grpc_server* server; grpc_call* parent; uint32_t propagation_mask; diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index 82635d3c21..054fe105c3 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -100,7 +100,6 @@ grpc_channel* grpc_channel_create_with_builder( return channel; } - memset(channel, 0, sizeof(*channel)); channel->target = target; channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type); size_t channel_tracer_max_nodes = 0; // default to off @@ -166,11 +165,12 @@ grpc_channel* grpc_channel_create_with_builder( } grpc_channel_args_destroy(args); - if (channelz_enabled) { - bool is_top_level_channel = channel->is_client && !internal_channel; + // we only need to do the channelz bookkeeping for clients here. The channelz + // bookkeeping for server channels occurs in src/core/lib/surface/server.cc + if (channelz_enabled && channel->is_client) { channel->channelz_channel = channel_node_create_func( - channel, channel_tracer_max_nodes, is_top_level_channel); - channel->channelz_channel->trace()->AddTraceEvent( + channel, channel_tracer_max_nodes, !internal_channel); + channel->channelz_channel->AddTraceEvent( grpc_core::channelz::ChannelTrace::Severity::Info, grpc_slice_from_static_string("Channel created")); } @@ -428,6 +428,9 @@ void grpc_channel_internal_unref(grpc_channel* c REF_ARG) { static void destroy_channel(void* arg, grpc_error* error) { grpc_channel* channel = static_cast<grpc_channel*>(arg); if (channel->channelz_channel != nullptr) { + channel->channelz_channel->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Channel destroyed")); channel->channelz_channel->MarkChannelDestroyed(); channel->channelz_channel.reset(); } diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 0769d9e4f6..c2cf450e94 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -1364,9 +1364,11 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) { } cqd->shutdown_called = true; if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + gpr_mu_unlock(cq->mu); cq_finish_shutdown_callback(cq); + } else { + gpr_mu_unlock(cq->mu); } - gpr_mu_unlock(cq->mu); GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); } diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index cb34def740..5fa58ffdec 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -149,6 +149,9 @@ struct call_data { grpc_closure server_on_recv_initial_metadata; grpc_closure kill_zombie_closure; grpc_closure* on_done_recv_initial_metadata; + grpc_closure recv_trailing_metadata_ready; + grpc_error* error; + grpc_closure* original_recv_trailing_metadata_ready; grpc_closure publish; @@ -219,6 +222,8 @@ struct grpc_server { /** when did we print the last shutdown progress message */ gpr_timespec last_shutdown_message_time; + + grpc_core::RefCountedPtr<grpc_core::channelz::ServerNode> channelz_server; }; #define SERVER_FROM_CALL_ELEM(elem) \ @@ -364,6 +369,7 @@ static void server_ref(grpc_server* server) { static void server_delete(grpc_server* server) { registered_method* rm; size_t i; + server->channelz_server.reset(); grpc_channel_args_destroy(server->channel_args); gpr_mu_destroy(&server->mu_global); gpr_mu_destroy(&server->mu_call); @@ -730,6 +736,14 @@ static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) { GRPC_CLOSURE_RUN(calld->on_done_recv_initial_metadata, error); } +static void server_recv_trailing_metadata_ready(void* user_data, + grpc_error* err) { + grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error)); + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); +} + static void server_mutate_op(grpc_call_element* elem, grpc_transport_stream_op_batch* op) { call_data* calld = static_cast<call_data*>(elem->call_data); @@ -745,6 +759,12 @@ static void server_mutate_op(grpc_call_element* elem, op->payload->recv_initial_metadata.recv_flags = &calld->recv_initial_metadata_flags; } + if (op->recv_trailing_metadata) { + calld->original_recv_trailing_metadata_ready = + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready; + } } static void server_start_transport_stream_op_batch( @@ -779,6 +799,7 @@ static void accept_stream(void* cd, grpc_transport* transport, args.channel = chand->channel; args.server_transport_data = transport_server_data; args.send_deadline = GRPC_MILLIS_INF_FUTURE; + args.server = chand->server; grpc_call* call; grpc_error* error = grpc_call_create(&args, &call); grpc_call_element* elem = @@ -828,7 +849,9 @@ static grpc_error* init_call_elem(grpc_call_element* elem, GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata, server_on_recv_initial_metadata, elem, grpc_schedule_on_exec_ctx); - + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + server_recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); server_ref(chand->server); return GRPC_ERROR_NONE; } @@ -840,7 +863,7 @@ static void destroy_call_elem(grpc_call_element* elem, call_data* calld = static_cast<call_data*>(elem->call_data); GPR_ASSERT(calld->state != PENDING); - + GRPC_ERROR_UNREF(calld->error); if (calld->host_set) { grpc_slice_unref_internal(calld->host); } @@ -941,6 +964,7 @@ void grpc_server_register_completion_queue(grpc_server* server, } grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) { + grpc_core::ExecCtx exec_ctx; GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved)); grpc_server* server = @@ -957,6 +981,20 @@ grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) { server->channel_args = grpc_channel_args_copy(args); + const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ); + if (grpc_channel_arg_get_bool(arg, false)) { + arg = grpc_channel_args_find(args, + GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE); + size_t trace_events_per_node = + grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX}); + server->channelz_server = + grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>( + trace_events_per_node); + server->channelz_server->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Server created")); + } + return server; } @@ -1459,3 +1497,11 @@ int grpc_server_has_open_connections(grpc_server* server) { gpr_mu_unlock(&server->mu_global); return r; } + +grpc_core::channelz::ServerNode* grpc_server_get_channelz_node( + grpc_server* server) { + if (server == nullptr) { + return nullptr; + } + return server->channelz_server.get(); +} diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index c617cc223e..0196743ff9 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -23,6 +23,7 @@ #include <grpc/grpc.h> #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channelz.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/transport/transport.h" @@ -46,6 +47,9 @@ void grpc_server_setup_transport(grpc_server* server, grpc_transport* transport, grpc_pollset* accepting_pollset, const grpc_channel_args* args); +grpc_core::channelz::ServerNode* grpc_server_get_channelz_node( + grpc_server* server); + const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server); int grpc_server_has_open_connections(grpc_server* server); diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc index b5268add0d..17e8026096 100644 --- a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc +++ b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc @@ -24,6 +24,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include "src/core/lib/slice/slice_internal.h" #include "src/core/tsi/alts/handshaker/alts_handshaker_service_api.h" const int kHandshakerClientOpNum = 4; @@ -109,7 +110,7 @@ static grpc_byte_buffer* get_serialized_start_client(alts_tsi_event* event) { if (ok) { buffer = grpc_raw_byte_buffer_create(&slice, 1 /* number of slices */); } - grpc_slice_unref(slice); + grpc_slice_unref_internal(slice); gpr_free(target_name); grpc_gcp_handshaker_req_destroy(req); return buffer; @@ -157,7 +158,7 @@ static grpc_byte_buffer* get_serialized_start_server( if (ok) { buffer = grpc_raw_byte_buffer_create(&req_slice, 1 /* number of slices */); } - grpc_slice_unref(req_slice); + grpc_slice_unref_internal(req_slice); grpc_gcp_handshaker_req_destroy(req); return buffer; } @@ -195,7 +196,7 @@ static grpc_byte_buffer* get_serialized_next(grpc_slice* bytes_received) { if (ok) { buffer = grpc_raw_byte_buffer_create(&req_slice, 1 /* number of slices */); } - grpc_slice_unref(req_slice); + grpc_slice_unref_internal(req_slice); grpc_gcp_handshaker_req_destroy(req); return buffer; } @@ -258,7 +259,7 @@ alts_handshaker_client* alts_grpc_handshaker_client_create( grpc_slice_from_static_string(ALTS_SERVICE_METHOD), &slice, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); client->base.vtable = &vtable; - grpc_slice_unref(slice); + grpc_slice_unref_internal(slice); return &client->base; } diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc b/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc index e0e4184686..d63d3538c5 100644 --- a/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc +++ b/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc @@ -20,6 +20,8 @@ #include "src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.h" +#include "src/core/lib/slice/slice_internal.h" + void add_repeated_field(repeated_field** head, const void* data) { repeated_field* field = static_cast<repeated_field*>(gpr_zalloc(sizeof(*field))); @@ -67,7 +69,7 @@ grpc_slice* create_slice(const char* data, size_t size) { void destroy_slice(grpc_slice* slice) { if (slice != nullptr) { - grpc_slice_unref(*slice); + grpc_slice_unref_internal(*slice); gpr_free(slice); } } diff --git a/src/core/tsi/alts/handshaker/alts_tsi_event.cc b/src/core/tsi/alts/handshaker/alts_tsi_event.cc index ec0bf12b95..cb36d5ebd1 100644 --- a/src/core/tsi/alts/handshaker/alts_tsi_event.cc +++ b/src/core/tsi/alts/handshaker/alts_tsi_event.cc @@ -24,6 +24,8 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include "src/core/lib/slice/slice_internal.h" + tsi_result alts_tsi_event_create(alts_tsi_handshaker* handshaker, tsi_handshaker_on_next_done_cb cb, void* user_data, @@ -66,8 +68,8 @@ void alts_tsi_event_destroy(alts_tsi_event* event) { grpc_byte_buffer_destroy(event->recv_buffer); grpc_metadata_array_destroy(&event->initial_metadata); grpc_metadata_array_destroy(&event->trailing_metadata); - grpc_slice_unref(event->details); - grpc_slice_unref(event->target_name); + grpc_slice_unref_internal(event->details); + grpc_slice_unref_internal(event->target_name); grpc_alts_credentials_options_destroy(event->options); gpr_free(event); } diff --git a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc index 1df1021bb1..34608a3de1 100644 --- a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc +++ b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc @@ -31,6 +31,7 @@ #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/tsi/alts/frame_protector/alts_frame_protector.h" #include "src/core/tsi/alts/handshaker/alts_handshaker_client.h" #include "src/core/tsi/alts/handshaker/alts_tsi_utils.h" @@ -182,7 +183,7 @@ static void handshaker_result_destroy(tsi_handshaker_result* self) { gpr_free(result->peer_identity); gpr_free(result->key_data); gpr_free(result->unused_bytes); - grpc_slice_unref(result->rpc_versions); + grpc_slice_unref_internal(result->rpc_versions); gpr_free(result); } @@ -269,12 +270,12 @@ static tsi_result handshaker_next( handshaker->has_sent_start_message = true; } else { if (!GRPC_SLICE_IS_EMPTY(handshaker->recv_bytes)) { - grpc_slice_unref(handshaker->recv_bytes); + grpc_slice_unref_internal(handshaker->recv_bytes); } handshaker->recv_bytes = grpc_slice_ref(slice); ok = alts_handshaker_client_next(handshaker->client, event, &slice); } - grpc_slice_unref(slice); + grpc_slice_unref_internal(slice); if (ok != TSI_OK) { gpr_log(GPR_ERROR, "Failed to schedule ALTS handshaker requests"); return ok; @@ -299,8 +300,8 @@ static void handshaker_destroy(tsi_handshaker* self) { alts_tsi_handshaker* handshaker = reinterpret_cast<alts_tsi_handshaker*>(self); alts_handshaker_client_destroy(handshaker->client); - grpc_slice_unref(handshaker->recv_bytes); - grpc_slice_unref(handshaker->target_name); + grpc_slice_unref_internal(handshaker->recv_bytes); + grpc_slice_unref_internal(handshaker->target_name); grpc_alts_credentials_options_destroy(handshaker->options); gpr_free(handshaker->buffer); gpr_free(handshaker); diff --git a/src/core/tsi/alts/handshaker/alts_tsi_utils.cc b/src/core/tsi/alts/handshaker/alts_tsi_utils.cc index d9b5e6c945..1747f1ad04 100644 --- a/src/core/tsi/alts/handshaker/alts_tsi_utils.cc +++ b/src/core/tsi/alts/handshaker/alts_tsi_utils.cc @@ -22,6 +22,8 @@ #include <grpc/byte_buffer_reader.h> +#include "src/core/lib/slice/slice_internal.h" + tsi_result alts_tsi_utils_convert_to_tsi_result(grpc_status_code code) { switch (code) { case GRPC_STATUS_OK: @@ -47,7 +49,7 @@ grpc_gcp_handshaker_resp* alts_tsi_utils_deserialize_response( grpc_slice slice = grpc_byte_buffer_reader_readall(&bbr); grpc_gcp_handshaker_resp* resp = grpc_gcp_handshaker_resp_create(); bool ok = grpc_gcp_handshaker_resp_decode(slice, resp); - grpc_slice_unref(slice); + grpc_slice_unref_internal(slice); grpc_byte_buffer_reader_destroy(&bbr); if (!ok) { grpc_gcp_handshaker_resp_destroy(resp); diff --git a/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc b/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc index d4fd88d1e2..e7890903d5 100644 --- a/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc +++ b/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc @@ -61,7 +61,7 @@ static tsi_result alts_grpc_privacy_integrity_protect( if (status != GRPC_STATUS_OK) { gpr_log(GPR_ERROR, "Failed to protect, %s", error_details); gpr_free(error_details); - grpc_slice_unref(protected_slice); + grpc_slice_unref_internal(protected_slice); return TSI_INTERNAL_ERROR; } grpc_slice_buffer_add(protected_slices, protected_slice); @@ -106,7 +106,7 @@ static tsi_result alts_grpc_privacy_integrity_unprotect( if (status != GRPC_STATUS_OK) { gpr_log(GPR_ERROR, "Failed to unprotect, %s", error_details); gpr_free(error_details); - grpc_slice_unref(unprotected_slice); + grpc_slice_unref_internal(unprotected_slice); return TSI_INTERNAL_ERROR; } grpc_slice_buffer_reset_and_unref_internal(&rp->header_sb); diff --git a/src/core/tsi/alts_transport_security.cc b/src/core/tsi/alts_transport_security.cc index 2fd408103b..dac23bbf7a 100644 --- a/src/core/tsi/alts_transport_security.cc +++ b/src/core/tsi/alts_transport_security.cc @@ -45,7 +45,9 @@ void grpc_tsi_alts_signal_for_cq_destroy() { } void grpc_tsi_alts_init() { - memset(&g_alts_resource, 0, sizeof(alts_shared_resource)); + g_alts_resource.channel = nullptr; + g_alts_resource.cq = nullptr; + g_alts_resource.is_cq_drained = false; gpr_mu_init(&g_alts_resource.mu); gpr_cv_init(&g_alts_resource.cv); } diff --git a/src/core/tsi/ssl/session_cache/ssl_session_cache.cc b/src/core/tsi/ssl/session_cache/ssl_session_cache.cc index ce74fde343..f9184bcc34 100644 --- a/src/core/tsi/ssl/session_cache/ssl_session_cache.cc +++ b/src/core/tsi/ssl/session_cache/ssl_session_cache.cc @@ -19,6 +19,7 @@ #include <grpc/support/port_platform.h> #include "src/core/lib/gprpp/mutex_lock.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/tsi/ssl/session_cache/ssl_session.h" #include "src/core/tsi/ssl/session_cache/ssl_session_cache.h" @@ -53,7 +54,7 @@ class SslSessionLRUCache::Node { SetSession(std::move(session)); } - ~Node() { grpc_slice_unref(key_); } + ~Node() { grpc_slice_unref_internal(key_); } // Not copyable nor movable. Node(const Node&) = delete; diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 39b891c2e1..ad71286e05 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -42,8 +42,10 @@ #include <grpcpp/support/time.h> #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/surface/completion_queue.h" namespace grpc { @@ -53,7 +55,12 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel) g_gli_initializer.summon(); } -Channel::~Channel() { grpc_channel_destroy(c_channel_); } +Channel::~Channel() { + grpc_channel_destroy(c_channel_); + if (callback_cq_ != nullptr) { + callback_cq_->Shutdown(); + } +} namespace { @@ -135,8 +142,8 @@ void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops, size_t nops = 0; grpc_op cops[MAX_OPS]; ops->FillOps(call->call(), cops, &nops); - GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), cops, nops, ops, nullptr)); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), cops, nops, + ops->cq_tag(), nullptr)); } void* Channel::RegisterMethod(const char* method) { @@ -185,4 +192,39 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed, return ok; } +namespace { +class ShutdownCallback : public grpc_core::CQCallbackInterface { + public: + // TakeCQ takes ownership of the cq into the shutdown callback + // so that the shutdown callback will be responsible for destroying it + void TakeCQ(CompletionQueue* cq) { cq_ = cq; } + + // The Run function will get invoked by the completion queue library + // when the shutdown is actually complete + void Run(bool) override { + delete cq_; + grpc_core::Delete(this); + } + + private: + CompletionQueue* cq_ = nullptr; +}; +} // namespace + +CompletionQueue* Channel::CallbackCQ() { + // TODO(vjpai): Consider using a single global CQ for the default CQ + // if there is no explicit per-channel CQ registered + std::lock_guard<std::mutex> l(mu_); + if (callback_cq_ == nullptr) { + auto* shutdown_callback = grpc_core::New<ShutdownCallback>(); + callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, + shutdown_callback}); + + // Transfer ownership of the new cq to its own shutdown callback + shutdown_callback->TakeCQ(callback_cq_); + } + return callback_cq_; +} + } // namespace grpc diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index 67ef46bebe..87902b26f0 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -16,9 +16,11 @@ * */ -#include <grpcpp/generic/generic_stub.h> +#include <functional> +#include <grpcpp/generic/generic_stub.h> #include <grpcpp/impl/rpc_method.h> +#include <grpcpp/support/client_callback.h> namespace grpc { @@ -60,4 +62,14 @@ std::unique_ptr<GenericClientAsyncResponseReader> GenericStub::PrepareUnaryCall( context, request, false)); } +void GenericStub::experimental_type::UnaryCall( + ClientContext* context, const grpc::string& method, + const ByteBuffer* request, ByteBuffer* response, + std::function<void(Status)> on_completion) { + internal::CallbackUnaryCall( + stub_->channel_.get(), + internal::RpcMethod(method.c_str(), internal::RpcMethod::NORMAL_RPC), + context, request, response, std::move(on_completion)); +} + } // namespace grpc diff --git a/src/cpp/common/alarm.cc b/src/cpp/common/alarm.cc index 15a373d8a5..5819a4210b 100644 --- a/src/cpp/common/alarm.cc +++ b/src/cpp/common/alarm.cc @@ -39,17 +39,6 @@ class AlarmImpl : public CompletionQueueTag { AlarmImpl() : cq_(nullptr), tag_(nullptr) { gpr_ref_init(&refs_, 1); grpc_timer_init_unset(&timer_); - GRPC_CLOSURE_INIT(&on_alarm_, - [](void* arg, grpc_error* error) { - // queue the op on the completion queue - AlarmImpl* alarm = static_cast<AlarmImpl*>(arg); - alarm->Ref(); - grpc_cq_end_op( - alarm->cq_, alarm, error, - [](void* arg, grpc_cq_completion* completion) {}, - arg, &alarm->completion_); - }, - this, grpc_schedule_on_exec_ctx); } ~AlarmImpl() { grpc_core::ExecCtx exec_ctx; @@ -68,6 +57,32 @@ class AlarmImpl : public CompletionQueueTag { cq_ = cq->cq(); tag_ = tag; GPR_ASSERT(grpc_cq_begin_op(cq_, this)); + GRPC_CLOSURE_INIT(&on_alarm_, + [](void* arg, grpc_error* error) { + // queue the op on the completion queue + AlarmImpl* alarm = static_cast<AlarmImpl*>(arg); + alarm->Ref(); + grpc_cq_end_op( + alarm->cq_, alarm, error, + [](void* arg, grpc_cq_completion* completion) {}, + arg, &alarm->completion_); + }, + this, grpc_schedule_on_exec_ctx); + grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline), + &on_alarm_); + } + void Set(gpr_timespec deadline, std::function<void(bool)> f) { + grpc_core::ExecCtx exec_ctx; + // Don't use any CQ at all. Instead just use the timer to fire the function + callback_ = std::move(f); + Ref(); + GRPC_CLOSURE_INIT(&on_alarm_, + [](void* arg, grpc_error* error) { + AlarmImpl* alarm = static_cast<AlarmImpl*>(arg); + alarm->callback_(error == GRPC_ERROR_NONE); + alarm->Unref(); + }, + this, grpc_schedule_on_exec_ctx); grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline), &on_alarm_); } @@ -95,6 +110,7 @@ class AlarmImpl : public CompletionQueueTag { // completion queue where events about this alarm will be posted grpc_completion_queue* cq_; void* tag_; + std::function<void(bool)> callback_; }; } // namespace internal @@ -113,6 +129,15 @@ void Alarm::SetInternal(CompletionQueue* cq, gpr_timespec deadline, void* tag) { static_cast<internal::AlarmImpl*>(alarm_)->Set(cq, deadline, tag); } +void Alarm::SetInternal(gpr_timespec deadline, std::function<void(bool)> f) { + // Note that we know that alarm_ is actually an internal::AlarmImpl + // but we declared it as the base pointer to avoid a forward declaration + // or exposing core data structures in the C++ public headers. + // Thus it is safe to use a static_cast to the subclass here, and the + // C++ style guide allows us to do so in this case + static_cast<internal::AlarmImpl*>(alarm_)->Set(deadline, std::move(f)); +} + Alarm::~Alarm() { if (alarm_ != nullptr) { static_cast<internal::AlarmImpl*>(alarm_)->Destroy(); diff --git a/src/cpp/common/callback_common.cc b/src/cpp/common/callback_common.cc new file mode 100644 index 0000000000..a0c8eeb516 --- /dev/null +++ b/src/cpp/common/callback_common.cc @@ -0,0 +1,149 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <functional> + +#include <grpcpp/impl/codegen/callback_common.h> +#include <grpcpp/impl/codegen/status.h> + +#include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/surface/completion_queue.h" + +namespace grpc { +namespace internal { +namespace { + +template <class Func, class Arg> +void CatchingCallback(Func&& func, Arg&& arg) { +#if GRPC_ALLOW_EXCEPTIONS + try { + func(arg); + } catch (...) { + // nothing to return or change here, just don't crash the library + } +#else // GRPC_ALLOW_EXCEPTIONS + func(arg); +#endif // GRPC_ALLOW_EXCEPTIONS +} + +class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface { + public: + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(CallbackWithSuccessImpl)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + + CallbackWithSuccessImpl(grpc_call* call, CallbackWithSuccessTag* parent, + std::function<void(bool)> f) + : call_(call), parent_(parent), func_(std::move(f)) { + grpc_call_ref(call); + } + + void Run(bool ok) override { + void* ignored = parent_->ops(); + bool new_ok = ok; + GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &new_ok)); + GPR_ASSERT(ignored == parent_->ops()); + + // Last use of func_, so ok to move it out for rvalue call above + CatchingCallback(std::move(func_), ok); + + func_ = nullptr; // reset to clear this out for sure + grpc_call_unref(call_); + } + + private: + grpc_call* call_; + CallbackWithSuccessTag* parent_; + std::function<void(bool)> func_; +}; + +class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface { + public: + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(CallbackWithStatusImpl)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + + CallbackWithStatusImpl(grpc_call* call, CallbackWithStatusTag* parent, + std::function<void(Status)> f) + : call_(call), parent_(parent), func_(std::move(f)), status_() { + grpc_call_ref(call); + } + + void Run(bool ok) override { + void* ignored = parent_->ops(); + + GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &ok)); + GPR_ASSERT(ignored == parent_->ops()); + + // Last use of func_ or status_, so ok to move them out + CatchingCallback(std::move(func_), std::move(status_)); + + func_ = nullptr; // reset to clear this out for sure + grpc_call_unref(call_); + } + Status* status_ptr() { return &status_; } + + private: + grpc_call* call_; + CallbackWithStatusTag* parent_; + std::function<void(Status)> func_; + Status status_; +}; + +} // namespace + +CallbackWithSuccessTag::CallbackWithSuccessTag(grpc_call* call, + std::function<void(bool)> f, + CompletionQueueTag* ops) + : impl_(new (grpc_call_arena_alloc(call, sizeof(CallbackWithSuccessImpl))) + CallbackWithSuccessImpl(call, this, std::move(f))), + ops_(ops) {} + +void CallbackWithSuccessTag::force_run(bool ok) { impl_->Run(ok); } + +CallbackWithStatusTag::CallbackWithStatusTag(grpc_call* call, + std::function<void(Status)> f, + CompletionQueueTag* ops) + : ops_(ops) { + auto* impl = new (grpc_call_arena_alloc(call, sizeof(CallbackWithStatusImpl))) + CallbackWithStatusImpl(call, this, std::move(f)); + impl_ = impl; + status_ = impl->status_ptr(); +} + +void CallbackWithStatusTag::force_run(Status s) { + *status_ = std::move(s); + impl_->Run(true); +} + +} // namespace internal +} // namespace grpc diff --git a/src/cpp/ext/filters/census/server_filter.cc b/src/cpp/ext/filters/census/server_filter.cc index c7c62eefe5..b5f3d5a13a 100644 --- a/src/cpp/ext/filters/census/server_filter.cc +++ b/src/cpp/ext/filters/census/server_filter.cc @@ -93,7 +93,7 @@ void CensusServerCallData::OnDoneRecvInitialMetadataCb(void* user_data, FilterInitialMetadata(initial_metadata, &sml); calld->path_ = grpc_slice_ref_internal(sml.path); calld->method_ = GetMethod(&calld->path_); - calld->qualified_method_ = StrCat("Recv.", calld->method_); + calld->qualified_method_ = absl::StrCat("Recv.", calld->method_); const char* tracing_str = GRPC_SLICE_IS_EMPTY(sml.tracing_slice) ? "" diff --git a/src/cpp/server/channelz/channelz_service.cc b/src/cpp/server/channelz/channelz_service.cc index 77c175e5b8..e096c1f421 100644 --- a/src/cpp/server/channelz/channelz_service.cc +++ b/src/cpp/server/channelz/channelz_service.cc @@ -32,6 +32,25 @@ Status ChannelzService::GetTopChannels( ServerContext* unused, const channelz::v1::GetTopChannelsRequest* request, channelz::v1::GetTopChannelsResponse* response) { char* json_str = grpc_channelz_get_top_channels(request->start_channel_id()); + if (json_str == nullptr) { + return Status(INTERNAL, "grpc_channelz_get_top_channels returned null"); + } + google::protobuf::util::Status s = + google::protobuf::util::JsonStringToMessage(json_str, response); + gpr_free(json_str); + if (s != google::protobuf::util::Status::OK) { + return Status(INTERNAL, s.ToString()); + } + return Status::OK; +} + +Status ChannelzService::GetServers( + ServerContext* unused, const channelz::v1::GetServersRequest* request, + channelz::v1::GetServersResponse* response) { + char* json_str = grpc_channelz_get_servers(request->start_server_id()); + if (json_str == nullptr) { + return Status(INTERNAL, "grpc_channelz_get_servers returned null"); + } google::protobuf::util::Status s = google::protobuf::util::JsonStringToMessage(json_str, response); gpr_free(json_str); @@ -45,6 +64,25 @@ Status ChannelzService::GetChannel( ServerContext* unused, const channelz::v1::GetChannelRequest* request, channelz::v1::GetChannelResponse* response) { char* json_str = grpc_channelz_get_channel(request->channel_id()); + if (json_str == nullptr) { + return Status(NOT_FOUND, "No object found for that ChannelId"); + } + google::protobuf::util::Status s = + google::protobuf::util::JsonStringToMessage(json_str, response); + gpr_free(json_str); + if (s != google::protobuf::util::Status::OK) { + return Status(INTERNAL, s.ToString()); + } + return Status::OK; +} + +Status ChannelzService::GetSubchannel( + ServerContext* unused, const channelz::v1::GetSubchannelRequest* request, + channelz::v1::GetSubchannelResponse* response) { + char* json_str = grpc_channelz_get_subchannel(request->subchannel_id()); + if (json_str == nullptr) { + return Status(NOT_FOUND, "No object found for that SubchannelId"); + } google::protobuf::util::Status s = google::protobuf::util::JsonStringToMessage(json_str, response); gpr_free(json_str); diff --git a/src/cpp/server/channelz/channelz_service.h b/src/cpp/server/channelz/channelz_service.h index f619ea49e0..9e0b5b6ead 100644 --- a/src/cpp/server/channelz/channelz_service.h +++ b/src/cpp/server/channelz/channelz_service.h @@ -32,10 +32,18 @@ class ChannelzService final : public channelz::v1::Channelz::Service { Status GetTopChannels( ServerContext* unused, const channelz::v1::GetTopChannelsRequest* request, channelz::v1::GetTopChannelsResponse* response) override; + // implementation of GetServers rpc + Status GetServers(ServerContext* unused, + const channelz::v1::GetServersRequest* request, + channelz::v1::GetServersResponse* response) override; // implementation of GetChannel rpc Status GetChannel(ServerContext* unused, const channelz::v1::GetChannelRequest* request, channelz::v1::GetChannelResponse* response) override; + // implementation of GetSubchannel rpc + Status GetSubchannel(ServerContext* unused, + const channelz::v1::GetSubchannelRequest* request, + channelz::v1::GetSubchannelResponse* response) override; }; } // namespace grpc diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc index 670da63a4a..bfda67d086 100644 --- a/src/cpp/server/health/default_health_check_service.cc +++ b/src/cpp/server/health/default_health_check_service.cc @@ -30,162 +30,29 @@ #include "src/cpp/server/health/health.pb.h" namespace grpc { - -// -// DefaultHealthCheckService -// - -DefaultHealthCheckService::DefaultHealthCheckService() { - services_map_[""].SetServingStatus(SERVING); -} - -void DefaultHealthCheckService::SetServingStatus( - const grpc::string& service_name, bool serving) { - std::unique_lock<std::mutex> lock(mu_); - services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING); -} - -void DefaultHealthCheckService::SetServingStatus(bool serving) { - const ServingStatus status = serving ? SERVING : NOT_SERVING; - std::unique_lock<std::mutex> lock(mu_); - for (auto& p : services_map_) { - ServiceData& service_data = p.second; - service_data.SetServingStatus(status); - } -} - -DefaultHealthCheckService::ServingStatus -DefaultHealthCheckService::GetServingStatus( - const grpc::string& service_name) const { - std::lock_guard<std::mutex> lock(mu_); - auto it = services_map_.find(service_name); - if (it == services_map_.end()) { - return NOT_FOUND; - } - const ServiceData& service_data = it->second; - return service_data.GetServingStatus(); -} - -void DefaultHealthCheckService::RegisterCallHandler( - const grpc::string& service_name, - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { - std::unique_lock<std::mutex> lock(mu_); - ServiceData& service_data = services_map_[service_name]; - service_data.AddCallHandler(handler /* copies ref */); - handler->SendHealth(std::move(handler), service_data.GetServingStatus()); -} - -void DefaultHealthCheckService::UnregisterCallHandler( - const grpc::string& service_name, - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { - std::unique_lock<std::mutex> lock(mu_); - auto it = services_map_.find(service_name); - if (it == services_map_.end()) return; - ServiceData& service_data = it->second; - service_data.RemoveCallHandler(std::move(handler)); - if (service_data.Unused()) { - services_map_.erase(it); - } -} - -DefaultHealthCheckService::HealthCheckServiceImpl* -DefaultHealthCheckService::GetHealthCheckService( - std::unique_ptr<ServerCompletionQueue> cq) { - GPR_ASSERT(impl_ == nullptr); - impl_.reset(new HealthCheckServiceImpl(this, std::move(cq))); - return impl_.get(); -} - -// -// DefaultHealthCheckService::ServiceData -// - -void DefaultHealthCheckService::ServiceData::SetServingStatus( - ServingStatus status) { - status_ = status; - for (auto& call_handler : call_handlers_) { - call_handler->SendHealth(call_handler /* copies ref */, status); - } -} - -void DefaultHealthCheckService::ServiceData::AddCallHandler( - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { - call_handlers_.insert(std::move(handler)); -} - -void DefaultHealthCheckService::ServiceData::RemoveCallHandler( - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { - call_handlers_.erase(std::move(handler)); -} - -// -// DefaultHealthCheckService::HealthCheckServiceImpl -// - namespace { const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check"; -const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch"; } // namespace DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( - DefaultHealthCheckService* database, - std::unique_ptr<ServerCompletionQueue> cq) - : database_(database), cq_(std::move(cq)) { - // Add Check() method. - check_method_ = new internal::RpcServiceMethod( - kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr); - AddMethod(check_method_); - // Add Watch() method. - watch_method_ = new internal::RpcServiceMethod( - kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr); - AddMethod(watch_method_); - // Create serving thread. - thread_ = std::unique_ptr<::grpc_core::Thread>( - new ::grpc_core::Thread("grpc_health_check_service", Serve, this)); -} - -DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() { - // We will reach here after the server starts shutting down. - shutdown_ = true; - { - std::unique_lock<std::mutex> lock(cq_shutdown_mu_); - cq_->Shutdown(); - } - thread_->Join(); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() { - thread_->Start(); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) { - HealthCheckServiceImpl* service = - reinterpret_cast<HealthCheckServiceImpl*>(arg); - // TODO(juanlishen): This is a workaround to wait for the cq to be ready. - // Need to figure out why cq is not ready after service starts. - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds(1, GPR_TIMESPAN))); - CheckCallHandler::CreateAndStart(service->cq_.get(), service->database_, - service); - WatchCallHandler::CreateAndStart(service->cq_.get(), service->database_, - service); - void* tag; - bool ok; - while (true) { - if (!service->cq_->Next(&tag, &ok)) { - // The completion queue is shutting down. - GPR_ASSERT(service->shutdown_); - break; - } - auto* next_step = static_cast<CallableTag*>(tag); - next_step->Run(ok); - } -} - -bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( - const ByteBuffer& request, grpc::string* service_name) { + DefaultHealthCheckService* service) + : service_(service), method_(nullptr) { + internal::MethodHandler* handler = + new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, + ByteBuffer>( + std::mem_fn(&HealthCheckServiceImpl::Check), this); + method_ = new internal::RpcServiceMethod( + kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler); + AddMethod(method_); +} + +Status DefaultHealthCheckService::HealthCheckServiceImpl::Check( + ServerContext* context, const ByteBuffer* request, ByteBuffer* response) { + // Decode request. std::vector<Slice> slices; - if (!request.Dump(&slices).ok()) return false; + if (!request->Dump(&slices).ok()) { + return Status(StatusCode::INVALID_ARGUMENT, ""); + } uint8_t* request_bytes = nullptr; bool request_bytes_owned = false; size_t request_size = 0; @@ -197,13 +64,14 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( request_size = slices[0].size(); } else { request_bytes_owned = true; - request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length())); + request_bytes = static_cast<uint8_t*>(gpr_malloc(request->Length())); uint8_t* copy_to = request_bytes; for (size_t i = 0; i < slices.size(); i++) { memcpy(copy_to, slices[i].begin(), slices[i].size()); copy_to += slices[i].size(); } } + if (request_bytes != nullptr) { pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size); bool decode_status = pb_decode( @@ -211,22 +79,26 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( if (request_bytes_owned) { gpr_free(request_bytes); } - if (!decode_status) return false; + if (!decode_status) { + return Status(StatusCode::INVALID_ARGUMENT, ""); + } } - *service_name = request_struct.has_service ? request_struct.service : ""; - return true; -} -bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse( - ServingStatus status, ByteBuffer* response) { + // Check status from the associated default health checking service. + DefaultHealthCheckService::ServingStatus serving_status = + service_->GetServingStatus( + request_struct.has_service ? request_struct.service : ""); + if (serving_status == DefaultHealthCheckService::NOT_FOUND) { + return Status(StatusCode::NOT_FOUND, ""); + } + + // Encode response grpc_health_v1_HealthCheckResponse response_struct; response_struct.has_status = true; response_struct.status = - status == NOT_FOUND - ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN - : status == SERVING - ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING - : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING; + serving_status == DefaultHealthCheckService::SERVING + ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING + : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING; pb_ostream_t ostream; memset(&ostream, 0, sizeof(ostream)); pb_encode(&ostream, grpc_health_v1_HealthCheckResponse_fields, @@ -236,282 +108,48 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse( GRPC_SLICE_LENGTH(response_slice)); bool encode_status = pb_encode( &ostream, grpc_health_v1_HealthCheckResponse_fields, &response_struct); - if (!encode_status) return false; + if (!encode_status) { + return Status(StatusCode::INTERNAL, "Failed to encode response."); + } Slice encoded_response(response_slice, Slice::STEAL_REF); ByteBuffer response_buffer(&encoded_response, 1); response->Swap(&response_buffer); - return true; -} - -// -// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler -// - -void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) { - std::shared_ptr<CallHandler> self = - std::make_shared<CheckCallHandler>(cq, database, service); - CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get()); - { - std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_); - if (service->shutdown_) return; - // Request a Check() call. - handler->next_ = - CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_, - &handler->writer_, cq, cq, &handler->next_); - } -} - -DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - CheckCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) - : cq_(cq), database_(database), service_(service), writer_(&ctx_) {} - -void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) { - if (!ok) { - // The value of ok being false means that the server is shutting down. - return; - } - // Spawn a new handler instance to serve the next new client. Every handler - // instance will deallocate itself when it's done. - CreateAndStart(cq_, database_, service_); - // Process request. - gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_, - this); - grpc::string service_name; - grpc::Status status = Status::OK; - ByteBuffer response; - if (!service_->DecodeRequest(request_, &service_name)) { - status = Status(INVALID_ARGUMENT, ""); - } else { - ServingStatus serving_status = database_->GetServingStatus(service_name); - if (serving_status == NOT_FOUND) { - status = Status(StatusCode::NOT_FOUND, "service name unknown"); - } else if (!service_->EncodeResponse(serving_status, &response)) { - status = Status(INTERNAL, ""); - } - } - // Send response. - { - std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); - if (!service_->shutdown_) { - next_ = - CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - if (status.ok()) { - writer_.Finish(response, status, &next_); - } else { - writer_.FinishWithError(status, &next_); - } - } - } -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) { - if (ok) { - gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p", - service_, this); - } -} - -// -// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler -// - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) { - std::shared_ptr<CallHandler> self = - std::make_shared<WatchCallHandler>(cq, database, service); - WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get()); - { - std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_); - if (service->shutdown_) return; - // Request AsyncNotifyWhenDone(). - handler->on_done_notified_ = - CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler, - std::placeholders::_1, std::placeholders::_2), - self /* copies ref */); - handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_); - // Request a Watch() call. - handler->next_ = - CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_, - &handler->stream_, cq, cq, - &handler->next_); - } + return Status::OK; } -DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - WatchCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) - : cq_(cq), - database_(database), - service_(service), - stream_(&ctx_), - call_state_(WAITING_FOR_CALL) {} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) { - if (ok) { - call_state_ = CALL_RECEIVED; - } else { - // AsyncNotifyWhenDone() needs to be called before the call starts, but the - // tag will not pop out if the call never starts ( - // https://github.com/grpc/grpc/issues/10136). So we need to manually - // release the ownership of the handler in this case. - GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr); - } - if (!ok || shutdown_) { - // The value of ok being false means that the server is shutting down. - Shutdown(std::move(self), "OnCallReceived"); - return; - } - // Spawn a new handler instance to serve the next new client. Every handler - // instance will deallocate itself when it's done. - CreateAndStart(cq_, database_, service_); - // Parse request. - if (!service_->DecodeRequest(request_, &service_name_)) { - on_finish_done_ = - CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - stream_.Finish(Status(INVALID_ARGUMENT, ""), &on_finish_done_); - call_state_ = FINISH_CALLED; - return; - } - // Register the call for updates to the service. - gpr_log(GPR_DEBUG, - "[HCS %p] Health check watch started for service \"%s\" " - "(handler: %p)", - service_, service_name_.c_str(), this); - database_->RegisterCallHandler(service_name_, std::move(self)); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) { - std::unique_lock<std::mutex> lock(mu_); - // If there's already a send in flight, cache the new status, and - // we'll start a new send for it when the one in flight completes. - if (send_in_flight_) { - pending_status_ = status; - return; - } - // Start a send. - SendHealthLocked(std::move(self), status); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) { - std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_); - if (service_->shutdown_) { - cq_lock.release()->unlock(); - Shutdown(std::move(self), "SendHealthLocked"); - return; - } - send_in_flight_ = true; - call_state_ = SEND_MESSAGE_PENDING; - // Construct response. - ByteBuffer response; - if (!service_->EncodeResponse(status, &response)) { - on_finish_done_ = - CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - stream_.Finish(Status(INTERNAL, ""), &on_finish_done_); - return; - } - next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - stream_.Write(response, &next_); +DefaultHealthCheckService::DefaultHealthCheckService() { + services_map_.emplace("", true); } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) { - if (!ok || shutdown_) { - Shutdown(std::move(self), "OnSendHealthDone"); - return; - } - call_state_ = CALL_RECEIVED; - { - std::unique_lock<std::mutex> lock(mu_); - send_in_flight_ = false; - // If we got a new status since we started the last send, start a - // new send for it. - if (pending_status_ != NOT_FOUND) { - auto status = pending_status_; - pending_status_ = NOT_FOUND; - SendHealthLocked(std::move(self), status); - } - } +void DefaultHealthCheckService::SetServingStatus( + const grpc::string& service_name, bool serving) { + std::lock_guard<std::mutex> lock(mu_); + services_map_[service_name] = serving; } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) { - GPR_ASSERT(ok); - done_notified_ = true; - if (ctx_.IsCancelled()) { - is_cancelled_ = true; +void DefaultHealthCheckService::SetServingStatus(bool serving) { + std::lock_guard<std::mutex> lock(mu_); + for (auto iter = services_map_.begin(); iter != services_map_.end(); ++iter) { + iter->second = serving; } - gpr_log(GPR_DEBUG, - "[HCS %p] Healt check call is notified done (handler: %p, " - "is_cancelled: %d).", - service_, this, static_cast<int>(is_cancelled_)); - Shutdown(std::move(self), "OnDoneNotified"); } -// TODO(roth): This method currently assumes that there will be only one -// thread polling the cq and invoking the corresponding callbacks. If -// that changes, we will need to add synchronization here. -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - Shutdown(std::shared_ptr<CallHandler> self, const char* reason) { - if (!shutdown_) { - gpr_log(GPR_DEBUG, - "[HCS %p] Shutting down the handler (service_name: \"%s\", " - "handler: %p, reason: %s).", - service_, service_name_.c_str(), this, reason); - shutdown_ = true; - } - // OnCallReceived() may be called after OnDoneNotified(), so we need to - // try to Finish() every time we are in Shutdown(). - if (call_state_ >= CALL_RECEIVED && call_state_ < FINISH_CALLED) { - std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); - if (!service_->shutdown_) { - on_finish_done_ = - CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - // TODO(juanlishen): Maybe add a message proto for the client to - // explicitly cancel the stream so that we can return OK status in such - // cases. - stream_.Finish(Status::CANCELLED, &on_finish_done_); - call_state_ = FINISH_CALLED; - } +DefaultHealthCheckService::ServingStatus +DefaultHealthCheckService::GetServingStatus( + const grpc::string& service_name) const { + std::lock_guard<std::mutex> lock(mu_); + const auto& iter = services_map_.find(service_name); + if (iter == services_map_.end()) { + return NOT_FOUND; } + return iter->second ? SERVING : NOT_SERVING; } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) { - if (ok) { - gpr_log(GPR_DEBUG, - "[HCS %p] Health check call finished (service_name: \"%s\", " - "handler: %p).", - service_, service_name_.c_str(), this); - } +DefaultHealthCheckService::HealthCheckServiceImpl* +DefaultHealthCheckService::GetHealthCheckService() { + GPR_ASSERT(impl_ == nullptr); + impl_.reset(new HealthCheckServiceImpl(this)); + return impl_.get(); } } // namespace grpc diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index edad594936..a1ce5aa64e 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -19,268 +19,42 @@ #ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H #define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H -#include <atomic> #include <mutex> -#include <set> -#include <grpc/support/log.h> -#include <grpcpp/grpcpp.h> #include <grpcpp/health_check_service_interface.h> -#include <grpcpp/impl/codegen/async_generic_service.h> -#include <grpcpp/impl/codegen/async_unary_call.h> #include <grpcpp/impl/codegen/service_type.h> #include <grpcpp/support/byte_buffer.h> -#include "src/core/lib/gprpp/thd.h" - namespace grpc { // Default implementation of HealthCheckServiceInterface. Server will create and // own it. class DefaultHealthCheckService final : public HealthCheckServiceInterface { public: - enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING }; - // The service impl to register with the server. class HealthCheckServiceImpl : public Service { public: - // Base class for call handlers. - class CallHandler { - public: - virtual ~CallHandler() = default; - virtual void SendHealth(std::shared_ptr<CallHandler> self, - ServingStatus status) = 0; - }; + explicit HealthCheckServiceImpl(DefaultHealthCheckService* service); - HealthCheckServiceImpl(DefaultHealthCheckService* database, - std::unique_ptr<ServerCompletionQueue> cq); - - ~HealthCheckServiceImpl(); - - void StartServingThread(); + Status Check(ServerContext* context, const ByteBuffer* request, + ByteBuffer* response); private: - // A tag that can be called with a bool argument. It's tailored for - // CallHandler's use. Before being used, it should be constructed with a - // method of CallHandler and a shared pointer to the handler. The - // shared pointer will be moved to the invoked function and the function - // can only be invoked once. That makes ref counting of the handler easier, - // because the shared pointer is not bound to the function and can be gone - // once the invoked function returns (if not used any more). - class CallableTag { - public: - using HandlerFunction = - std::function<void(std::shared_ptr<CallHandler>, bool)>; - - CallableTag() {} - - CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler) - : handler_function_(std::move(func)), handler_(std::move(handler)) { - GPR_ASSERT(handler_function_ != nullptr); - GPR_ASSERT(handler_ != nullptr); - } - - // Runs the tag. This should be called only once. The handler is no - // longer owned by this tag after this method is invoked. - void Run(bool ok) { - GPR_ASSERT(handler_function_ != nullptr); - GPR_ASSERT(handler_ != nullptr); - handler_function_(std::move(handler_), ok); - } - - // Releases and returns the shared pointer to the handler. - std::shared_ptr<CallHandler> ReleaseHandler() { - return std::move(handler_); - } - - private: - HandlerFunction handler_function_ = nullptr; - std::shared_ptr<CallHandler> handler_; - }; - - // Call handler for Check method. - // Each handler takes care of one call. It contains per-call data and it - // will access the members of the parent class (i.e., - // DefaultHealthCheckService) for per-service health data. - class CheckCallHandler : public CallHandler { - public: - // Instantiates a CheckCallHandler and requests the next health check - // call. The handler object will manage its own lifetime, so no action is - // needed from the caller any more regarding that object. - static void CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - // This ctor is public because we want to use std::make_shared<> in - // CreateAndStart(). This ctor shouldn't be used elsewhere. - CheckCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - // Not used for Check. - void SendHealth(std::shared_ptr<CallHandler> self, - ServingStatus status) override {} - - private: - // Called when we receive a call. - // Spawns a new handler so that we can keep servicing future calls. - void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); - - // Called when Finish() is done. - void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); - - // The members passed down from HealthCheckServiceImpl. - ServerCompletionQueue* cq_; - DefaultHealthCheckService* database_; - HealthCheckServiceImpl* service_; - - ByteBuffer request_; - GenericServerAsyncResponseWriter writer_; - ServerContext ctx_; - - CallableTag next_; - }; - - // Call handler for Watch method. - // Each handler takes care of one call. It contains per-call data and it - // will access the members of the parent class (i.e., - // DefaultHealthCheckService) for per-service health data. - class WatchCallHandler : public CallHandler { - public: - // Instantiates a WatchCallHandler and requests the next health check - // call. The handler object will manage its own lifetime, so no action is - // needed from the caller any more regarding that object. - static void CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - // This ctor is public because we want to use std::make_shared<> in - // CreateAndStart(). This ctor shouldn't be used elsewhere. - WatchCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - void SendHealth(std::shared_ptr<CallHandler> self, - ServingStatus status) override; - - private: - // Called when we receive a call. - // Spawns a new handler so that we can keep servicing future calls. - void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); - - // Requires holding mu_. - void SendHealthLocked(std::shared_ptr<CallHandler> self, - ServingStatus status); - - // When sending a health result finishes. - void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok); - - // Called when Finish() is done. - void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); - - // Called when AsyncNotifyWhenDone() notifies us. - void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok); - - void Shutdown(std::shared_ptr<CallHandler> self, const char* reason); - - // The members passed down from HealthCheckServiceImpl. - ServerCompletionQueue* cq_; - DefaultHealthCheckService* database_; - HealthCheckServiceImpl* service_; - - ByteBuffer request_; - grpc::string service_name_; - GenericServerAsyncWriter stream_; - ServerContext ctx_; - - std::mutex mu_; - bool send_in_flight_ = false; // Guarded by mu_. - ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. - - // The state of the RPC progress. - enum CallState { - WAITING_FOR_CALL, - CALL_RECEIVED, - SEND_MESSAGE_PENDING, - FINISH_CALLED - } call_state_; - - bool shutdown_ = false; - bool done_notified_ = false; - bool is_cancelled_ = false; - CallableTag next_; - CallableTag on_done_notified_; - CallableTag on_finish_done_; - }; - - // Handles the incoming requests and drives the completion queue in a loop. - static void Serve(void* arg); - - // Returns true on success. - static bool DecodeRequest(const ByteBuffer& request, - grpc::string* service_name); - static bool EncodeResponse(ServingStatus status, ByteBuffer* response); - - // Needed to appease Windows compilers, which don't seem to allow - // nested classes to access protected members in the parent's - // superclass. - using Service::RequestAsyncServerStreaming; - using Service::RequestAsyncUnary; - - DefaultHealthCheckService* database_; - std::unique_ptr<ServerCompletionQueue> cq_; - internal::RpcServiceMethod* check_method_; - internal::RpcServiceMethod* watch_method_; - - // To synchronize the operations related to shutdown state of cq_, so that - // we don't enqueue new tags into cq_ after it is already shut down. - std::mutex cq_shutdown_mu_; - std::atomic_bool shutdown_{false}; - std::unique_ptr<::grpc_core::Thread> thread_; + const DefaultHealthCheckService* const service_; + internal::RpcServiceMethod* method_; }; DefaultHealthCheckService(); - void SetServingStatus(const grpc::string& service_name, bool serving) override; void SetServingStatus(bool serving) override; - + enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING }; ServingStatus GetServingStatus(const grpc::string& service_name) const; - - HealthCheckServiceImpl* GetHealthCheckService( - std::unique_ptr<ServerCompletionQueue> cq); + HealthCheckServiceImpl* GetHealthCheckService(); private: - // Stores the current serving status of a service and any call - // handlers registered for updates when the service's status changes. - class ServiceData { - public: - void SetServingStatus(ServingStatus status); - ServingStatus GetServingStatus() const { return status_; } - void AddCallHandler( - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); - void RemoveCallHandler( - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); - bool Unused() const { - return call_handlers_.empty() && status_ == NOT_FOUND; - } - - private: - ServingStatus status_ = NOT_FOUND; - std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>> - call_handlers_; - }; - - void RegisterCallHandler( - const grpc::string& service_name, - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); - - void UnregisterCallHandler( - const grpc::string& service_name, - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); - mutable std::mutex mu_; - std::map<grpc::string, ServiceData> services_map_; // Guarded by mu_. + std::map<grpc::string, bool> services_map_; std::unique_ptr<HealthCheckServiceImpl> impl_; }; diff --git a/src/cpp/server/health/health.pb.c b/src/cpp/server/health/health.pb.c index 5c214c7160..09bd98a3d9 100644 --- a/src/cpp/server/health/health.pb.c +++ b/src/cpp/server/health/health.pb.c @@ -2,6 +2,7 @@ /* Generated by nanopb-0.3.7-dev */ #include "src/cpp/server/health/health.pb.h" + /* @@protoc_insertion_point(includes) */ #if PB_PROTO_HEADER_VERSION != 30 #error Regenerate this file with the current version of nanopb generator. diff --git a/src/cpp/server/health/health.pb.h b/src/cpp/server/health/health.pb.h index 9d54ccd618..29e1f3bacb 100644 --- a/src/cpp/server/health/health.pb.h +++ b/src/cpp/server/health/health.pb.h @@ -17,12 +17,11 @@ extern "C" { typedef enum _grpc_health_v1_HealthCheckResponse_ServingStatus { grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN = 0, grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING = 1, - grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2, - grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN = 3 + grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2 } grpc_health_v1_HealthCheckResponse_ServingStatus; #define _grpc_health_v1_HealthCheckResponse_ServingStatus_MIN grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN -#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN -#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN+1)) +#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING +#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING+1)) /* Struct definitions */ typedef struct _grpc_health_v1_HealthCheckRequest { diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 3cadf65c80..36f7eb81f9 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -559,20 +559,16 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { // Only create default health check service when user did not provide an // explicit one. - ServerCompletionQueue* health_check_cq = nullptr; - DefaultHealthCheckService::HealthCheckServiceImpl* - default_health_check_service_impl = nullptr; if (health_check_service_ == nullptr && !health_check_service_disabled_ && DefaultHealthCheckServiceEnabled()) { - auto* default_hc_service = new DefaultHealthCheckService; - health_check_service_.reset(default_hc_service); - health_check_cq = new ServerCompletionQueue(GRPC_CQ_DEFAULT_POLLING); - grpc_server_register_completion_queue(server_, health_check_cq->cq(), - nullptr); - default_health_check_service_impl = - default_hc_service->GetHealthCheckService( - std::unique_ptr<ServerCompletionQueue>(health_check_cq)); - RegisterService(nullptr, default_health_check_service_impl); + if (sync_server_cqs_ == nullptr || sync_server_cqs_->empty()) { + gpr_log(GPR_INFO, + "Default health check service disabled at async-only server."); + } else { + auto* default_hc_service = new DefaultHealthCheckService; + health_check_service_.reset(default_hc_service); + RegisterService(nullptr, default_hc_service->GetHealthCheckService()); + } } grpc_server_start(server_); @@ -587,9 +583,6 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { new UnimplementedAsyncRequest(this, cqs[i]); } } - if (health_check_cq != nullptr) { - new UnimplementedAsyncRequest(this, health_check_cq); - } } // If this server has any support for synchronous methods (has any sync @@ -602,10 +595,6 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { (*it)->Start(); } - - if (default_health_check_service_impl != nullptr) { - default_health_check_service_impl->StartServingThread(); - } } void Server::ShutdownInternal(gpr_timespec deadline) { @@ -668,6 +657,7 @@ void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops, size_t nops = 0; grpc_op cops[MAX_OPS]; ops->FillOps(call->call(), cops, &nops); + // TODO(vjpai): Use ops->cq_tag once this case supports callbacks auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr); if (result != GRPC_CALL_OK) { gpr_log(GPR_ERROR, "Fatal: grpc_call_start_batch returned %d", result); @@ -697,9 +687,6 @@ ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) { - if (*status) { - context_->client_metadata_.FillMap(); - } context_->set_call(call_); context_->cq_ = call_cq_; internal::Call call(call_, server_, call_cq_, diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 6f5bde0d9f..b7254b6bb9 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -61,6 +61,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { tag_ = tag; } + /// TODO(vjpai): Allow override of cq_tag if appropriate for callback API + void* cq_tag() override { return this; } + void Unref(); private: @@ -134,7 +137,6 @@ ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) compression_level_set_(false), has_pending_ops_(false) { std::swap(*client_metadata_.arr(), *arr); - client_metadata_.FillMap(); } ServerContext::~ServerContext() { diff --git a/src/csharp/Grpc.Core.Tests/ChannelConnectivityTest.cs b/src/csharp/Grpc.Core.Tests/ChannelConnectivityTest.cs index a43040f01a..0834ddadda 100644 --- a/src/csharp/Grpc.Core.Tests/ChannelConnectivityTest.cs +++ b/src/csharp/Grpc.Core.Tests/ChannelConnectivityTest.cs @@ -57,19 +57,23 @@ namespace Grpc.Core.Tests [Test] public async Task Channel_WaitForStateChangedAsync() { - helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) => - { - return Task.FromResult(request); - }); - Assert.ThrowsAsync(typeof(TaskCanceledException), - async () => await channel.WaitForStateChangedAsync(channel.State, DateTime.UtcNow.AddMilliseconds(10))); + async () => await channel.WaitForStateChangedAsync(channel.State, DateTime.UtcNow.AddMilliseconds(0))); var stateChangedTask = channel.WaitForStateChangedAsync(channel.State); + await channel.ConnectAsync(DateTime.UtcNow.AddMilliseconds(5000)); + await stateChangedTask; + Assert.AreEqual(ChannelState.Ready, channel.State); + } - await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc"); + [Test] + public async Task Channel_TryWaitForStateChangedAsync() + { + Assert.IsFalse(await channel.TryWaitForStateChangedAsync(channel.State, DateTime.UtcNow.AddMilliseconds(0))); - await stateChangedTask; + var stateChangedTask = channel.TryWaitForStateChangedAsync(channel.State); + await channel.ConnectAsync(DateTime.UtcNow.AddMilliseconds(5000)); + Assert.IsTrue(await stateChangedTask); Assert.AreEqual(ChannelState.Ready, channel.State); } diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index 9aab54d2d0..775849d89b 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -107,6 +107,42 @@ namespace Grpc.Core.Internal.Tests } [Test] + public void AsyncUnary_RequestSerializationExceptionDoesntLeakResources() + { + string nullRequest = null; // will throw when serializing + Assert.Throws(typeof(ArgumentNullException), () => asyncCall.UnaryCallAsync(nullRequest)); + Assert.AreEqual(0, channel.GetCallReferenceCount()); + Assert.IsTrue(fakeCall.IsDisposed); + } + + [Test] + public void AsyncUnary_StartCallFailureDoesntLeakResources() + { + fakeCall.MakeStartCallFail(); + Assert.Throws(typeof(InvalidOperationException), () => asyncCall.UnaryCallAsync("request1")); + Assert.AreEqual(0, channel.GetCallReferenceCount()); + Assert.IsTrue(fakeCall.IsDisposed); + } + + [Test] + public void SyncUnary_RequestSerializationExceptionDoesntLeakResources() + { + string nullRequest = null; // will throw when serializing + Assert.Throws(typeof(ArgumentNullException), () => asyncCall.UnaryCall(nullRequest)); + Assert.AreEqual(0, channel.GetCallReferenceCount()); + Assert.IsTrue(fakeCall.IsDisposed); + } + + [Test] + public void SyncUnary_StartCallFailureDoesntLeakResources() + { + fakeCall.MakeStartCallFail(); + Assert.Throws(typeof(InvalidOperationException), () => asyncCall.UnaryCall("request1")); + Assert.AreEqual(0, channel.GetCallReferenceCount()); + Assert.IsTrue(fakeCall.IsDisposed); + } + + [Test] public void ClientStreaming_StreamingReadNotAllowed() { asyncCall.ClientStreamingCallAsync(); @@ -328,6 +364,15 @@ namespace Grpc.Core.Internal.Tests } [Test] + public void ClientStreaming_StartCallFailureDoesntLeakResources() + { + fakeCall.MakeStartCallFail(); + Assert.Throws(typeof(InvalidOperationException), () => asyncCall.ClientStreamingCallAsync()); + Assert.AreEqual(0, channel.GetCallReferenceCount()); + Assert.IsTrue(fakeCall.IsDisposed); + } + + [Test] public void ServerStreaming_StreamingSendNotAllowed() { asyncCall.StartServerStreamingCall("request1"); @@ -402,6 +447,27 @@ namespace Grpc.Core.Internal.Tests } [Test] + public void ServerStreaming_RequestSerializationExceptionDoesntLeakResources() + { + string nullRequest = null; // will throw when serializing + Assert.Throws(typeof(ArgumentNullException), () => asyncCall.StartServerStreamingCall(nullRequest)); + Assert.AreEqual(0, channel.GetCallReferenceCount()); + Assert.IsTrue(fakeCall.IsDisposed); + + var responseStream = new ClientResponseStream<string, string>(asyncCall); + var readTask = responseStream.MoveNext(); + } + + [Test] + public void ServerStreaming_StartCallFailureDoesntLeakResources() + { + fakeCall.MakeStartCallFail(); + Assert.Throws(typeof(InvalidOperationException), () => asyncCall.StartServerStreamingCall("request1")); + Assert.AreEqual(0, channel.GetCallReferenceCount()); + Assert.IsTrue(fakeCall.IsDisposed); + } + + [Test] public void DuplexStreaming_NoRequestNoResponse_Success() { asyncCall.StartDuplexStreamingCall(); @@ -558,6 +624,15 @@ namespace Grpc.Core.Internal.Tests AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled); } + [Test] + public void DuplexStreaming_StartCallFailureDoesntLeakResources() + { + fakeCall.MakeStartCallFail(); + Assert.Throws(typeof(InvalidOperationException), () => asyncCall.StartDuplexStreamingCall()); + Assert.AreEqual(0, channel.GetCallReferenceCount()); + Assert.IsTrue(fakeCall.IsDisposed); + } + ClientSideStatus CreateClientSideStatus(StatusCode statusCode) { return new ClientSideStatus(new Status(statusCode, ""), new Metadata()); diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs index 581ac3384b..ef67918dab 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs @@ -31,6 +31,7 @@ namespace Grpc.Core.Internal.Tests /// </summary> internal class FakeNativeCall : INativeCall { + private bool shouldStartCallFail; public IUnaryResponseClientCallback UnaryResponseClientCallback { get; @@ -102,26 +103,31 @@ namespace Grpc.Core.Internal.Tests public void StartUnary(IUnaryResponseClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags) { + StartCallMaybeFail(); UnaryResponseClientCallback = callback; } public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags) { + StartCallMaybeFail(); throw new NotImplementedException(); } public void StartClientStreaming(IUnaryResponseClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags) { + StartCallMaybeFail(); UnaryResponseClientCallback = callback; } public void StartServerStreaming(IReceivedStatusOnClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags) { + StartCallMaybeFail(); ReceivedStatusOnClientCallback = callback; } public void StartDuplexStreaming(IReceivedStatusOnClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags) { + StartCallMaybeFail(); ReceivedStatusOnClientCallback = callback; } @@ -165,5 +171,22 @@ namespace Grpc.Core.Internal.Tests { IsDisposed = true; } + + /// <summary> + /// Emulate CallSafeHandle.CheckOk() failure for all future attempts + /// to start a call. + /// </summary> + public void MakeStartCallFail() + { + shouldStartCallFail = true; + } + + private void StartCallMaybeFail() + { + if (shouldStartCallFail) + { + throw new InvalidOperationException("Start call has failed."); + } + } } } diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 7912b06b71..7ce929dfa3 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -136,7 +136,7 @@ namespace Grpc.Core /// </summary> public async Task WaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null) { - var result = await WaitForStateChangedInternalAsync(lastObservedState, deadline).ConfigureAwait(false); + var result = await TryWaitForStateChangedAsync(lastObservedState, deadline).ConfigureAwait(false); if (!result) { throw new TaskCanceledException("Reached deadline."); @@ -147,7 +147,7 @@ namespace Grpc.Core /// Returned tasks completes once channel state has become different from /// given lastObservedState (<c>true</c> is returned) or if the wait has timed out (<c>false</c> is returned). /// </summary> - internal Task<bool> WaitForStateChangedInternalAsync(ChannelState lastObservedState, DateTime? deadline = null) + public Task<bool> TryWaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null) { GrpcPreconditions.CheckArgument(lastObservedState != ChannelState.Shutdown, "Shutdown is a terminal state. No further state changes can occur."); @@ -297,6 +297,12 @@ namespace Grpc.Core activeCallCounter.Decrement(); } + // for testing only + internal long GetCallReferenceCount() + { + return activeCallCounter.Count; + } + private ChannelState GetConnectivityState(bool tryToConnect) { try diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 66902f3caa..4cdf0ee6a7 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -17,6 +17,7 @@ #endregion using System; +using System.Threading; using System.Threading.Tasks; using Grpc.Core.Logging; using Grpc.Core.Profiling; @@ -34,6 +35,8 @@ namespace Grpc.Core.Internal readonly CallInvocationDetails<TRequest, TResponse> details; readonly INativeCall injectedNativeCall; // for testing + bool registeredWithChannel; + // Dispose of to de-register cancellation token registration IDisposable cancellationTokenRegistration; @@ -77,43 +80,59 @@ namespace Grpc.Core.Internal using (profiler.NewScope("AsyncCall.UnaryCall")) using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.CreateSync()) { - byte[] payload = UnsafeSerialize(msg); + bool callStartedOk = false; + try + { + unaryResponseTcs = new TaskCompletionSource<TResponse>(); - unaryResponseTcs = new TaskCompletionSource<TResponse>(); + lock (myLock) + { + GrpcPreconditions.CheckState(!started); + started = true; + Initialize(cq); - lock (myLock) - { - GrpcPreconditions.CheckState(!started); - started = true; - Initialize(cq); + halfcloseRequested = true; + readingDone = true; + } - halfcloseRequested = true; - readingDone = true; - } + byte[] payload = UnsafeSerialize(msg); - using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) - { - var ctx = details.Channel.Environment.BatchContextPool.Lease(); - try + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); - var ev = cq.Pluck(ctx.Handle); - bool success = (ev.success != 0); + var ctx = details.Channel.Environment.BatchContextPool.Lease(); try { - using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch")) + call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + callStartedOk = true; + + var ev = cq.Pluck(ctx.Handle); + bool success = (ev.success != 0); + try + { + using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch")) + { + HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); + } + } + catch (Exception e) { - HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); + Logger.Error(e, "Exception occurred while invoking completion delegate."); } } - catch (Exception e) + finally { - Logger.Error(e, "Exception occurred while invoking completion delegate."); + ctx.Recycle(); } } - finally + } + finally + { + if (!callStartedOk) { - ctx.Recycle(); + lock (myLock) + { + OnFailedToStartCallLocked(); + } } } @@ -130,22 +149,35 @@ namespace Grpc.Core.Internal { lock (myLock) { - GrpcPreconditions.CheckState(!started); - started = true; + bool callStartedOk = false; + try + { + GrpcPreconditions.CheckState(!started); + started = true; - Initialize(details.Channel.CompletionQueue); + Initialize(details.Channel.CompletionQueue); - halfcloseRequested = true; - readingDone = true; + halfcloseRequested = true; + readingDone = true; + + byte[] payload = UnsafeSerialize(msg); - byte[] payload = UnsafeSerialize(msg); + unaryResponseTcs = new TaskCompletionSource<TResponse>(); + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + { + call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + callStartedOk = true; + } - unaryResponseTcs = new TaskCompletionSource<TResponse>(); - using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + return unaryResponseTcs.Task; + } + finally { - call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + if (!callStartedOk) + { + OnFailedToStartCallLocked(); + } } - return unaryResponseTcs.Task; } } @@ -157,20 +189,32 @@ namespace Grpc.Core.Internal { lock (myLock) { - GrpcPreconditions.CheckState(!started); - started = true; + bool callStartedOk = false; + try + { + GrpcPreconditions.CheckState(!started); + started = true; - Initialize(details.Channel.CompletionQueue); + Initialize(details.Channel.CompletionQueue); - readingDone = true; + readingDone = true; + + unaryResponseTcs = new TaskCompletionSource<TResponse>(); + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + { + call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags); + callStartedOk = true; + } - unaryResponseTcs = new TaskCompletionSource<TResponse>(); - using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + return unaryResponseTcs.Task; + } + finally { - call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags); + if (!callStartedOk) + { + OnFailedToStartCallLocked(); + } } - - return unaryResponseTcs.Task; } } @@ -181,21 +225,33 @@ namespace Grpc.Core.Internal { lock (myLock) { - GrpcPreconditions.CheckState(!started); - started = true; + bool callStartedOk = false; + try + { + GrpcPreconditions.CheckState(!started); + started = true; - Initialize(details.Channel.CompletionQueue); + Initialize(details.Channel.CompletionQueue); - halfcloseRequested = true; + halfcloseRequested = true; - byte[] payload = UnsafeSerialize(msg); + byte[] payload = UnsafeSerialize(msg); - streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); - using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + { + call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + callStartedOk = true; + } + call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback); + } + finally { - call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + if (!callStartedOk) + { + OnFailedToStartCallLocked(); + } } - call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback); } } @@ -207,17 +263,29 @@ namespace Grpc.Core.Internal { lock (myLock) { - GrpcPreconditions.CheckState(!started); - started = true; + bool callStartedOk = false; + try + { + GrpcPreconditions.CheckState(!started); + started = true; - Initialize(details.Channel.CompletionQueue); + Initialize(details.Channel.CompletionQueue); - streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); - using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + { + call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags); + callStartedOk = true; + } + call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback); + } + finally { - call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags); + if (!callStartedOk) + { + OnFailedToStartCallLocked(); + } } - call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback); } } @@ -327,7 +395,11 @@ namespace Grpc.Core.Internal protected override void OnAfterReleaseResourcesLocked() { - details.Channel.RemoveCallReference(this); + if (registeredWithChannel) + { + details.Channel.RemoveCallReference(this); + registeredWithChannel = false; + } } protected override void OnAfterReleaseResourcesUnlocked() @@ -394,10 +466,27 @@ namespace Grpc.Core.Internal var call = CreateNativeCall(cq); details.Channel.AddCallReference(this); + registeredWithChannel = true; InitializeInternal(call); + RegisterCancellationCallback(); } + private void OnFailedToStartCallLocked() + { + ReleaseResources(); + + // We need to execute the hook that disposes the cancellation token + // registration, but it cannot be done from under a lock. + // To make things simple, we just schedule the unregistering + // on a threadpool. + // - Once the native call is disposed, the Cancel() calls are ignored anyway + // - We don't care about the overhead as OnFailedToStartCallLocked() only happens + // when something goes very bad when initializing a call and that should + // never happen when gRPC is used correctly. + ThreadPool.QueueUserWorkItem((state) => OnAfterReleaseResourcesUnlocked()); + } + private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq) { if (injectedNativeCall != null) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 5a53049e4b..a93dc34620 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -189,7 +189,7 @@ namespace Grpc.Core.Internal /// </summary> protected abstract Exception GetRpcExceptionClientOnly(); - private void ReleaseResources() + protected void ReleaseResources() { if (call != null) { diff --git a/src/csharp/Grpc.Core/Internal/NativeMetadataCredentialsPlugin.cs b/src/csharp/Grpc.Core/Internal/NativeMetadataCredentialsPlugin.cs index 4d695e8850..faeb51e6f7 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMetadataCredentialsPlugin.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMetadataCredentialsPlugin.cs @@ -68,8 +68,8 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Native.grpcsharp_metadata_credentials_notify_from_plugin(callbackPtr, userDataPtr, MetadataArraySafeHandle.Create(Metadata.Empty), StatusCode.Unknown, GetMetadataExceptionStatusMsg); - Logger.Error(e, GetMetadataExceptionLogMsg); + // eat the exception, we must not throw when inside callback from native code. + Logger.Error(e, "Exception occurred while invoking native metadata interceptor handler."); } } @@ -87,7 +87,8 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Native.grpcsharp_metadata_credentials_notify_from_plugin(callbackPtr, userDataPtr, MetadataArraySafeHandle.Create(Metadata.Empty), StatusCode.Unknown, GetMetadataExceptionStatusMsg); + string detail = GetMetadataExceptionStatusMsg + " " + e.ToString(); + Native.grpcsharp_metadata_credentials_notify_from_plugin(callbackPtr, userDataPtr, MetadataArraySafeHandle.Create(Metadata.Empty), StatusCode.Unknown, detail); Logger.Error(e, GetMetadataExceptionLogMsg); } } diff --git a/src/csharp/Grpc.Core/RpcException.cs b/src/csharp/Grpc.Core/RpcException.cs index 94429d74ce..ff89897565 100644 --- a/src/csharp/Grpc.Core/RpcException.cs +++ b/src/csharp/Grpc.Core/RpcException.cs @@ -33,10 +33,8 @@ namespace Grpc.Core /// Creates a new <c>RpcException</c> associated with given status. /// </summary> /// <param name="status">Resulting status of a call.</param> - public RpcException(Status status) : base(status.ToString()) + public RpcException(Status status) : this(status, Metadata.Empty, status.ToString()) { - this.status = status; - this.trailers = Metadata.Empty; } /// <summary> @@ -44,10 +42,8 @@ namespace Grpc.Core /// </summary> /// <param name="status">Resulting status of a call.</param> /// <param name="message">The exception message.</param> - public RpcException(Status status, string message) : base(message) + public RpcException(Status status, string message) : this(status, Metadata.Empty, message) { - this.status = status; - this.trailers = Metadata.Empty; } /// <summary> @@ -55,7 +51,17 @@ namespace Grpc.Core /// </summary> /// <param name="status">Resulting status of a call.</param> /// <param name="trailers">Response trailing metadata.</param> - public RpcException(Status status, Metadata trailers) : base(status.ToString()) + public RpcException(Status status, Metadata trailers) : this(status, trailers, status.ToString()) + { + } + + /// <summary> + /// Creates a new <c>RpcException</c> associated with given status, message and trailing response metadata. + /// </summary> + /// <param name="status">Resulting status of a call.</param> + /// <param name="trailers">Response trailing metadata.</param> + /// <param name="message">The exception message.</param> + public RpcException(Status status, Metadata trailers, string message) : base(message) { this.status = status; this.trailers = GrpcPreconditions.CheckNotNull(trailers); diff --git a/src/csharp/Grpc.IntegrationTesting/MetadataCredentialsTest.cs b/src/csharp/Grpc.IntegrationTesting/MetadataCredentialsTest.cs index c83ccd2612..40447854f4 100644 --- a/src/csharp/Grpc.IntegrationTesting/MetadataCredentialsTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/MetadataCredentialsTest.cs @@ -153,9 +153,10 @@ namespace Grpc.IntegrationTesting [Test] public void MetadataCredentials_InterceptorThrows() { + var authInterceptorExceptionMessage = "Auth interceptor throws"; var callCredentials = CallCredentials.FromInterceptor(new AsyncAuthInterceptor((context, metadata) => { - throw new Exception("Auth interceptor throws"); + throw new Exception(authInterceptorExceptionMessage); })); var channelCredentials = ChannelCredentials.Create(TestCredentials.CreateSslCredentials(), callCredentials); channel = new Channel(Host, server.Ports.Single().BoundPort, channelCredentials, options); @@ -163,6 +164,7 @@ namespace Grpc.IntegrationTesting var ex = Assert.Throws<RpcException>(() => client.UnaryCall(new SimpleRequest { })); Assert.AreEqual(StatusCode.Unavailable, ex.Status.StatusCode); + StringAssert.Contains(authInterceptorExceptionMessage, ex.Status.Detail); } private class FakeTestService : TestService.TestServiceBase diff --git a/src/proto/grpc/health/v1/health.proto b/src/proto/grpc/health/v1/health.proto index 38843ff1e7..4b4677b8a4 100644 --- a/src/proto/grpc/health/v1/health.proto +++ b/src/proto/grpc/health/v1/health.proto @@ -34,30 +34,10 @@ message HealthCheckResponse { UNKNOWN = 0; SERVING = 1; NOT_SERVING = 2; - SERVICE_UNKNOWN = 3; // Used only by the Watch method. } ServingStatus status = 1; } service Health { - // If the requested service is unknown, the call will fail with status - // NOT_FOUND. rpc Check(HealthCheckRequest) returns (HealthCheckResponse); - - // Performs a watch for the serving status of the requested service. - // The server will immediately send back a message indicating the current - // serving status. It will then subsequently send a new message whenever - // the service's serving status changes. - // - // If the requested service is unknown when the call is received, the - // server will send a message setting the serving status to - // SERVICE_UNKNOWN but will *not* terminate the call. If at some - // future point, the serving status of the service becomes known, the - // server will send a new message with the service's serving status. - // - // If the call terminates with status UNIMPLEMENTED, then clients - // should assume this method is not supported and should not retry the - // call. If the call terminates with any other status (including OK), - // clients should retry the call with appropriate exponential backoff. - rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse); } diff --git a/src/proto/grpc/testing/package_options.proto b/src/proto/grpc/testing/package_options.proto new file mode 100644 index 0000000000..e7ecf8c196 --- /dev/null +++ b/src/proto/grpc/testing/package_options.proto @@ -0,0 +1,28 @@ +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package grpc.testing; + +// For sanity checking package definitions +option ruby_package = "Grpc.Testing.Package.Options"; + +message TestRequest { } + +message TestResponse { } + +service TestService { + rpc GetTest(TestRequest) returns (TestResponse) { } +} diff --git a/src/python/grpcio/grpc/BUILD.bazel b/src/python/grpcio/grpc/BUILD.bazel index 3f214bf3b0..2e6839ef2d 100644 --- a/src/python/grpcio/grpc/BUILD.bazel +++ b/src/python/grpcio/grpc/BUILD.bazel @@ -2,7 +2,7 @@ load("@grpc_python_dependencies//:requirements.bzl", "requirement") package(default_visibility = ["//visibility:public"]) -py_binary( +py_library( name = "grpcio", srcs = ["__init__.py"], deps = [ @@ -22,7 +22,6 @@ py_binary( data = [ "//:grpc", ], - main = "__init__.py", imports = ["../",], ) diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 6876601785..3494c9b15a 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -24,6 +24,7 @@ from grpc import _grpcio_metadata from grpc._cython import cygrpc from grpc.framework.foundation import callable_util +logging.basicConfig() _LOGGER = logging.getLogger(__name__) _USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__) diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py index 8358cbec5b..3805c7e82a 100644 --- a/src/python/grpcio/grpc/_common.py +++ b/src/python/grpcio/grpc/_common.py @@ -20,6 +20,7 @@ import six import grpc from grpc._cython import cygrpc +logging.basicConfig() _LOGGER = logging.getLogger(__name__) CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = { diff --git a/src/python/grpcio/grpc/_cython/BUILD.bazel b/src/python/grpcio/grpc/_cython/BUILD.bazel index 7124e83dee..cfd3a51d9b 100644 --- a/src/python/grpcio/grpc/_cython/BUILD.bazel +++ b/src/python/grpcio/grpc/_cython/BUILD.bazel @@ -8,6 +8,7 @@ pyx_library( "__init__.py", "cygrpc.pxd", "cygrpc.pyx", + "_cygrpc/_hooks.pyx.pxi", "_cygrpc/grpc_string.pyx.pxi", "_cygrpc/arguments.pyx.pxi", "_cygrpc/call.pyx.pxi", @@ -15,6 +16,7 @@ pyx_library( "_cygrpc/credentials.pyx.pxi", "_cygrpc/completion_queue.pyx.pxi", "_cygrpc/event.pyx.pxi", + "_cygrpc/fork_posix.pyx.pxi", "_cygrpc/metadata.pyx.pxi", "_cygrpc/operation.pyx.pxi", "_cygrpc/records.pyx.pxi", @@ -24,12 +26,14 @@ pyx_library( "_cygrpc/time.pyx.pxi", "_cygrpc/grpc_gevent.pyx.pxi", "_cygrpc/grpc.pxi", + "_cygrpc/_hooks.pxd.pxi", "_cygrpc/arguments.pxd.pxi", "_cygrpc/call.pxd.pxi", "_cygrpc/channel.pxd.pxi", "_cygrpc/credentials.pxd.pxi", "_cygrpc/completion_queue.pxd.pxi", "_cygrpc/event.pxd.pxi", + "_cygrpc/fork_posix.pxd.pxi", "_cygrpc/metadata.pxd.pxi", "_cygrpc/operation.pxd.pxi", "_cygrpc/records.pxd.pxi", diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi index 38fd9e78b2..63048e8da0 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi @@ -144,8 +144,14 @@ cdef class SSLChannelCredentials(ChannelCredentials): return grpc_ssl_credentials_create( c_pem_root_certificates, NULL, NULL, NULL) else: - c_pem_key_certificate_pair.private_key = self._private_key - c_pem_key_certificate_pair.certificate_chain = self._certificate_chain + if self._private_key: + c_pem_key_certificate_pair.private_key = self._private_key + else: + c_pem_key_certificate_pair.private_key = NULL + if self._certificate_chain: + c_pem_key_certificate_pair.certificate_chain = self._certificate_chain + else: + c_pem_key_certificate_pair.certificate_chain = NULL return grpc_ssl_credentials_create( c_pem_root_certificates, &c_pem_key_certificate_pair, NULL, NULL) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi index 00a1b23a67..334e561baa 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi @@ -14,6 +14,7 @@ import logging +logging.basicConfig() _LOGGER = logging.getLogger(__name__) # This function will ascii encode unicode string inputs if neccesary. diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index ce701724fd..5779437b92 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -18,6 +18,7 @@ import logging import time import grpc +logging.basicConfig() _LOGGER = logging.getLogger(__name__) cdef class Server: diff --git a/src/python/grpcio/grpc/_plugin_wrapping.py b/src/python/grpcio/grpc/_plugin_wrapping.py index 916ee080b6..88ab4d8371 100644 --- a/src/python/grpcio/grpc/_plugin_wrapping.py +++ b/src/python/grpcio/grpc/_plugin_wrapping.py @@ -20,6 +20,7 @@ import grpc from grpc import _common from grpc._cython import cygrpc +logging.basicConfig() _LOGGER = logging.getLogger(__name__) diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 7276a7fd90..daa000a6e1 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -27,6 +27,7 @@ from grpc import _interceptor from grpc._cython import cygrpc from grpc.framework.foundation import callable_util +logging.basicConfig() _LOGGER = logging.getLogger(__name__) _SHUTDOWN_TAG = 'shutdown' diff --git a/src/python/grpcio/grpc/framework/foundation/callable_util.py b/src/python/grpcio/grpc/framework/foundation/callable_util.py index 24daf3406f..fb8d5f7c1e 100644 --- a/src/python/grpcio/grpc/framework/foundation/callable_util.py +++ b/src/python/grpcio/grpc/framework/foundation/callable_util.py @@ -21,6 +21,7 @@ import logging import six +logging.basicConfig() _LOGGER = logging.getLogger(__name__) diff --git a/src/python/grpcio/grpc/framework/foundation/logging_pool.py b/src/python/grpcio/grpc/framework/foundation/logging_pool.py index 216e3990db..7702d1785f 100644 --- a/src/python/grpcio/grpc/framework/foundation/logging_pool.py +++ b/src/python/grpcio/grpc/framework/foundation/logging_pool.py @@ -17,6 +17,7 @@ import logging from concurrent import futures +logging.basicConfig() _LOGGER = logging.getLogger(__name__) diff --git a/src/python/grpcio/grpc/framework/foundation/stream_util.py b/src/python/grpcio/grpc/framework/foundation/stream_util.py index 1faaf29bd7..9184f95873 100644 --- a/src/python/grpcio/grpc/framework/foundation/stream_util.py +++ b/src/python/grpcio/grpc/framework/foundation/stream_util.py @@ -19,6 +19,7 @@ import threading from grpc.framework.foundation import stream _NO_VALUE = object() +logging.basicConfig() _LOGGER = logging.getLogger(__name__) diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py b/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py index 191b1c1726..d7205ca579 100644 --- a/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py +++ b/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py @@ -18,6 +18,7 @@ import threading import grpc _NOT_YET_OBSERVED = object() +logging.basicConfig() _LOGGER = logging.getLogger(__name__) diff --git a/src/python/grpcio_testing/grpc_testing/_server/_rpc.py b/src/python/grpcio_testing/grpc_testing/_server/_rpc.py index b856da100f..736b714dc6 100644 --- a/src/python/grpcio_testing/grpc_testing/_server/_rpc.py +++ b/src/python/grpcio_testing/grpc_testing/_server/_rpc.py @@ -18,6 +18,7 @@ import threading import grpc from grpc_testing import _common +logging.basicConfig() _LOGGER = logging.getLogger(__name__) diff --git a/src/python/grpcio_testing/grpc_testing/_time.py b/src/python/grpcio_testing/grpc_testing/_time.py index 75e6db3458..9692c34e6f 100644 --- a/src/python/grpcio_testing/grpc_testing/_time.py +++ b/src/python/grpcio_testing/grpc_testing/_time.py @@ -21,6 +21,7 @@ import time as _time import grpc import grpc_testing +logging.basicConfig() _LOGGER = logging.getLogger(__name__) diff --git a/src/python/grpcio_tests/tests/interop/server.py b/src/python/grpcio_tests/tests/interop/server.py index fd28d498a1..768cdaf468 100644 --- a/src/python/grpcio_tests/tests/interop/server.py +++ b/src/python/grpcio_tests/tests/interop/server.py @@ -25,6 +25,7 @@ from tests.interop import methods from tests.interop import resources from tests.unit import test_common +logging.basicConfig() _ONE_DAY_IN_SECONDS = 60 * 60 * 24 _LOGGER = logging.getLogger(__name__) diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index b6c0791469..fc641dae80 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -819,6 +819,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) { unsigned write_flag = 0; void* tag = (void*)&st; + grpc_ruby_fork_guard(); if (RTYPEDDATA_DATA(self) == NULL) { rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call"); return Qnil; diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 3f0dc530cf..6d4b2293a2 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -217,6 +217,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE* argv, VALUE self) { MEMZERO(&args, grpc_channel_args, 1); grpc_ruby_once_init(); + grpc_ruby_fork_guard(); rb_thread_call_without_gvl( wait_until_channel_polling_thread_started_no_gil, &stop_waiting_for_thread_start, @@ -374,6 +375,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, watch_state_stack stack; void* op_success = 0; + grpc_ruby_fork_guard(); TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); if (wrapper->bg_wrapped == NULL) { @@ -415,6 +417,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask, grpc_slice* host_slice_ptr = NULL; char* tmp_str = NULL; + grpc_ruby_fork_guard(); if (host != Qnil) { host_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(host), RSTRING_LEN(host)); diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c index f065a857db..872aed0cfc 100644 --- a/src/ruby/ext/grpc/rb_grpc.c +++ b/src/ruby/ext/grpc/rb_grpc.c @@ -23,9 +23,13 @@ #include <math.h> #include <ruby/vm.h> +#include <stdbool.h> #include <sys/time.h> +#include <sys/types.h> +#include <unistd.h> #include <grpc/grpc.h> +#include <grpc/support/log.h> #include <grpc/support/time.h> #include "rb_call.h" #include "rb_call_credentials.h" @@ -255,7 +259,26 @@ static void Init_grpc_time_consts() { id_tv_nsec = rb_intern("tv_nsec"); } -static void grpc_rb_shutdown(void) { grpc_shutdown(); } +#if GPR_WINDOWS +static void grpc_ruby_set_init_pid(void) {} +static bool grpc_ruby_forked_after_init(void) { return false; } +#else +static pid_t grpc_init_pid; + +static void grpc_ruby_set_init_pid(void) { + GPR_ASSERT(grpc_init_pid == 0); + grpc_init_pid = getpid(); +} + +static bool grpc_ruby_forked_after_init(void) { + GPR_ASSERT(grpc_init_pid != 0); + return grpc_init_pid != getpid(); +} +#endif + +static void grpc_rb_shutdown(void) { + if (!grpc_ruby_forked_after_init()) grpc_shutdown(); +} /* Initialize the GRPC module structs */ @@ -276,10 +299,17 @@ VALUE sym_metadata = Qundef; static gpr_once g_once_init = GPR_ONCE_INIT; static void grpc_ruby_once_init_internal() { + grpc_ruby_set_init_pid(); grpc_init(); atexit(grpc_rb_shutdown); } +void grpc_ruby_fork_guard() { + if (grpc_ruby_forked_after_init()) { + rb_raise(rb_eRuntimeError, "grpc cannot be used before and after forking"); + } +} + static VALUE bg_thread_init_rb_mu = Qundef; static int bg_thread_init_done = 0; diff --git a/src/ruby/ext/grpc/rb_grpc.h b/src/ruby/ext/grpc/rb_grpc.h index 577902319e..4118435ecf 100644 --- a/src/ruby/ext/grpc/rb_grpc.h +++ b/src/ruby/ext/grpc/rb_grpc.h @@ -69,4 +69,6 @@ gpr_timespec grpc_rb_time_timeval(VALUE time, int interval); void grpc_ruby_once_init(); +void grpc_ruby_fork_guard(); + #endif /* GRPC_RB_H_ */ diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index f44f89dfa0..0c46f6c85a 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -97,7 +97,9 @@ grpc_resource_quota_resize_type grpc_resource_quota_resize_import; grpc_resource_quota_set_max_threads_type grpc_resource_quota_set_max_threads_import; grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import; grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import; +grpc_channelz_get_servers_type grpc_channelz_get_servers_import; grpc_channelz_get_channel_type grpc_channelz_get_channel_import; +grpc_channelz_get_subchannel_type grpc_channelz_get_subchannel_import; grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import; grpc_server_add_insecure_channel_from_fd_type grpc_server_add_insecure_channel_from_fd_import; grpc_use_signal_type grpc_use_signal_import; @@ -351,7 +353,9 @@ void grpc_rb_load_imports(HMODULE library) { grpc_resource_quota_set_max_threads_import = (grpc_resource_quota_set_max_threads_type) GetProcAddress(library, "grpc_resource_quota_set_max_threads"); grpc_resource_quota_arg_vtable_import = (grpc_resource_quota_arg_vtable_type) GetProcAddress(library, "grpc_resource_quota_arg_vtable"); grpc_channelz_get_top_channels_import = (grpc_channelz_get_top_channels_type) GetProcAddress(library, "grpc_channelz_get_top_channels"); + grpc_channelz_get_servers_import = (grpc_channelz_get_servers_type) GetProcAddress(library, "grpc_channelz_get_servers"); grpc_channelz_get_channel_import = (grpc_channelz_get_channel_type) GetProcAddress(library, "grpc_channelz_get_channel"); + grpc_channelz_get_subchannel_import = (grpc_channelz_get_subchannel_type) GetProcAddress(library, "grpc_channelz_get_subchannel"); grpc_insecure_channel_create_from_fd_import = (grpc_insecure_channel_create_from_fd_type) GetProcAddress(library, "grpc_insecure_channel_create_from_fd"); grpc_server_add_insecure_channel_from_fd_import = (grpc_server_add_insecure_channel_from_fd_type) GetProcAddress(library, "grpc_server_add_insecure_channel_from_fd"); grpc_use_signal_import = (grpc_use_signal_type) GetProcAddress(library, "grpc_use_signal"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 99433f0e40..d00e75c326 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -266,9 +266,15 @@ extern grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import typedef char*(*grpc_channelz_get_top_channels_type)(intptr_t start_channel_id); extern grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import; #define grpc_channelz_get_top_channels grpc_channelz_get_top_channels_import +typedef char*(*grpc_channelz_get_servers_type)(intptr_t start_server_id); +extern grpc_channelz_get_servers_type grpc_channelz_get_servers_import; +#define grpc_channelz_get_servers grpc_channelz_get_servers_import typedef char*(*grpc_channelz_get_channel_type)(intptr_t channel_id); extern grpc_channelz_get_channel_type grpc_channelz_get_channel_import; #define grpc_channelz_get_channel grpc_channelz_get_channel_import +typedef char*(*grpc_channelz_get_subchannel_type)(intptr_t subchannel_id); +extern grpc_channelz_get_subchannel_type grpc_channelz_get_subchannel_import; +#define grpc_channelz_get_subchannel grpc_channelz_get_subchannel_import typedef grpc_channel*(*grpc_insecure_channel_create_from_fd_type)(const char* target, int fd, const grpc_channel_args* args); extern grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import; #define grpc_insecure_channel_create_from_fd grpc_insecure_channel_create_from_fd_import diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index 88e6a0cfd5..2931f34409 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -243,6 +243,8 @@ static VALUE grpc_rb_server_request_call(VALUE self) { static VALUE grpc_rb_server_start(VALUE self) { grpc_rb_server* s = NULL; TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); + + grpc_ruby_fork_guard(); if (s->wrapped == NULL) { rb_raise(rb_eRuntimeError, "destroyed!"); } else { diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index 5fd1805aab..efb0e4233d 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -32,7 +32,7 @@ module GRPC # @return [Proc] { |instance| marshalled(instance) } def marshal_proc - proc { |o| o.class.method(marshal_method).call(o).to_s } + proc { |o| o.class.send(marshal_method, o).to_s } end # @param [:input, :output] target determines whether to produce the an @@ -42,9 +42,9 @@ module GRPC # @return [Proc] An unmarshal proc { |marshalled(instance)| instance } def unmarshal_proc(target) fail ArgumentError unless [:input, :output].include?(target) - unmarshal_class = method(target).call + unmarshal_class = send(target) unmarshal_class = unmarshal_class.type if unmarshal_class.is_a? Stream - proc { |o| unmarshal_class.method(unmarshal_method).call(o) } + proc { |o| unmarshal_class.send(unmarshal_method, o) } end def handle_request_response(active_call, mth, inter_ctx) diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 838ac45927..3b5a0ce27f 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -136,7 +136,7 @@ module GRPC begin blk, args = worker_queue.pop blk.call(*args) - rescue StandardError => e + rescue StandardError, GRPC::Core::CallError => e GRPC.logger.warn('Error in worker thread') GRPC.logger.warn(e) end diff --git a/src/ruby/spec/channel_spec.rb b/src/ruby/spec/channel_spec.rb index 3c9eca47ec..adba6db99c 100644 --- a/src/ruby/spec/channel_spec.rb +++ b/src/ruby/spec/channel_spec.rb @@ -13,6 +13,7 @@ # limitations under the License. require 'spec_helper' +require 'English' def load_test_certs test_root = File.join(File.dirname(__FILE__), 'testdata') @@ -27,6 +28,28 @@ describe GRPC::Core::Channel do GRPC::Core::ChannelCredentials.new(load_test_certs[0]) end + def fork_with_propagated_error_message + pipe_read, pipe_write = IO.pipe + pid = fork do + pipe_read.close + begin + yield + rescue => exc + pipe_write.syswrite(exc.message) + end + pipe_write.close + end + pipe_write.close + + exc_message = pipe_read.read + Process.wait(pid) + + unless $CHILD_STATUS.success? + raise "forked process failed with #{$CHILD_STATUS}" + end + raise exc_message unless exc_message.empty? + end + shared_examples '#new' do it 'take a host name without channel args' do blk = proc do @@ -79,6 +102,14 @@ describe GRPC::Core::Channel do blk = construct_with_args(args) expect(&blk).to_not raise_error end + + it 'raises if grpc was initialized in another process' do + blk = construct_with_args({}) + expect(&blk).not_to raise_error + expect do + fork_with_propagated_error_message(&blk) + end.to raise_error(RuntimeError, 'grpc cannot be used before and after forking') + end end describe '#new for secure channels' do @@ -121,6 +152,19 @@ describe GRPC::Core::Channel do end expect(&blk).to raise_error(RuntimeError) end + + it 'raises if grpc was initialized in another process' do + ch = GRPC::Core::Channel.new(fake_host, nil, :this_channel_is_insecure) + + deadline = Time.now + 5 + + blk = proc do + fork_with_propagated_error_message do + ch.create_call(nil, nil, 'dummy_method', nil, deadline) + end + end + expect(&blk).to raise_error(RuntimeError, 'grpc cannot be used before and after forking') + end end describe '#destroy' do diff --git a/src/ruby/spec/pb/codegen/package_option_spec.rb b/src/ruby/spec/pb/codegen/package_option_spec.rb new file mode 100644 index 0000000000..46d23cd651 --- /dev/null +++ b/src/ruby/spec/pb/codegen/package_option_spec.rb @@ -0,0 +1,53 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require 'spec_helper' +require 'open3' +require 'tmpdir' + +describe 'Code Generation Options' do + it 'should generate and respect package options' do + fail 'CONFIG env variable unexpectedly unset' unless ENV['CONFIG'] + bins_sub_dir = ENV['CONFIG'] + + src_dir = File.join(File.dirname(__FILE__), '..', '..', '..', '..') + pb_dir = File.join(src_dir, 'proto') + bins_dir = File.join(src_dir, '..', 'bins', bins_sub_dir) + + plugin = File.join(bins_dir, 'grpc_ruby_plugin') + protoc = File.join(bins_dir, 'protobuf', 'protoc') + + # Generate the service from the proto + Dir.mktmpdir(nil, File.dirname(__FILE__)) do |tmp_dir| + gen_file = system(protoc, + '-I.', + 'grpc/testing/package_options.proto', + "--grpc_out=#{tmp_dir}", # generate the service + "--ruby_out=#{tmp_dir}", # generate the definitions + "--plugin=protoc-gen-grpc=#{plugin}", + chdir: pb_dir, + out: File::NULL) + + expect(gen_file).to be_truthy + begin + $LOAD_PATH.push(tmp_dir) + expect { Grpc::Testing::Package::Options::TestService::Service }.to raise_error(NameError) + expect(require('grpc/testing/package_options_services_pb')).to be_truthy + expect { Grpc::Testing::Package::Options::TestService::Service }.to_not raise_error + ensure + $LOAD_PATH.delete(tmp_dir) + end + end + end +end |