aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-09-13 15:16:02 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2018-09-13 15:16:02 -0700
commit1819935efbb2fb014c0c29bc47f5045fbcf5c6cf (patch)
treeb59127474ac1b007a44d4209caadc27e7355a52a /src/core
parent30e7b02b5c8bc55c109ed84dfa30663ce90e134b (diff)
parent07eecd8421314d62e8df33b14a33203864f91bee (diff)
Merge branch 'master' into statuscaution
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc104
-rw-r--r--src/core/ext/filters/client_channel/client_channel_channelz.cc60
-rw-r--r--src/core/ext/filters/client_channel/client_channel_channelz.h56
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc11
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc11
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h26
-rw-r--r--src/core/ext/filters/client_channel/parse_address.cc18
-rw-r--r--src/core/ext/filters/client_channel/parse_address.h3
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc99
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h12
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc5
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc70
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc38
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h11
-rw-r--r--src/core/ext/filters/client_channel/subchannel_index.cc7
-rw-r--r--src/core/ext/filters/client_channel/subchannel_index.h5
-rw-r--r--src/core/ext/filters/http/server/http_server_filter.cc17
-rw-r--r--src/core/lib/channel/channel_trace.cc73
-rw-r--r--src/core/lib/channel/channel_trace.h28
-rw-r--r--src/core/lib/channel/channelz.cc151
-rw-r--r--src/core/lib/channel/channelz.h166
-rw-r--r--src/core/lib/channel/channelz_registry.cc100
-rw-r--r--src/core/lib/channel/channelz_registry.h57
-rw-r--r--src/core/lib/iomgr/port.h5
-rw-r--r--src/core/lib/iomgr/socket_utils_common_posix.cc90
-rw-r--r--src/core/lib/iomgr/socket_utils_posix.h7
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.cc3
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_common.cc3
-rw-r--r--src/core/lib/surface/call.cc67
-rw-r--r--src/core/lib/surface/call.h1
-rw-r--r--src/core/lib/surface/channel.cc12
-rw-r--r--src/core/lib/surface/completion_queue.cc4
-rw-r--r--src/core/lib/surface/server.cc27
-rw-r--r--src/core/lib/surface/server.h4
-rw-r--r--src/core/tsi/alts/handshaker/alts_handshaker_client.cc9
-rw-r--r--src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc4
-rw-r--r--src/core/tsi/alts/handshaker/alts_tsi_event.cc6
-rw-r--r--src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc11
-rw-r--r--src/core/tsi/alts/handshaker/alts_tsi_utils.cc4
-rw-r--r--src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc4
-rw-r--r--src/core/tsi/ssl/session_cache/ssl_session_cache.cc3
42 files changed, 1044 insertions, 350 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 8044b4e4cd..388736b60a 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -933,6 +933,11 @@ typedef struct client_channel_call_data {
grpc_closure pick_closure;
grpc_closure pick_cancel_closure;
+ // state needed to support channelz interception of recv trailing metadata.
+ grpc_closure recv_trailing_metadata_ready_channelz;
+ grpc_closure* original_recv_trailing_metadata;
+ grpc_metadata_batch* recv_trailing_metadata;
+
grpc_polling_entity* pollent;
bool pollent_added_to_interested_parties;
@@ -994,6 +999,8 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void start_pick_locked(void* arg, grpc_error* ignored);
+static void maybe_intercept_recv_trailing_metadata_for_channelz(
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
//
// send op data caching
@@ -1292,6 +1299,7 @@ static void pending_batches_resume(grpc_call_element* elem) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
+ maybe_intercept_recv_trailing_metadata_for_channelz(elem, batch);
batch->handler_private.extra_arg = calld->subchannel_call;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
resume_pending_batch_in_call_combiner, batch,
@@ -1777,23 +1785,22 @@ static void recv_message_ready(void* arg, grpc_error* error) {
// recv_trailing_metadata handling
//
-// Sets *status and *server_pushback_md based on batch_data and error.
-static void get_call_status(subchannel_batch_data* batch_data,
- grpc_error* error, grpc_status_code* status,
+// Sets *status and *server_pushback_md based on md_batch and error.
+// Only sets *server_pushback_md if server_pushback_md != nullptr.
+static void get_call_status(grpc_call_element* elem,
+ grpc_metadata_batch* md_batch, grpc_error* error,
+ grpc_status_code* status,
grpc_mdelem** server_pushback_md) {
- grpc_call_element* elem = batch_data->elem;
call_data* calld = static_cast<call_data*>(elem->call_data);
if (error != GRPC_ERROR_NONE) {
grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
nullptr);
} else {
- grpc_metadata_batch* md_batch =
- batch_data->batch.payload->recv_trailing_metadata
- .recv_trailing_metadata;
GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
*status =
grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
- if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
+ if (server_pushback_md != nullptr &&
+ md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
*server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
}
}
@@ -1966,8 +1973,19 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
// Get the call's status and check for server pushback metadata.
grpc_status_code status = GRPC_STATUS_OK;
grpc_mdelem* server_pushback_md = nullptr;
- get_call_status(batch_data, GRPC_ERROR_REF(error), &status,
+ grpc_metadata_batch* md_batch =
+ batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
+ get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status,
&server_pushback_md);
+ grpc_core::channelz::SubchannelNode* channelz_subchannel =
+ calld->pick.connected_subchannel->channelz_subchannel();
+ if (channelz_subchannel != nullptr) {
+ if (status == GRPC_STATUS_OK) {
+ channelz_subchannel->RecordCallSucceeded();
+ } else {
+ channelz_subchannel->RecordCallFailed();
+ }
+ }
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
calld, grpc_status_code_to_string(status));
@@ -2572,6 +2590,69 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
}
//
+// Channelz
+//
+
+static void recv_trailing_metadata_ready_channelz(void* arg,
+ grpc_error* error) {
+ grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, "
+ "error=%s",
+ chand, calld, grpc_error_string(error));
+ }
+ GPR_ASSERT(calld->recv_trailing_metadata != nullptr);
+ grpc_status_code status = GRPC_STATUS_OK;
+ grpc_metadata_batch* md_batch = calld->recv_trailing_metadata;
+ get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr);
+ grpc_core::channelz::SubchannelNode* channelz_subchannel =
+ calld->pick.connected_subchannel->channelz_subchannel();
+ GPR_ASSERT(channelz_subchannel != nullptr);
+ if (status == GRPC_STATUS_OK) {
+ channelz_subchannel->RecordCallSucceeded();
+ } else {
+ channelz_subchannel->RecordCallFailed();
+ }
+ calld->recv_trailing_metadata = nullptr;
+ GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error);
+}
+
+// If channelz is enabled, intercept recv_trailing so that we may check the
+// status and associate it to a subchannel.
+// Returns true if callback was intercepted, false otherwise.
+static void maybe_intercept_recv_trailing_metadata_for_channelz(
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ // only intercept payloads with recv trailing.
+ if (!batch->recv_trailing_metadata) {
+ return;
+ }
+ // only add interceptor is channelz is enabled.
+ if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) {
+ return;
+ }
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "calld=%p batch=%p: intercepting recv trailing for channelz", calld,
+ batch);
+ }
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz,
+ recv_trailing_metadata_ready_channelz, elem,
+ grpc_schedule_on_exec_ctx);
+ // save some state needed for the interception callback.
+ GPR_ASSERT(calld->recv_trailing_metadata == nullptr);
+ calld->recv_trailing_metadata =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata;
+ calld->original_recv_trailing_metadata =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &calld->recv_trailing_metadata_ready_channelz;
+}
+
+//
// LB pick
//
@@ -2600,6 +2681,11 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
new_error = grpc_error_add_child(new_error, error);
pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
} else {
+ grpc_core::channelz::SubchannelNode* channelz_subchannel =
+ calld->pick.connected_subchannel->channelz_subchannel();
+ if (channelz_subchannel != nullptr) {
+ channelz_subchannel->RecordCallStarted();
+ }
if (parent_data_size > 0) {
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc
index 86c765df52..7e8f59bcd3 100644
--- a/src/core/ext/filters/client_channel/client_channel_channelz.cc
+++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc
@@ -20,10 +20,13 @@
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
+#include "src/core/lib/channel/channelz_registry.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
+#include <grpc/support/string_util.h>
+
namespace grpc_core {
namespace channelz {
namespace {
@@ -109,5 +112,62 @@ RefCountedPtr<ChannelNode> ClientChannelNode::MakeClientChannelNode(
is_top_level_channel);
}
+SubchannelNode::SubchannelNode(grpc_subchannel* subchannel,
+ size_t channel_tracer_max_nodes)
+ : BaseNode(EntityType::kSubchannel),
+ subchannel_(subchannel),
+ target_(
+ UniquePtr<char>(gpr_strdup(grpc_subchannel_get_target(subchannel_)))),
+ trace_(channel_tracer_max_nodes) {}
+
+SubchannelNode::~SubchannelNode() {}
+
+void SubchannelNode::PopulateConnectivityState(grpc_json* json) {
+ grpc_connectivity_state state;
+ if (subchannel_ == nullptr) {
+ state = GRPC_CHANNEL_SHUTDOWN;
+ } else {
+ state = grpc_subchannel_check_connectivity(subchannel_, nullptr);
+ }
+ json = grpc_json_create_child(nullptr, json, "state", nullptr,
+ GRPC_JSON_OBJECT, false);
+ grpc_json_create_child(nullptr, json, "state",
+ grpc_connectivity_state_name(state), GRPC_JSON_STRING,
+ false);
+}
+
+grpc_json* SubchannelNode::RenderJson() {
+ grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
+ grpc_json* json = top_level_json;
+ grpc_json* json_iterator = nullptr;
+ json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr,
+ GRPC_JSON_OBJECT, false);
+ json = json_iterator;
+ json_iterator = nullptr;
+ json_iterator = grpc_json_add_number_string_child(json, json_iterator,
+ "subchannelId", uuid());
+ // reset json iterators to top level object
+ json = top_level_json;
+ json_iterator = nullptr;
+ // create and fill the data child.
+ grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr,
+ GRPC_JSON_OBJECT, false);
+ json = data;
+ json_iterator = nullptr;
+ PopulateConnectivityState(json);
+ GPR_ASSERT(target_.get() != nullptr);
+ grpc_json_create_child(nullptr, json, "target", target_.get(),
+ GRPC_JSON_STRING, false);
+ // fill in the channel trace if applicable
+ grpc_json* trace_json = trace_.RenderJson();
+ if (trace_json != nullptr) {
+ trace_json->key = "trace"; // this object is named trace in channelz.proto
+ grpc_json_link_child(json, trace_json, nullptr);
+ }
+ // ask CallCountingHelper to populate trace and call count data.
+ call_counter_.PopulateCallCounts(json);
+ return top_level_json;
+}
+
} // namespace channelz
} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.h b/src/core/ext/filters/client_channel/client_channel_channelz.h
index 6f27b5c8b7..8ce331e529 100644
--- a/src/core/ext/filters/client_channel/client_channel_channelz.h
+++ b/src/core/ext/filters/client_channel/client_channel_channelz.h
@@ -23,9 +23,12 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/inlined_vector.h"
+typedef struct grpc_subchannel grpc_subchannel;
+
namespace grpc_core {
// TODO(ncteisen), this only contains the uuids of the children for now,
@@ -43,28 +46,59 @@ class ClientChannelNode : public ChannelNode {
grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel);
- // Override this functionality since client_channels have a notion of
- // channel connectivity.
- void PopulateConnectivityState(grpc_json* json) override;
+ ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel);
+ virtual ~ClientChannelNode() {}
- // Override this functionality since client_channels have subchannels
+ // Overriding template methods from ChannelNode to render information that
+ // only ClientChannelNode knows about.
+ void PopulateConnectivityState(grpc_json* json) override;
void PopulateChildRefs(grpc_json* json) override;
// Helper to create a channel arg to ensure this type of ChannelNode is
// created.
static grpc_arg CreateChannelArg();
- protected:
- GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
- GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
- ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
- bool is_top_level_channel);
- virtual ~ClientChannelNode() {}
-
private:
grpc_channel_element* client_channel_;
};
+// Handles channelz bookkeeping for sockets
+class SubchannelNode : public BaseNode {
+ public:
+ SubchannelNode(grpc_subchannel* subchannel, size_t channel_tracer_max_nodes);
+ ~SubchannelNode() override;
+
+ void MarkSubchannelDestroyed() {
+ GPR_ASSERT(subchannel_ != nullptr);
+ subchannel_ = nullptr;
+ }
+
+ grpc_json* RenderJson() override;
+
+ // proxy methods to composed classes.
+ void AddTraceEvent(ChannelTrace::Severity severity, grpc_slice data) {
+ trace_.AddTraceEvent(severity, data);
+ }
+ void AddTraceEventWithReference(ChannelTrace::Severity severity,
+ grpc_slice data,
+ RefCountedPtr<BaseNode> referenced_channel) {
+ trace_.AddTraceEventWithReference(severity, data,
+ std::move(referenced_channel));
+ }
+ void RecordCallStarted() { call_counter_.RecordCallStarted(); }
+ void RecordCallFailed() { call_counter_.RecordCallFailed(); }
+ void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); }
+
+ private:
+ grpc_subchannel* subchannel_;
+ UniquePtr<char> target_;
+ CallCountingHelper call_counter_;
+ ChannelTrace trace_;
+
+ void PopulateConnectivityState(grpc_json* json);
+};
+
} // namespace channelz
} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 25b0149393..1ee1925a25 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -1265,7 +1265,7 @@ void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels,
grpc_core::channelz::ChannelNode* channel_node =
grpc_channel_get_channelz_node(lb_channel_);
if (channel_node != nullptr) {
- child_channels->push_back(channel_node->channel_uuid());
+ child_channels->push_back(channel_node->uuid());
}
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 602d6e92f9..ed8cc60ea1 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -71,11 +71,12 @@ class PickFirst : public LoadBalancingPolicy {
: public SubchannelData<PickFirstSubchannelList,
PickFirstSubchannelData> {
public:
- PickFirstSubchannelData(PickFirstSubchannelList* subchannel_list,
- const grpc_lb_user_data_vtable* user_data_vtable,
- const grpc_lb_address& address,
- grpc_subchannel* subchannel,
- grpc_combiner* combiner)
+ PickFirstSubchannelData(
+ SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>*
+ subchannel_list,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address, grpc_subchannel* subchannel,
+ grpc_combiner* combiner)
: SubchannelData(subchannel_list, user_data_vtable, address, subchannel,
combiner) {}
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 4195c1e9d1..8dd5820bae 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -89,11 +89,12 @@ class RoundRobin : public LoadBalancingPolicy {
: public SubchannelData<RoundRobinSubchannelList,
RoundRobinSubchannelData> {
public:
- RoundRobinSubchannelData(RoundRobinSubchannelList* subchannel_list,
- const grpc_lb_user_data_vtable* user_data_vtable,
- const grpc_lb_address& address,
- grpc_subchannel* subchannel,
- grpc_combiner* combiner)
+ RoundRobinSubchannelData(
+ SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>*
+ subchannel_list,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address, grpc_subchannel* subchannel,
+ grpc_combiner* combiner)
: SubchannelData(subchannel_list, user_data_vtable, address, subchannel,
combiner),
user_data_vtable_(user_data_vtable),
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 91ddaec8b8..5e8682e056 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -65,6 +65,10 @@ class MySubchannelList
namespace grpc_core {
+// Forward declaration.
+template <typename SubchannelListType, typename SubchannelDataType>
+class SubchannelList;
+
// Stores data for a particular subchannel in a subchannel list.
// Callers must create a subclass that implements the
// ProcessConnectivityChangeLocked() method.
@@ -72,7 +76,9 @@ template <typename SubchannelListType, typename SubchannelDataType>
class SubchannelData {
public:
// Returns a pointer to the subchannel list containing this object.
- SubchannelListType* subchannel_list() const { return subchannel_list_; }
+ SubchannelListType* subchannel_list() const {
+ return static_cast<SubchannelListType*>(subchannel_list_);
+ }
// Returns the index into the subchannel list of this object.
size_t Index() const {
@@ -133,10 +139,11 @@ class SubchannelData {
GRPC_ABSTRACT_BASE_CLASS
protected:
- SubchannelData(SubchannelListType* subchannel_list,
- const grpc_lb_user_data_vtable* user_data_vtable,
- const grpc_lb_address& address, grpc_subchannel* subchannel,
- grpc_combiner* combiner);
+ SubchannelData(
+ SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address, grpc_subchannel* subchannel,
+ grpc_combiner* combiner);
virtual ~SubchannelData();
@@ -161,7 +168,7 @@ class SubchannelData {
static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
// Backpointer to owning subchannel list. Not owned.
- SubchannelListType* subchannel_list_;
+ SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list_;
// The subchannel and connected subchannel.
grpc_subchannel* subchannel_;
@@ -200,7 +207,7 @@ class SubchannelList
grpc_core::channelz::SubchannelNode* subchannel_node =
grpc_subchannel_get_channelz_node(subchannels_[i].subchannel());
if (subchannel_node != nullptr) {
- refs_list->push_back(subchannel_node->subchannel_uuid());
+ refs_list->push_back(subchannel_node->uuid());
}
}
}
@@ -268,7 +275,7 @@ class SubchannelList
template <typename SubchannelListType, typename SubchannelDataType>
SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
- SubchannelListType* subchannel_list,
+ SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
const grpc_lb_user_data_vtable* user_data_vtable,
const grpc_lb_address& address, grpc_subchannel* subchannel,
grpc_combiner* combiner)
@@ -532,8 +539,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
address_uri);
gpr_free(address_uri);
}
- subchannels_.emplace_back(static_cast<SubchannelListType*>(this),
- addresses->user_data_vtable,
+ subchannels_.emplace_back(this, addresses->user_data_vtable,
addresses->addresses[i], subchannel, combiner);
}
}
diff --git a/src/core/ext/filters/client_channel/parse_address.cc b/src/core/ext/filters/client_channel/parse_address.cc
index b3900114ad..b94429e207 100644
--- a/src/core/ext/filters/client_channel/parse_address.cc
+++ b/src/core/ext/filters/client_channel/parse_address.cc
@@ -125,9 +125,16 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr,
char* host_end = static_cast<char*>(gpr_memrchr(host, '%', strlen(host)));
if (host_end != nullptr) {
GPR_ASSERT(host_end >= host);
- char host_without_scope[GRPC_INET6_ADDRSTRLEN];
+ char host_without_scope[GRPC_INET6_ADDRSTRLEN + 1];
size_t host_without_scope_len = static_cast<size_t>(host_end - host);
uint32_t sin6_scope_id = 0;
+ if (host_without_scope_len > GRPC_INET6_ADDRSTRLEN) {
+ gpr_log(GPR_ERROR,
+ "invalid ipv6 address length %zu. Length cannot be greater than "
+ "GRPC_INET6_ADDRSTRLEN i.e %d)",
+ host_without_scope_len, GRPC_INET6_ADDRSTRLEN);
+ goto done;
+ }
strncpy(host_without_scope, host, host_without_scope_len);
host_without_scope[host_without_scope_len] = '\0';
if (grpc_inet_pton(GRPC_AF_INET6, host_without_scope, &in6->sin6_addr) ==
@@ -190,3 +197,12 @@ bool grpc_parse_uri(const grpc_uri* uri, grpc_resolved_address* resolved_addr) {
gpr_log(GPR_ERROR, "Can't parse scheme '%s'", uri->scheme);
return false;
}
+
+uint16_t grpc_strhtons(const char* port) {
+ if (strcmp(port, "http") == 0) {
+ return htons(80);
+ } else if (strcmp(port, "https") == 0) {
+ return htons(443);
+ }
+ return htons(static_cast<unsigned short>(atoi(port)));
+}
diff --git a/src/core/ext/filters/client_channel/parse_address.h b/src/core/ext/filters/client_channel/parse_address.h
index 9a88b66edc..c2af0e6c49 100644
--- a/src/core/ext/filters/client_channel/parse_address.h
+++ b/src/core/ext/filters/client_channel/parse_address.h
@@ -47,4 +47,7 @@ bool grpc_parse_ipv4_hostport(const char* hostport, grpc_resolved_address* addr,
bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr,
bool log_errors);
+/* Converts named or numeric port to a uint16 suitable for use in a sockaddr. */
+uint16_t grpc_strhtons(const char* port);
+
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_PARSE_ADDRESS_H */
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
index 485998f5e4..4c795c34c8 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
@@ -87,15 +87,6 @@ typedef struct grpc_ares_hostbyname_request {
static void do_basic_init(void) { gpr_mu_init(&g_init_mu); }
-static uint16_t strhtons(const char* port) {
- if (strcmp(port, "http") == 0) {
- return htons(80);
- } else if (strcmp(port, "https") == 0) {
- return htons(443);
- }
- return htons(static_cast<unsigned short>(atoi(port)));
-}
-
static void log_address_sorting_list(grpc_lb_addresses* lb_addrs,
const char* input_output_str) {
for (size_t i = 0; i < lb_addrs->num_addresses; i++) {
@@ -139,12 +130,6 @@ void grpc_cares_wrapper_address_sorting_sort(grpc_lb_addresses* lb_addrs) {
}
}
-/* Allow tests to access grpc_ares_wrapper_address_sorting_sort */
-void grpc_cares_wrapper_test_only_address_sorting_sort(
- grpc_lb_addresses* lb_addrs) {
- grpc_cares_wrapper_address_sorting_sort(lb_addrs);
-}
-
static void grpc_ares_request_ref_locked(grpc_ares_request* r) {
r->pending_queries++;
}
@@ -371,7 +356,8 @@ done:
grpc_ares_request_unref_locked(r);
}
-static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
+static grpc_ares_request*
+grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
@@ -454,12 +440,12 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
}
r->pending_queries = 1;
if (grpc_ares_query_ipv6()) {
- hr = create_hostbyname_request_locked(r, host, strhtons(port),
+ hr = create_hostbyname_request_locked(r, host, grpc_strhtons(port),
false /* is_balancer */);
ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_locked,
hr);
}
- hr = create_hostbyname_request_locked(r, host, strhtons(port),
+ hr = create_hostbyname_request_locked(r, host, grpc_strhtons(port),
false /* is_balancer */);
ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_locked,
hr);
@@ -494,6 +480,79 @@ error_cleanup:
return nullptr;
}
+static bool inner_resolve_as_ip_literal_locked(const char* name,
+ const char* default_port,
+ grpc_lb_addresses** addrs,
+ char** host, char** port,
+ char** hostport) {
+ gpr_split_host_port(name, host, port);
+ if (*host == nullptr) {
+ gpr_log(GPR_ERROR,
+ "Failed to parse %s to host:port while attempting to resolve as ip "
+ "literal.",
+ name);
+ return false;
+ }
+ if (*port == nullptr) {
+ if (default_port == nullptr) {
+ gpr_log(GPR_ERROR,
+ "No port or default port for %s while attempting to resolve as "
+ "ip literal.",
+ name);
+ return false;
+ }
+ *port = gpr_strdup(default_port);
+ }
+ grpc_resolved_address addr;
+ GPR_ASSERT(gpr_join_host_port(hostport, *host, atoi(*port)));
+ if (grpc_parse_ipv4_hostport(*hostport, &addr, false /* log errors */) ||
+ grpc_parse_ipv6_hostport(*hostport, &addr, false /* log errors */)) {
+ GPR_ASSERT(*addrs == nullptr);
+ *addrs = grpc_lb_addresses_create(1, nullptr);
+ grpc_lb_addresses_set_address(
+ *addrs, 0, addr.addr, addr.len, false /* is_balancer */,
+ nullptr /* balancer_name */, nullptr /* user_data */);
+ return true;
+ }
+ return false;
+}
+
+static bool resolve_as_ip_literal_locked(const char* name,
+ const char* default_port,
+ grpc_lb_addresses** addrs) {
+ char* host = nullptr;
+ char* port = nullptr;
+ char* hostport = nullptr;
+ bool out = inner_resolve_as_ip_literal_locked(name, default_port, addrs,
+ &host, &port, &hostport);
+ gpr_free(host);
+ gpr_free(port);
+ gpr_free(hostport);
+ return out;
+}
+
+static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
+ const char* dns_server, const char* name, const char* default_port,
+ grpc_pollset_set* interested_parties, grpc_closure* on_done,
+ grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
+ grpc_combiner* combiner) {
+ // Early out if the target is an ipv4 or ipv6 literal.
+ if (resolve_as_ip_literal_locked(name, default_port, addrs)) {
+ GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE);
+ return nullptr;
+ }
+ // Early out if the target is localhost and we're on Windows.
+ if (grpc_ares_maybe_resolve_localhost_manually_locked(name, default_port,
+ addrs)) {
+ GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE);
+ return nullptr;
+ }
+ // Look up name using c-ares lib.
+ return grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
+ dns_server, name, default_port, interested_parties, on_done, addrs,
+ check_grpclb, service_config_json, combiner);
+}
+
grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
@@ -502,7 +561,9 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
void grpc_cancel_ares_request(grpc_ares_request* r) {
if (grpc_dns_lookup_ares_locked == grpc_dns_lookup_ares_locked_impl) {
- grpc_ares_ev_driver_shutdown_locked(r->ev_driver);
+ if (r != nullptr) {
+ grpc_ares_ev_driver_shutdown_locked(r->ev_driver);
+ }
}
}
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
index ca5779e1d7..1bc457d4cf 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
@@ -81,9 +81,15 @@ void grpc_ares_complete_request_locked(grpc_ares_request* request);
/* E.g., return false if ipv6 is known to not be available. */
bool grpc_ares_query_ipv6();
-/* Exposed only for testing */
-void grpc_cares_wrapper_test_only_address_sorting_sort(
- grpc_lb_addresses* lb_addrs);
+/* Maybe (depending on the current platform) checks if "name" matches
+ * "localhost" and if so fills in addrs with the correct sockaddr structures.
+ * Returns a bool indicating whether or not such an action was performed.
+ * See https://github.com/grpc/grpc/issues/15158. */
+bool grpc_ares_maybe_resolve_localhost_manually_locked(
+ const char* name, const char* default_port, grpc_lb_addresses** addrs);
+
+/* Sorts destinations in lb_addrs according to RFC 6724. */
+void grpc_cares_wrapper_address_sorting_sort(grpc_lb_addresses* lb_addrs);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H \
*/
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc
index 23c0fec74f..639eec2323 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc
@@ -26,4 +26,9 @@
bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); }
+bool grpc_ares_maybe_resolve_localhost_manually_locked(
+ const char* name, const char* default_port, grpc_lb_addresses** addrs) {
+ return false;
+}
+
#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc
index ee827e284e..7e34784691 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc
@@ -21,9 +21,79 @@
#include "src/core/lib/iomgr/port.h"
#if GRPC_ARES == 1 && defined(GPR_WINDOWS)
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
+#include "src/core/lib/gpr/host_port.h"
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/socket_windows.h"
bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); }
+static bool inner_maybe_resolve_localhost_manually_locked(
+ const char* name, const char* default_port, grpc_lb_addresses** addrs,
+ char** host, char** port) {
+ gpr_split_host_port(name, host, port);
+ if (*host == nullptr) {
+ gpr_log(GPR_ERROR,
+ "Failed to parse %s into host:port during Windows localhost "
+ "resolution check.",
+ name);
+ return false;
+ }
+ if (*port == nullptr) {
+ if (default_port == nullptr) {
+ gpr_log(GPR_ERROR,
+ "No port or default port for %s during Windows localhost "
+ "resolution check.",
+ name);
+ return false;
+ }
+ *port = gpr_strdup(default_port);
+ }
+ if (gpr_stricmp(*host, "localhost") == 0) {
+ GPR_ASSERT(*addrs == nullptr);
+ *addrs = grpc_lb_addresses_create(2, nullptr);
+ uint16_t numeric_port = grpc_strhtons(*port);
+ // Append the ipv6 loopback address.
+ struct sockaddr_in6 ipv6_loopback_addr;
+ memset(&ipv6_loopback_addr, 0, sizeof(ipv6_loopback_addr));
+ ((char*)&ipv6_loopback_addr.sin6_addr)[15] = 1;
+ ipv6_loopback_addr.sin6_family = AF_INET6;
+ ipv6_loopback_addr.sin6_port = numeric_port;
+ grpc_lb_addresses_set_address(
+ *addrs, 0, &ipv6_loopback_addr, sizeof(ipv6_loopback_addr),
+ false /* is_balancer */, nullptr /* balancer_name */,
+ nullptr /* user_data */);
+ // Append the ipv4 loopback address.
+ struct sockaddr_in ipv4_loopback_addr;
+ memset(&ipv4_loopback_addr, 0, sizeof(ipv4_loopback_addr));
+ ((char*)&ipv4_loopback_addr.sin_addr)[0] = 0x7f;
+ ((char*)&ipv4_loopback_addr.sin_addr)[3] = 0x01;
+ ipv4_loopback_addr.sin_family = AF_INET;
+ ipv4_loopback_addr.sin_port = numeric_port;
+ grpc_lb_addresses_set_address(
+ *addrs, 1, &ipv4_loopback_addr, sizeof(ipv4_loopback_addr),
+ false /* is_balancer */, nullptr /* balancer_name */,
+ nullptr /* user_data */);
+ // Let the address sorter figure out which one should be tried first.
+ grpc_cares_wrapper_address_sorting_sort(*addrs);
+ return true;
+ }
+ return false;
+}
+
+bool grpc_ares_maybe_resolve_localhost_manually_locked(
+ const char* name, const char* default_port, grpc_lb_addresses** addrs) {
+ char* host = nullptr;
+ char* port = nullptr;
+ bool out = inner_maybe_resolve_localhost_manually_locked(name, default_port,
+ addrs, &host, &port);
+ gpr_free(host);
+ gpr_free(port);
+ return out;
+}
+
#endif /* GRPC_ARES == 1 && defined(GPR_WINDOWS) */
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 0e40f42e18..57d0b3759f 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -183,7 +183,13 @@ static void connection_destroy(void* arg, grpc_error* error) {
static void subchannel_destroy(void* arg, grpc_error* error) {
grpc_subchannel* c = static_cast<grpc_subchannel*>(arg);
- c->channelz_subchannel.reset();
+ if (c->channelz_subchannel != nullptr) {
+ c->channelz_subchannel->AddTraceEvent(
+ grpc_core::channelz::ChannelTrace::Severity::Info,
+ grpc_slice_from_static_string("Subchannel destroyed"));
+ c->channelz_subchannel->MarkSubchannelDestroyed();
+ c->channelz_subchannel.reset();
+ }
gpr_free((void*)c->filters);
grpc_channel_args_destroy(c->args);
grpc_connectivity_state_destroy(&c->state_tracker);
@@ -383,9 +389,18 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
const grpc_arg* arg =
grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ);
bool channelz_enabled = grpc_channel_arg_get_bool(arg, false);
+ arg = grpc_channel_args_find(c->args,
+ GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE);
+ const grpc_integer_options options = {0, 0, INT_MAX};
+ size_t channel_tracer_max_nodes =
+ (size_t)grpc_channel_arg_get_integer(arg, options);
if (channelz_enabled) {
c->channelz_subchannel =
- grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>();
+ grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>(
+ c, channel_tracer_max_nodes);
+ c->channelz_subchannel->AddTraceEvent(
+ grpc_core::channelz::ChannelTrace::Severity::Info,
+ grpc_slice_from_static_string("Subchannel created"));
}
return grpc_subchannel_index_register(key, c);
@@ -625,8 +640,8 @@ static bool publish_transport_locked(grpc_subchannel* c) {
}
/* publish */
- c->connected_subchannel.reset(
- grpc_core::New<grpc_core::ConnectedSubchannel>(stk));
+ c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>(
+ stk, c->channelz_subchannel.get()));
gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
c->connected_subchannel.get(), c);
@@ -770,6 +785,14 @@ void grpc_get_subchannel_address_arg(const grpc_channel_args* args,
}
}
+const char* grpc_subchannel_get_target(grpc_subchannel* subchannel) {
+ const grpc_arg* addr_arg =
+ grpc_channel_args_find(subchannel->args, GRPC_ARG_SUBCHANNEL_ADDRESS);
+ const char* addr_str = grpc_channel_arg_get_string(addr_arg);
+ GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy.
+ return addr_str;
+}
+
const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) {
const grpc_arg* addr_arg =
grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS);
@@ -786,9 +809,12 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) {
namespace grpc_core {
-ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack)
+ConnectedSubchannel::ConnectedSubchannel(
+ grpc_channel_stack* channel_stack,
+ channelz::SubchannelNode* channelz_subchannel)
: RefCountedWithTracing<ConnectedSubchannel>(&grpc_trace_stream_refcount),
- channel_stack_(channel_stack) {}
+ channel_stack_(channel_stack),
+ channelz_subchannel_(channelz_subchannel) {}
ConnectedSubchannel::~ConnectedSubchannel() {
GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index a135035d62..84febb5204 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -85,7 +85,8 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> {
size_t parent_data_size;
};
- explicit ConnectedSubchannel(grpc_channel_stack* channel_stack);
+ explicit ConnectedSubchannel(grpc_channel_stack* channel_stack,
+ channelz::SubchannelNode* channelz_subchannel);
~ConnectedSubchannel();
grpc_channel_stack* channel_stack() { return channel_stack_; }
@@ -94,9 +95,15 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> {
grpc_closure* closure);
void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call);
+ channelz::SubchannelNode* channelz_subchannel() {
+ return channelz_subchannel_;
+ }
private:
grpc_channel_stack* channel_stack_;
+ // backpointer to the channelz node in this connected subchannel's
+ // owning subchannel.
+ channelz::SubchannelNode* channelz_subchannel_;
};
} // namespace grpc_core
@@ -184,6 +191,8 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
void grpc_get_subchannel_address_arg(const grpc_channel_args* args,
grpc_resolved_address* addr);
+const char* grpc_subchannel_get_target(grpc_subchannel* subchannel);
+
/// Returns the URI string for the address to connect to.
const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args);
diff --git a/src/core/ext/filters/client_channel/subchannel_index.cc b/src/core/ext/filters/client_channel/subchannel_index.cc
index cb02b1a748..f2b6c24e8e 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.cc
+++ b/src/core/ext/filters/client_channel/subchannel_index.cc
@@ -42,7 +42,7 @@ struct grpc_subchannel_key {
grpc_subchannel_args args;
};
-static bool g_force_creation = false;
+static gpr_atm g_force_creation = false;
static grpc_subchannel_key* create_key(
const grpc_subchannel_args* args,
@@ -73,7 +73,8 @@ static grpc_subchannel_key* subchannel_key_copy(grpc_subchannel_key* k) {
int grpc_subchannel_key_compare(const grpc_subchannel_key* a,
const grpc_subchannel_key* b) {
- if (g_force_creation) return false;
+ // To pretend the keys are different, return a non-zero value.
+ if (GPR_UNLIKELY(gpr_atm_no_barrier_load(&g_force_creation))) return 1;
int c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
if (c != 0) return c;
if (a->args.filter_count > 0) {
@@ -250,5 +251,5 @@ void grpc_subchannel_index_unregister(grpc_subchannel_key* key,
}
void grpc_subchannel_index_test_only_set_force_creation(bool force_creation) {
- g_force_creation = force_creation;
+ gpr_atm_no_barrier_store(&g_force_creation, force_creation);
}
diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h
index a7dae9d47d..c135613d26 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.h
+++ b/src/core/ext/filters/client_channel/subchannel_index.h
@@ -65,13 +65,10 @@ void grpc_subchannel_index_ref(void);
void grpc_subchannel_index_unref(void);
/** \em TEST ONLY.
- * If \a force_creation is true, all key comparisons will be false, resulting in
+ * If \a force_creation is true, all keys are regarded different, resulting in
* new subchannels always being created. Otherwise, the keys will be compared as
* usual.
*
- * This function is *not* threadsafe on purpose: it should *only* be used in
- * test code.
- *
* Tests using this function \em MUST run tests with and without \a
* force_creation set. */
void grpc_subchannel_index_test_only_set_force_creation(bool force_creation);
diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc
index 90336103cf..cd3cab134a 100644
--- a/src/core/ext/filters/http/server/http_server_filter.cc
+++ b/src/core/ext/filters/http/server/http_server_filter.cc
@@ -23,6 +23,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <string.h>
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/b64.h"
@@ -69,6 +70,10 @@ struct call_data {
bool seen_recv_trailing_metadata_ready;
};
+struct channel_data {
+ bool surface_user_agent;
+};
+
} // namespace
static grpc_error* hs_filter_outgoing_metadata(grpc_call_element* elem,
@@ -265,6 +270,11 @@ static grpc_error* hs_filter_incoming_metadata(grpc_call_element* elem,
GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":authority")));
}
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ if (!chand->surface_user_agent && b->idx.named.user_agent != nullptr) {
+ grpc_metadata_batch_remove(b, b->idx.named.user_agent);
+ }
+
return error;
}
@@ -451,7 +461,12 @@ static void hs_destroy_call_elem(grpc_call_element* elem,
/* Constructor for channel_data */
static grpc_error* hs_init_channel_elem(grpc_channel_element* elem,
grpc_channel_element_args* args) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
GPR_ASSERT(!args->is_last);
+ chand->surface_user_agent = grpc_channel_arg_get_bool(
+ grpc_channel_args_find(args->channel_args,
+ const_cast<char*>(GRPC_ARG_SURFACE_USER_AGENT)),
+ true);
return GRPC_ERROR_NONE;
}
@@ -465,7 +480,7 @@ const grpc_channel_filter grpc_http_server_filter = {
hs_init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
hs_destroy_call_elem,
- 0,
+ sizeof(channel_data),
hs_init_channel_elem,
hs_destroy_channel_elem,
grpc_channel_next_get_info,
diff --git a/src/core/lib/channel/channel_trace.cc b/src/core/lib/channel/channel_trace.cc
index b3443310ac..cfb2faba51 100644
--- a/src/core/lib/channel/channel_trace.cc
+++ b/src/core/lib/channel/channel_trace.cc
@@ -41,16 +41,14 @@
namespace grpc_core {
namespace channelz {
-ChannelTrace::TraceEvent::TraceEvent(
- Severity severity, grpc_slice data,
- RefCountedPtr<ChannelNode> referenced_channel, ReferencedType type)
+ChannelTrace::TraceEvent::TraceEvent(Severity severity, grpc_slice data,
+ RefCountedPtr<BaseNode> referenced_entity)
: severity_(severity),
data_(data),
timestamp_(grpc_millis_to_timespec(grpc_core::ExecCtx::Get()->Now(),
GPR_CLOCK_REALTIME)),
next_(nullptr),
- referenced_channel_(std::move(referenced_channel)),
- referenced_type_(type) {}
+ referenced_entity_(std::move(referenced_entity)) {}
ChannelTrace::TraceEvent::TraceEvent(Severity severity, grpc_slice data)
: severity_(severity),
@@ -110,23 +108,13 @@ void ChannelTrace::AddTraceEvent(Severity severity, grpc_slice data) {
AddTraceEventHelper(New<TraceEvent>(severity, data));
}
-void ChannelTrace::AddTraceEventReferencingChannel(
- Severity severity, grpc_slice data,
- RefCountedPtr<ChannelNode> referenced_channel) {
- if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0
- // create and fill up the new event
- AddTraceEventHelper(New<TraceEvent>(
- severity, data, std::move(referenced_channel), ReferencedType::Channel));
-}
-
-void ChannelTrace::AddTraceEventReferencingSubchannel(
+void ChannelTrace::AddTraceEventWithReference(
Severity severity, grpc_slice data,
- RefCountedPtr<ChannelNode> referenced_subchannel) {
+ RefCountedPtr<BaseNode> referenced_entity) {
if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0
// create and fill up the new event
- AddTraceEventHelper(New<TraceEvent>(severity, data,
- std::move(referenced_subchannel),
- ReferencedType::Subchannel));
+ AddTraceEventHelper(
+ New<TraceEvent>(severity, data, std::move(referenced_entity)));
}
namespace {
@@ -157,19 +145,18 @@ void ChannelTrace::TraceEvent::RenderTraceEvent(grpc_json* json) const {
json_iterator = grpc_json_create_child(json_iterator, json, "timestamp",
gpr_format_timespec(timestamp_),
GRPC_JSON_STRING, true);
- if (referenced_channel_ != nullptr) {
+ if (referenced_entity_ != nullptr) {
+ const bool is_channel =
+ (referenced_entity_->type() == BaseNode::EntityType::kTopLevelChannel ||
+ referenced_entity_->type() == BaseNode::EntityType::kInternalChannel);
char* uuid_str;
- gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_channel_->channel_uuid());
+ gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_entity_->uuid());
grpc_json* child_ref = grpc_json_create_child(
- json_iterator, json,
- (referenced_type_ == ReferencedType::Channel) ? "channelRef"
- : "subchannelRef",
+ json_iterator, json, is_channel ? "channelRef" : "subchannelRef",
nullptr, GRPC_JSON_OBJECT, false);
json_iterator = grpc_json_create_child(
- nullptr, child_ref,
- (referenced_type_ == ReferencedType::Channel) ? "channelId"
- : "subchannelId",
- uuid_str, GRPC_JSON_STRING, true);
+ nullptr, child_ref, is_channel ? "channelId" : "subchannelId", uuid_str,
+ GRPC_JSON_STRING, true);
json_iterator = child_ref;
}
}
@@ -178,24 +165,26 @@ grpc_json* ChannelTrace::RenderJson() const {
if (!max_list_size_)
return nullptr; // tracing is disabled if max_events == 0
grpc_json* json = grpc_json_create(GRPC_JSON_OBJECT);
- char* num_events_logged_str;
- gpr_asprintf(&num_events_logged_str, "%" PRId64, num_events_logged_);
grpc_json* json_iterator = nullptr;
- json_iterator =
- grpc_json_create_child(json_iterator, json, "numEventsLogged",
- num_events_logged_str, GRPC_JSON_STRING, true);
+ if (num_events_logged_ > 0) {
+ json_iterator = grpc_json_add_number_string_child(
+ json, json_iterator, "numEventsLogged", num_events_logged_);
+ }
json_iterator = grpc_json_create_child(
json_iterator, json, "creationTimestamp",
gpr_format_timespec(time_created_), GRPC_JSON_STRING, true);
- grpc_json* events = grpc_json_create_child(json_iterator, json, "events",
- nullptr, GRPC_JSON_ARRAY, false);
- json_iterator = nullptr;
- TraceEvent* it = head_trace_;
- while (it != nullptr) {
- json_iterator = grpc_json_create_child(json_iterator, events, nullptr,
- nullptr, GRPC_JSON_OBJECT, false);
- it->RenderTraceEvent(json_iterator);
- it = it->next();
+ // only add in the event list if it is non-empty.
+ if (num_events_logged_ > 0) {
+ grpc_json* events = grpc_json_create_child(json_iterator, json, "events",
+ nullptr, GRPC_JSON_ARRAY, false);
+ json_iterator = nullptr;
+ TraceEvent* it = head_trace_;
+ while (it != nullptr) {
+ json_iterator = grpc_json_create_child(json_iterator, events, nullptr,
+ nullptr, GRPC_JSON_OBJECT, false);
+ it->RenderTraceEvent(json_iterator);
+ it = it->next();
+ }
}
return json;
}
diff --git a/src/core/lib/channel/channel_trace.h b/src/core/lib/channel/channel_trace.h
index 596af7402f..94fea20b45 100644
--- a/src/core/lib/channel/channel_trace.h
+++ b/src/core/lib/channel/channel_trace.h
@@ -30,7 +30,7 @@
namespace grpc_core {
namespace channelz {
-class ChannelNode;
+class BaseNode;
// Object used to hold live data for a channel. This data is exposed via the
// channelz service:
@@ -55,35 +55,28 @@ class ChannelTrace {
void AddTraceEvent(Severity severity, grpc_slice data);
// Adds a new trace event to the tracing object. This trace event refers to a
- // an event on a child of the channel. For example, if this channel has
- // created a new subchannel, then it would record that with a TraceEvent
- // referencing the new subchannel.
+ // an event that concerns a different channelz entity. For example, if this
+ // channel has created a new subchannel, then it would record that with
+ // a TraceEvent referencing the new subchannel.
//
// TODO(ncteisen): as this call is used more and more throughout the gRPC
// stack, determine if it makes more sense to accept a char* instead of a
// slice.
- void AddTraceEventReferencingChannel(
- Severity severity, grpc_slice data,
- RefCountedPtr<ChannelNode> referenced_channel);
- void AddTraceEventReferencingSubchannel(
- Severity severity, grpc_slice data,
- RefCountedPtr<ChannelNode> referenced_subchannel);
+ void AddTraceEventWithReference(Severity severity, grpc_slice data,
+ RefCountedPtr<BaseNode> referenced_entity);
// Creates and returns the raw grpc_json object, so a parent channelz
// object may incorporate the json before rendering.
grpc_json* RenderJson() const;
private:
- // Types of objects that can be references by trace events.
- enum class ReferencedType { Channel, Subchannel };
// Private class to encapsulate all the data and bookkeeping needed for a
// a trace event.
class TraceEvent {
public:
- // Constructor for a TraceEvent that references a different channel.
+ // Constructor for a TraceEvent that references a channel.
TraceEvent(Severity severity, grpc_slice data,
- RefCountedPtr<ChannelNode> referenced_channel,
- ReferencedType type);
+ RefCountedPtr<BaseNode> referenced_entity_);
// Constructor for a TraceEvent that does not reverence a different
// channel.
@@ -105,10 +98,7 @@ class ChannelTrace {
gpr_timespec timestamp_;
TraceEvent* next_;
// the tracer object for the (sub)channel that this trace event refers to.
- RefCountedPtr<ChannelNode> referenced_channel_;
- // the type that the referenced tracer points to. Unused if this trace
- // does not point to any channel or subchannel
- ReferencedType referenced_type_;
+ RefCountedPtr<BaseNode> referenced_entity_;
}; // TraceEvent
// Internal helper to add and link in a trace event
diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc
index 9d6002ed8a..375cf25cc6 100644
--- a/src/core/lib/channel/channelz.cc
+++ b/src/core/lib/channel/channelz.cc
@@ -41,33 +41,62 @@
namespace grpc_core {
namespace channelz {
-ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
- bool is_top_level_channel)
- : channel_(channel),
- target_(nullptr),
- channel_uuid_(-1),
- is_top_level_channel_(is_top_level_channel) {
- trace_.Init(channel_tracer_max_nodes);
- target_ = UniquePtr<char>(grpc_channel_get_target(channel_));
- channel_uuid_ = ChannelzRegistry::RegisterChannelNode(this);
+BaseNode::BaseNode(EntityType type)
+ : type_(type), uuid_(ChannelzRegistry::Register(this)) {}
+
+BaseNode::~BaseNode() { ChannelzRegistry::Unregister(uuid_); }
+
+char* BaseNode::RenderJsonString() {
+ grpc_json* json = RenderJson();
+ GPR_ASSERT(json != nullptr);
+ char* json_str = grpc_json_dump_to_string(json, 0);
+ grpc_json_destroy(json);
+ return json_str;
+}
+
+CallCountingHelper::CallCountingHelper() {
gpr_atm_no_barrier_store(&last_call_started_millis_,
(gpr_atm)ExecCtx::Get()->Now());
}
-ChannelNode::~ChannelNode() {
- trace_.Destroy();
- ChannelzRegistry::UnregisterChannelNode(channel_uuid_);
-}
+CallCountingHelper::~CallCountingHelper() {}
-void ChannelNode::RecordCallStarted() {
+void CallCountingHelper::RecordCallStarted() {
gpr_atm_no_barrier_fetch_add(&calls_started_, (gpr_atm)1);
gpr_atm_no_barrier_store(&last_call_started_millis_,
(gpr_atm)ExecCtx::Get()->Now());
}
-void ChannelNode::PopulateConnectivityState(grpc_json* json) {}
+void CallCountingHelper::PopulateCallCounts(grpc_json* json) {
+ grpc_json* json_iterator = nullptr;
+ if (calls_started_ != 0) {
+ json_iterator = grpc_json_add_number_string_child(
+ json, json_iterator, "callsStarted", calls_started_);
+ }
+ if (calls_succeeded_ != 0) {
+ json_iterator = grpc_json_add_number_string_child(
+ json, json_iterator, "callsSucceeded", calls_succeeded_);
+ }
+ if (calls_failed_) {
+ json_iterator = grpc_json_add_number_string_child(
+ json, json_iterator, "callsFailed", calls_failed_);
+ }
+ gpr_timespec ts =
+ grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME);
+ json_iterator =
+ grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp",
+ gpr_format_timespec(ts), GRPC_JSON_STRING, true);
+}
+
+ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel)
+ : BaseNode(is_top_level_channel ? EntityType::kTopLevelChannel
+ : EntityType::kInternalChannel),
+ channel_(channel),
+ target_(UniquePtr<char>(grpc_channel_get_target(channel_))),
+ trace_(channel_tracer_max_nodes) {}
-void ChannelNode::PopulateChildRefs(grpc_json* json) {}
+ChannelNode::~ChannelNode() {}
grpc_json* ChannelNode::RenderJson() {
// We need to track these three json objects to build our object
@@ -80,7 +109,7 @@ grpc_json* ChannelNode::RenderJson() {
json = json_iterator;
json_iterator = nullptr;
json_iterator = grpc_json_add_number_string_child(json, json_iterator,
- "channelId", channel_uuid_);
+ "channelId", uuid());
// reset json iterators to top level object
json = top_level_json;
json_iterator = nullptr;
@@ -89,51 +118,28 @@ grpc_json* ChannelNode::RenderJson() {
GRPC_JSON_OBJECT, false);
json = data;
json_iterator = nullptr;
+ // template method. Child classes may override this to add their specific
+ // functionality.
PopulateConnectivityState(json);
+ // populate the target.
GPR_ASSERT(target_.get() != nullptr);
- json_iterator = grpc_json_create_child(
- json_iterator, json, "target", target_.get(), GRPC_JSON_STRING, false);
+ grpc_json_create_child(nullptr, json, "target", target_.get(),
+ GRPC_JSON_STRING, false);
// fill in the channel trace if applicable
- grpc_json* trace = trace_->RenderJson();
- if (trace != nullptr) {
- // we manually link up and fill the child since it was created for us in
- // ChannelTrace::RenderJson
- trace->key = "trace"; // this object is named trace in channelz.proto
- json_iterator = grpc_json_link_child(json, trace, json_iterator);
- }
- // reset the parent to be the data object.
- json = data;
- json_iterator = nullptr;
- if (calls_started_ != 0) {
- json_iterator = grpc_json_add_number_string_child(
- json, json_iterator, "callsStarted", calls_started_);
+ grpc_json* trace_json = trace_.RenderJson();
+ if (trace_json != nullptr) {
+ trace_json->key = "trace"; // this object is named trace in channelz.proto
+ grpc_json_link_child(json, trace_json, nullptr);
}
- if (calls_succeeded_ != 0) {
- json_iterator = grpc_json_add_number_string_child(
- json, json_iterator, "callsSucceeded", calls_succeeded_);
- }
- if (calls_failed_) {
- json_iterator = grpc_json_add_number_string_child(
- json, json_iterator, "callsFailed", calls_failed_);
- }
- gpr_timespec ts =
- grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME);
- json_iterator =
- grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp",
- gpr_format_timespec(ts), GRPC_JSON_STRING, true);
+ // ask CallCountingHelper to populate trace and call count data.
+ call_counter_.PopulateCallCounts(json);
json = top_level_json;
- json_iterator = nullptr;
+ // template method. Child classes may override this to add their specific
+ // functionality.
PopulateChildRefs(json);
return top_level_json;
}
-char* ChannelNode::RenderJsonString() {
- grpc_json* json = RenderJson();
- char* json_str = grpc_json_dump_to_string(json, 0);
- grpc_json_destroy(json);
- return json_str;
-}
-
RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode(
grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel) {
@@ -141,12 +147,41 @@ RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode(
channel, channel_tracer_max_nodes, is_top_level_channel);
}
-SubchannelNode::SubchannelNode() {
- subchannel_uuid_ = ChannelzRegistry::RegisterSubchannelNode(this);
-}
+ServerNode::ServerNode(size_t channel_tracer_max_nodes)
+ : BaseNode(EntityType::kServer), trace_(channel_tracer_max_nodes) {}
+
+ServerNode::~ServerNode() {}
-SubchannelNode::~SubchannelNode() {
- ChannelzRegistry::UnregisterSubchannelNode(subchannel_uuid_);
+grpc_json* ServerNode::RenderJson() {
+ // We need to track these three json objects to build our object
+ grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
+ grpc_json* json = top_level_json;
+ grpc_json* json_iterator = nullptr;
+ // create and fill the ref child
+ json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr,
+ GRPC_JSON_OBJECT, false);
+ json = json_iterator;
+ json_iterator = nullptr;
+ json_iterator = grpc_json_add_number_string_child(json, json_iterator,
+ "serverId", uuid());
+ // reset json iterators to top level object
+ json = top_level_json;
+ json_iterator = nullptr;
+ // create and fill the data child.
+ grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr,
+ GRPC_JSON_OBJECT, false);
+ json = data;
+ json_iterator = nullptr;
+ // fill in the channel trace if applicable
+ grpc_json* trace_json = trace_.RenderJson();
+ if (trace_json != nullptr) {
+ trace_json->key = "trace"; // this object is named trace in channelz.proto
+ grpc_json_link_child(json, trace_json, nullptr);
+ }
+ // ask CallCountingHelper to populate trace and call count data.
+ call_counter_.PopulateCallCounts(json);
+ json = top_level_json;
+ return top_level_json;
}
} // namespace channelz
diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h
index 07eb73d626..9be256147b 100644
--- a/src/core/lib/channel/channelz.h
+++ b/src/core/lib/channel/channelz.h
@@ -43,14 +43,52 @@ namespace grpc_core {
namespace channelz {
namespace testing {
+class CallCountingHelperPeer;
class ChannelNodePeer;
-}
+} // namespace testing
-class ChannelNode : public RefCounted<ChannelNode> {
+// base class for all channelz entities
+class BaseNode : public RefCounted<BaseNode> {
public:
- static RefCountedPtr<ChannelNode> MakeChannelNode(
- grpc_channel* channel, size_t channel_tracer_max_nodes,
- bool is_top_level_channel);
+ // There are only four high level channelz entities. However, to support
+ // GetTopChannelsRequest, we split the Channel entity into two different
+ // types. All children of BaseNode must be one of these types.
+ enum class EntityType {
+ kTopLevelChannel,
+ kInternalChannel,
+ kSubchannel,
+ kServer,
+ kSocket,
+ };
+
+ explicit BaseNode(EntityType type);
+ virtual ~BaseNode();
+
+ // All children must implement this function.
+ virtual grpc_json* RenderJson() GRPC_ABSTRACT;
+
+ // Renders the json and returns allocated string that must be freed by the
+ // caller.
+ char* RenderJsonString();
+
+ EntityType type() const { return type_; }
+ intptr_t uuid() const { return uuid_; }
+
+ private:
+ const EntityType type_;
+ const intptr_t uuid_;
+};
+
+// This class is a helper class for channelz entities that deal with Channels,
+// Subchannels, and Servers, since those have similar proto definitions.
+// This class has the ability to:
+// - track calls_{started,succeeded,failed}
+// - track last_call_started_timestamp
+// - perform rendering of the above items
+class CallCountingHelper {
+ public:
+ CallCountingHelper();
+ ~CallCountingHelper();
void RecordCallStarted();
void RecordCallFailed() {
@@ -60,17 +98,46 @@ class ChannelNode : public RefCounted<ChannelNode> {
gpr_atm_no_barrier_fetch_add(&calls_succeeded_, (gpr_atm(1)));
}
- grpc_json* RenderJson();
- char* RenderJsonString();
+ // Common rendering of the call count data and last_call_started_timestamp.
+ void PopulateCallCounts(grpc_json* json);
- // helper for getting and populating connectivity state. It is virtual
- // because it allows the client_channel specific code to live in ext/
- // instead of lib/
- virtual void PopulateConnectivityState(grpc_json* json);
+ private:
+ // testing peer friend.
+ friend class testing::CallCountingHelperPeer;
- virtual void PopulateChildRefs(grpc_json* json);
+ gpr_atm calls_started_ = 0;
+ gpr_atm calls_succeeded_ = 0;
+ gpr_atm calls_failed_ = 0;
+ gpr_atm last_call_started_millis_ = 0;
+};
- ChannelTrace* trace() { return trace_.get(); }
+// Handles channelz bookkeeping for channels
+class ChannelNode : public BaseNode {
+ public:
+ static RefCountedPtr<ChannelNode> MakeChannelNode(
+ grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel);
+
+ ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel);
+ ~ChannelNode() override;
+
+ grpc_json* RenderJson() override;
+
+ // template methods. RenderJSON uses these methods to render its JSON
+ // representation. These are virtual so that children classes may provide
+ // their specific mechanism for populating these parts of the channelz
+ // object.
+ //
+ // ChannelNode does not have a notion of connectivity state or child refs,
+ // so it leaves these implementations blank.
+ //
+ // This is utilizing the template method design pattern.
+ //
+ // TODO(ncteisen): remove these template methods in favor of manual traversal
+ // and mutation of the grpc_json object.
+ virtual void PopulateConnectivityState(grpc_json* json) {}
+ virtual void PopulateChildRefs(grpc_json* json) {}
void MarkChannelDestroyed() {
GPR_ASSERT(channel_ != nullptr);
@@ -79,47 +146,62 @@ class ChannelNode : public RefCounted<ChannelNode> {
bool ChannelIsDestroyed() { return channel_ == nullptr; }
- intptr_t channel_uuid() { return channel_uuid_; }
- bool is_top_level_channel() { return is_top_level_channel_; }
-
- protected:
- GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
- GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
- ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
- bool is_top_level_channel);
- virtual ~ChannelNode();
+ // proxy methods to composed classes.
+ void AddTraceEvent(ChannelTrace::Severity severity, grpc_slice data) {
+ trace_.AddTraceEvent(severity, data);
+ }
+ void AddTraceEventWithReference(ChannelTrace::Severity severity,
+ grpc_slice data,
+ RefCountedPtr<BaseNode> referenced_channel) {
+ trace_.AddTraceEventWithReference(severity, data,
+ std::move(referenced_channel));
+ }
+ void RecordCallStarted() { call_counter_.RecordCallStarted(); }
+ void RecordCallFailed() { call_counter_.RecordCallFailed(); }
+ void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); }
private:
- // testing peer friend.
+ // to allow the channel trace test to access trace_.
friend class testing::ChannelNodePeer;
-
grpc_channel* channel_ = nullptr;
UniquePtr<char> target_;
- gpr_atm calls_started_ = 0;
- gpr_atm calls_succeeded_ = 0;
- gpr_atm calls_failed_ = 0;
- gpr_atm last_call_started_millis_ = 0;
- intptr_t channel_uuid_;
- bool is_top_level_channel_ = true;
- ManualConstructor<ChannelTrace> trace_;
+ CallCountingHelper call_counter_;
+ ChannelTrace trace_;
};
-// Placeholds channelz class for subchannels. All this can do now is track its
-// uuid (this information is needed by the parent channelz class).
-// TODO(ncteisen): build this out to support the GetSubchannel channelz request.
-class SubchannelNode : public RefCounted<SubchannelNode> {
+// Handles channelz bookkeeping for servers
+class ServerNode : public BaseNode {
public:
- SubchannelNode();
- virtual ~SubchannelNode();
+ explicit ServerNode(size_t channel_tracer_max_nodes);
+ ~ServerNode() override;
- intptr_t subchannel_uuid() { return subchannel_uuid_; }
+ grpc_json* RenderJson() override;
- protected:
- GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
- GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
+ // proxy methods to composed classes.
+ void AddTraceEvent(ChannelTrace::Severity severity, grpc_slice data) {
+ trace_.AddTraceEvent(severity, data);
+ }
+ void AddTraceEventWithReference(ChannelTrace::Severity severity,
+ grpc_slice data,
+ RefCountedPtr<BaseNode> referenced_channel) {
+ trace_.AddTraceEventWithReference(severity, data,
+ std::move(referenced_channel));
+ }
+ void RecordCallStarted() { call_counter_.RecordCallStarted(); }
+ void RecordCallFailed() { call_counter_.RecordCallFailed(); }
+ void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); }
private:
- intptr_t subchannel_uuid_;
+ CallCountingHelper call_counter_;
+ ChannelTrace trace_;
+};
+
+// Handles channelz bookkeeping for sockets
+// TODO(ncteisen): implement in subsequent PR.
+class SocketNode : public BaseNode {
+ public:
+ SocketNode() : BaseNode(EntityType::kSocket) {}
+ ~SocketNode() override {}
};
// Creation functions
diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc
index f79d2f0c17..adc7b6ba44 100644
--- a/src/core/lib/channel/channelz_registry.cc
+++ b/src/core/lib/channel/channelz_registry.cc
@@ -53,54 +53,46 @@ ChannelzRegistry::ChannelzRegistry() { gpr_mu_init(&mu_); }
ChannelzRegistry::~ChannelzRegistry() { gpr_mu_destroy(&mu_); }
-intptr_t ChannelzRegistry::InternalRegisterEntry(const RegistryEntry& entry) {
+intptr_t ChannelzRegistry::InternalRegister(BaseNode* node) {
MutexLock lock(&mu_);
- entities_.push_back(entry);
+ entities_.push_back(node);
intptr_t uuid = entities_.size();
return uuid;
}
-void ChannelzRegistry::InternalUnregisterEntry(intptr_t uuid, EntityType type) {
+void ChannelzRegistry::InternalUnregister(intptr_t uuid) {
GPR_ASSERT(uuid >= 1);
MutexLock lock(&mu_);
GPR_ASSERT(static_cast<size_t>(uuid) <= entities_.size());
- GPR_ASSERT(entities_[uuid - 1].type == type);
- entities_[uuid - 1].object = nullptr;
- entities_[uuid - 1].type = EntityType::kUnset;
+ entities_[uuid - 1] = nullptr;
}
-void* ChannelzRegistry::InternalGetEntry(intptr_t uuid, EntityType type) {
+BaseNode* ChannelzRegistry::InternalGet(intptr_t uuid) {
MutexLock lock(&mu_);
if (uuid < 1 || uuid > static_cast<intptr_t>(entities_.size())) {
return nullptr;
}
- if (entities_[uuid - 1].type == type) {
- return entities_[uuid - 1].object;
- } else {
- return nullptr;
- }
+ return entities_[uuid - 1];
}
char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) {
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
grpc_json* json = top_level_json;
grpc_json* json_iterator = nullptr;
- InlinedVector<ChannelNode*, 10> top_level_channels;
+ InlinedVector<BaseNode*, 10> top_level_channels;
// uuids index into entities one-off (idx 0 is really uuid 1, since 0 is
// reserved). However, we want to support requests coming in with
// start_channel_id=0, which signifies "give me everything." Hence this
// funky looking line below.
size_t start_idx = start_channel_id == 0 ? 0 : start_channel_id - 1;
for (size_t i = start_idx; i < entities_.size(); ++i) {
- if (entities_[i].type == EntityType::kChannelNode) {
- ChannelNode* channel_node =
- static_cast<ChannelNode*>(entities_[i].object);
- if (channel_node->is_top_level_channel()) {
- top_level_channels.push_back(channel_node);
- }
+ if (entities_[i] != nullptr &&
+ entities_[i]->type() ==
+ grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel) {
+ top_level_channels.push_back(entities_[i]);
}
}
- if (top_level_channels.size() > 0) {
+ if (!top_level_channels.empty()) {
// create list of channels
grpc_json* array_parent = grpc_json_create_child(
nullptr, json, "channel", nullptr, GRPC_JSON_ARRAY, false);
@@ -120,6 +112,42 @@ char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) {
return json_str;
}
+char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) {
+ grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
+ grpc_json* json = top_level_json;
+ grpc_json* json_iterator = nullptr;
+ InlinedVector<BaseNode*, 10> servers;
+ // uuids index into entities one-off (idx 0 is really uuid 1, since 0 is
+ // reserved). However, we want to support requests coming in with
+ // start_server_id=0, which signifies "give me everything."
+ size_t start_idx = start_server_id == 0 ? 0 : start_server_id - 1;
+ for (size_t i = start_idx; i < entities_.size(); ++i) {
+ if (entities_[i] != nullptr &&
+ entities_[i]->type() ==
+ grpc_core::channelz::BaseNode::EntityType::kServer) {
+ servers.push_back(entities_[i]);
+ }
+ }
+ if (!servers.empty()) {
+ // create list of servers
+ grpc_json* array_parent = grpc_json_create_child(
+ nullptr, json, "server", nullptr, GRPC_JSON_ARRAY, false);
+ for (size_t i = 0; i < servers.size(); ++i) {
+ grpc_json* server_json = servers[i]->RenderJson();
+ json_iterator =
+ grpc_json_link_child(array_parent, server_json, json_iterator);
+ }
+ }
+ // For now we do not have any pagination rules. In the future we could
+ // pick a constant for max_channels_sent for a GetServers request.
+ // Tracking: https://github.com/grpc/grpc/issues/16019.
+ json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr,
+ GRPC_JSON_TRUE, false);
+ char* json_str = grpc_json_dump_to_string(top_level_json, 0);
+ grpc_json_destroy(top_level_json);
+ return json_str;
+}
+
} // namespace channelz
} // namespace grpc_core
@@ -128,10 +156,18 @@ char* grpc_channelz_get_top_channels(intptr_t start_channel_id) {
start_channel_id);
}
+char* grpc_channelz_get_servers(intptr_t start_server_id) {
+ return grpc_core::channelz::ChannelzRegistry::GetServers(start_server_id);
+}
+
char* grpc_channelz_get_channel(intptr_t channel_id) {
- grpc_core::channelz::ChannelNode* channel_node =
- grpc_core::channelz::ChannelzRegistry::GetChannelNode(channel_id);
- if (channel_node == nullptr) {
+ grpc_core::channelz::BaseNode* channel_node =
+ grpc_core::channelz::ChannelzRegistry::Get(channel_id);
+ if (channel_node == nullptr ||
+ (channel_node->type() !=
+ grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel &&
+ channel_node->type() !=
+ grpc_core::channelz::BaseNode::EntityType::kInternalChannel)) {
return nullptr;
}
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
@@ -143,3 +179,21 @@ char* grpc_channelz_get_channel(intptr_t channel_id) {
grpc_json_destroy(top_level_json);
return json_str;
}
+
+char* grpc_channelz_get_subchannel(intptr_t subchannel_id) {
+ grpc_core::channelz::BaseNode* subchannel_node =
+ grpc_core::channelz::ChannelzRegistry::Get(subchannel_id);
+ if (subchannel_node == nullptr ||
+ subchannel_node->type() !=
+ grpc_core::channelz::BaseNode::EntityType::kSubchannel) {
+ return nullptr;
+ }
+ grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
+ grpc_json* json = top_level_json;
+ grpc_json* subchannel_json = subchannel_node->RenderJson();
+ subchannel_json->key = "subchannel";
+ grpc_json_link_child(json, subchannel_json, nullptr);
+ char* json_str = grpc_json_dump_to_string(top_level_json, 0);
+ grpc_json_destroy(top_level_json);
+ return json_str;
+}
diff --git a/src/core/lib/channel/channelz_registry.h b/src/core/lib/channel/channelz_registry.h
index 5d7c936726..d0d660600d 100644
--- a/src/core/lib/channel/channelz_registry.h
+++ b/src/core/lib/channel/channelz_registry.h
@@ -40,32 +40,11 @@ class ChannelzRegistry {
// To be called in grpc_shutdown();
static void Shutdown();
- // Register/Unregister/Get for ChannelNode
- static intptr_t RegisterChannelNode(ChannelNode* channel_node) {
- RegistryEntry entry(channel_node, EntityType::kChannelNode);
- return Default()->InternalRegisterEntry(entry);
- }
- static void UnregisterChannelNode(intptr_t uuid) {
- Default()->InternalUnregisterEntry(uuid, EntityType::kChannelNode);
- }
- static ChannelNode* GetChannelNode(intptr_t uuid) {
- void* gotten = Default()->InternalGetEntry(uuid, EntityType::kChannelNode);
- return gotten == nullptr ? nullptr : static_cast<ChannelNode*>(gotten);
- }
-
- // Register/Unregister/Get for SubchannelNode
- static intptr_t RegisterSubchannelNode(SubchannelNode* channel_node) {
- RegistryEntry entry(channel_node, EntityType::kSubchannelNode);
- return Default()->InternalRegisterEntry(entry);
- }
- static void UnregisterSubchannelNode(intptr_t uuid) {
- Default()->InternalUnregisterEntry(uuid, EntityType::kSubchannelNode);
- }
- static SubchannelNode* GetSubchannelNode(intptr_t uuid) {
- void* gotten =
- Default()->InternalGetEntry(uuid, EntityType::kSubchannelNode);
- return gotten == nullptr ? nullptr : static_cast<SubchannelNode*>(gotten);
+ static intptr_t Register(BaseNode* node) {
+ return Default()->InternalRegister(node);
}
+ static void Unregister(intptr_t uuid) { Default()->InternalUnregister(uuid); }
+ static BaseNode* Get(intptr_t uuid) { return Default()->InternalGet(uuid); }
// Returns the allocated JSON string that represents the proto
// GetTopChannelsResponse as per channelz.proto.
@@ -73,20 +52,13 @@ class ChannelzRegistry {
return Default()->InternalGetTopChannels(start_channel_id);
}
- private:
- enum class EntityType {
- kChannelNode,
- kSubchannelNode,
- kUnset,
- };
-
- struct RegistryEntry {
- RegistryEntry(void* object_in, EntityType type_in)
- : object(object_in), type(type_in) {}
- void* object;
- EntityType type;
- };
+ // Returns the allocated JSON string that represents the proto
+ // GetServersResponse as per channelz.proto.
+ static char* GetServers(intptr_t start_server_id) {
+ return Default()->InternalGetServers(start_server_id);
+ }
+ private:
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
@@ -97,21 +69,22 @@ class ChannelzRegistry {
static ChannelzRegistry* Default();
// globally registers an Entry. Returns its unique uuid
- intptr_t InternalRegisterEntry(const RegistryEntry& entry);
+ intptr_t InternalRegister(BaseNode* node);
// globally unregisters the object that is associated to uuid. Also does
// sanity check that an object doesn't try to unregister the wrong type.
- void InternalUnregisterEntry(intptr_t uuid, EntityType type);
+ void InternalUnregister(intptr_t uuid);
// if object with uuid has previously been registered as the correct type,
// returns the void* associated with that uuid. Else returns nullptr.
- void* InternalGetEntry(intptr_t uuid, EntityType type);
+ BaseNode* InternalGet(intptr_t uuid);
char* InternalGetTopChannels(intptr_t start_channel_id);
+ char* InternalGetServers(intptr_t start_server_id);
// protects entities_ and uuid_
gpr_mu mu_;
- InlinedVector<RegistryEntry, 20> entities_;
+ InlinedVector<BaseNode*, 20> entities_;
};
} // namespace channelz
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index abf96662f5..3d459059d7 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -82,6 +82,11 @@
#define GRPC_LINUX_SOCKETUTILS 1
#endif
#endif
+#ifdef LINUX_VERSION_CODE
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 37)
+#define GRPC_HAVE_TCP_USER_TIMEOUT
+#endif /* LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 37) */
+#endif /* LINUX_VERSION_CODE */
#ifndef __GLIBC__
#define GRPC_LINUX_EPOLL 1
#define GRPC_LINUX_EPOLL_CREATE1 1
diff --git a/src/core/lib/iomgr/socket_utils_common_posix.cc b/src/core/lib/iomgr/socket_utils_common_posix.cc
index c4b991c94d..50674b0845 100644
--- a/src/core/lib/iomgr/socket_utils_common_posix.cc
+++ b/src/core/lib/iomgr/socket_utils_common_posix.cc
@@ -41,6 +41,7 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/sockaddr.h"
@@ -222,6 +223,95 @@ grpc_error* grpc_set_socket_low_latency(int fd, int low_latency) {
return GRPC_ERROR_NONE;
}
+/* The default values for TCP_USER_TIMEOUT are currently configured to be in
+ * line with the default values of KEEPALIVE_TIMEOUT as proposed in
+ * https://github.com/grpc/proposal/blob/master/A18-tcp-user-timeout.md */
+#define DEFAULT_CLIENT_TCP_USER_TIMEOUT_MS 20000 /* 20 seconds */
+#define DEFAULT_SERVER_TCP_USER_TIMEOUT_MS 20000 /* 20 seconds */
+
+static int g_default_client_tcp_user_timeout_ms =
+ DEFAULT_CLIENT_TCP_USER_TIMEOUT_MS;
+static int g_default_server_tcp_user_timeout_ms =
+ DEFAULT_SERVER_TCP_USER_TIMEOUT_MS;
+static bool g_default_client_tcp_user_timeout_enabled = false;
+static bool g_default_server_tcp_user_timeout_enabled = true;
+
+void config_default_tcp_user_timeout(bool enable, int timeout, bool is_client) {
+ if (is_client) {
+ g_default_client_tcp_user_timeout_enabled = enable;
+ if (timeout > 0) {
+ g_default_client_tcp_user_timeout_ms = timeout;
+ }
+ } else {
+ g_default_server_tcp_user_timeout_enabled = enable;
+ if (timeout > 0) {
+ g_default_server_tcp_user_timeout_ms = timeout;
+ }
+ }
+}
+
+/* Set TCP_USER_TIMEOUT */
+grpc_error* grpc_set_socket_tcp_user_timeout(
+ int fd, const grpc_channel_args* channel_args, bool is_client) {
+#ifdef GRPC_HAVE_TCP_USER_TIMEOUT
+ bool enable;
+ int timeout;
+ if (is_client) {
+ enable = g_default_client_tcp_user_timeout_enabled;
+ timeout = g_default_client_tcp_user_timeout_ms;
+ } else {
+ enable = g_default_server_tcp_user_timeout_enabled;
+ timeout = g_default_server_tcp_user_timeout_ms;
+ }
+ if (channel_args) {
+ for (unsigned int i = 0; i < channel_args->num_args; i++) {
+ if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
+ const int value = grpc_channel_arg_get_integer(
+ &channel_args->args[i], grpc_integer_options{0, 1, INT_MAX});
+ /* Continue using default if value is 0 */
+ if (value == 0) {
+ continue;
+ }
+ /* Disable if value is INT_MAX */
+ enable = value != INT_MAX;
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
+ const int value = grpc_channel_arg_get_integer(
+ &channel_args->args[i], grpc_integer_options{0, 1, INT_MAX});
+ /* Continue using default if value is 0 */
+ if (value == 0) {
+ continue;
+ }
+ timeout = value;
+ }
+ }
+ }
+ if (enable) {
+ extern grpc_core::TraceFlag grpc_tcp_trace;
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_INFO, "Enabling TCP_USER_TIMEOUT with a timeout of %d ms",
+ timeout);
+ }
+ int newval;
+ socklen_t len = sizeof(newval);
+ if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout,
+ sizeof(timeout))) {
+ return GRPC_OS_ERROR(errno, "setsockopt(TCP_USER_TIMEOUT)");
+ }
+ if (0 != getsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len)) {
+ return GRPC_OS_ERROR(errno, "getsockopt(TCP_USER_TIMEOUT)");
+ }
+ if (newval != timeout) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Failed to set TCP_USER_TIMEOUT");
+ }
+ }
+#else
+ gpr_log(GPR_INFO, "TCP_USER_TIMEOUT not supported for this platform");
+#endif /* GRPC_HAVE_TCP_USER_TIMEOUT */
+ return GRPC_ERROR_NONE;
+}
+
/* set a socket using a grpc_socket_mutator */
grpc_error* grpc_set_socket_with_mutator(int fd, grpc_socket_mutator* mutator) {
GPR_ASSERT(mutator);
diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h
index b3fd58a530..71a304dc4e 100644
--- a/src/core/lib/iomgr/socket_utils_posix.h
+++ b/src/core/lib/iomgr/socket_utils_posix.h
@@ -53,6 +53,13 @@ grpc_error* grpc_set_socket_low_latency(int fd, int low_latency);
/* set SO_REUSEPORT */
grpc_error* grpc_set_socket_reuse_port(int fd, int reuse);
+/* Configure the default values for TCP_USER_TIMEOUT */
+void config_default_tcp_user_timeout(bool enable, int timeout, bool is_client);
+
+/* Set TCP_USER_TIMEOUT */
+grpc_error* grpc_set_socket_tcp_user_timeout(
+ int fd, const grpc_channel_args* channel_args, bool is_client);
+
/* Returns true if this system can create AF_INET6 sockets bound to ::1.
The value is probed once, and cached for the life of the process.
diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc
index 9c989b7dfe..8553ed0db4 100644
--- a/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/src/core/lib/iomgr/tcp_client_posix.cc
@@ -76,6 +76,9 @@ static grpc_error* prepare_socket(const grpc_resolved_address* addr, int fd,
if (!grpc_is_unix_socket(addr)) {
err = grpc_set_socket_low_latency(fd, 1);
if (err != GRPC_ERROR_NONE) goto error;
+ err = grpc_set_socket_tcp_user_timeout(fd, channel_args,
+ true /* is_client */);
+ if (err != GRPC_ERROR_NONE) goto error;
}
err = grpc_set_socket_no_sigpipe_if_possible(fd);
if (err != GRPC_ERROR_NONE) goto error;
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
index 9595c028ce..8d8d3f4273 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
@@ -166,6 +166,9 @@ grpc_error* grpc_tcp_server_prepare_socket(grpc_tcp_server* s, int fd,
if (err != GRPC_ERROR_NONE) goto error;
err = grpc_set_socket_reuse_addr(fd, 1);
if (err != GRPC_ERROR_NONE) goto error;
+ err = grpc_set_socket_tcp_user_timeout(fd, s->channel_args,
+ false /* is_client */);
+ if (err != GRPC_ERROR_NONE) goto error;
}
err = grpc_set_socket_no_sigpipe_if_possible(fd);
if (err != GRPC_ERROR_NONE) goto error;
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index b07c4d6c10..11b438f5dc 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -48,6 +48,7 @@
#include "src/core/lib/surface/call_test_only.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
+#include "src/core/lib/surface/server.h"
#include "src/core/lib/surface/validate_metadata.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
@@ -95,7 +96,7 @@ typedef struct batch_control {
grpc_closure start_batch;
grpc_closure finish_batch;
gpr_refcount steps_to_complete;
- grpc_error* batch_error;
+ gpr_atm batch_error;
grpc_transport_stream_op_batch op;
} batch_control;
@@ -202,6 +203,8 @@ struct grpc_call {
} client;
struct {
int* cancelled;
+ // backpointer to owning server if this is a server side call.
+ grpc_server* server;
} server;
} final_op;
grpc_error* status_error;
@@ -311,14 +314,10 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
/* Always support no compression */
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_MESSAGE_COMPRESS_NONE);
call->is_client = args->server_transport_data == nullptr;
- if (call->is_client) {
- GRPC_STATS_INC_CLIENT_CALLS_CREATED();
- } else {
- GRPC_STATS_INC_SERVER_CALLS_CREATED();
- }
call->stream_op_payload.context = call->context;
grpc_slice path = grpc_empty_slice();
if (call->is_client) {
+ GRPC_STATS_INC_CLIENT_CALLS_CREATED();
GPR_ASSERT(args->add_initial_metadata_count <
MAX_SEND_EXTRA_METADATA_COUNT);
for (i = 0; i < args->add_initial_metadata_count; i++) {
@@ -332,6 +331,8 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
call->send_extra_metadata_count =
static_cast<int>(args->add_initial_metadata_count);
} else {
+ GRPC_STATS_INC_SERVER_CALLS_CREATED();
+ call->final_op.server.server = args->server;
GPR_ASSERT(args->add_initial_metadata_count == 0);
call->send_extra_metadata_count = 0;
}
@@ -435,10 +436,18 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
&call->pollent);
}
- grpc_core::channelz::ChannelNode* channelz_channel =
- grpc_channel_get_channelz_node(call->channel);
- if (channelz_channel != nullptr) {
- channelz_channel->RecordCallStarted();
+ if (call->is_client) {
+ grpc_core::channelz::ChannelNode* channelz_channel =
+ grpc_channel_get_channelz_node(call->channel);
+ if (channelz_channel != nullptr) {
+ channelz_channel->RecordCallStarted();
+ }
+ } else {
+ grpc_core::channelz::ServerNode* channelz_server =
+ grpc_server_get_channelz_node(call->final_op.server.server);
+ if (channelz_server != nullptr) {
+ channelz_server->RecordCallStarted();
+ }
}
grpc_slice_unref_internal(path);
@@ -709,14 +718,15 @@ static void set_final_status(grpc_call* call, grpc_error* error) {
} else {
*call->final_op.server.cancelled =
error != GRPC_ERROR_NONE || call->status_error != GRPC_ERROR_NONE;
- /* TODO(ncteisen) : Update channelz handling for server
- if (channelz_channel != nullptr) {
+ grpc_core::channelz::ServerNode* channelz_server =
+ grpc_server_get_channelz_node(call->final_op.server.server);
+ if (channelz_server != nullptr) {
if (*call->final_op.server.cancelled) {
- channelz_channel->RecordCallFailed();
+ channelz_server->RecordCallFailed();
} else {
- channelz_channel->RecordCallSucceeded();
+ channelz_server->RecordCallSucceeded();
}
- } */
+ }
GRPC_ERROR_UNREF(error);
}
}
@@ -1106,14 +1116,17 @@ static void finish_batch_completion(void* user_data,
}
static void reset_batch_errors(batch_control* bctl) {
- GRPC_ERROR_UNREF(bctl->batch_error);
- bctl->batch_error = GRPC_ERROR_NONE;
+ GRPC_ERROR_UNREF(
+ reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
+ gpr_atm_rel_store(&bctl->batch_error,
+ reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE));
}
static void post_batch_completion(batch_control* bctl) {
grpc_call* next_child_call;
grpc_call* call = bctl->call;
- grpc_error* error = GRPC_ERROR_REF(bctl->batch_error);
+ grpc_error* error = GRPC_ERROR_REF(
+ reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
@@ -1277,8 +1290,10 @@ static void receiving_stream_ready(void* bctlp, grpc_error* error) {
grpc_call* call = bctl->call;
if (error != GRPC_ERROR_NONE) {
call->receiving_stream.reset();
- if (bctl->batch_error == GRPC_ERROR_NONE) {
- bctl->batch_error = GRPC_ERROR_REF(error);
+ if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
+ GRPC_ERROR_NONE) {
+ gpr_atm_rel_store(&bctl->batch_error,
+ reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
}
cancel_with_error(call, GRPC_ERROR_REF(error));
}
@@ -1384,8 +1399,10 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
call->send_deadline = md->deadline;
}
} else {
- if (bctl->batch_error == GRPC_ERROR_NONE) {
- bctl->batch_error = GRPC_ERROR_REF(error);
+ if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
+ GRPC_ERROR_NONE) {
+ gpr_atm_rel_store(&bctl->batch_error,
+ reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
}
cancel_with_error(call, GRPC_ERROR_REF(error));
}
@@ -1435,8 +1452,10 @@ static void finish_batch(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
- if (bctl->batch_error == GRPC_ERROR_NONE) {
- bctl->batch_error = GRPC_ERROR_REF(error);
+ if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
+ GRPC_ERROR_NONE) {
+ gpr_atm_rel_store(&bctl->batch_error,
+ reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
}
if (error != GRPC_ERROR_NONE) {
cancel_with_error(call, GRPC_ERROR_REF(error));
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index b3b06059d4..b34260505a 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -33,6 +33,7 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call* call, int success,
typedef struct grpc_call_create_args {
grpc_channel* channel;
+ grpc_server* server;
grpc_call* parent;
uint32_t propagation_mask;
diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc
index e3dde51d69..054fe105c3 100644
--- a/src/core/lib/surface/channel.cc
+++ b/src/core/lib/surface/channel.cc
@@ -165,11 +165,12 @@ grpc_channel* grpc_channel_create_with_builder(
}
grpc_channel_args_destroy(args);
- if (channelz_enabled) {
- bool is_top_level_channel = channel->is_client && !internal_channel;
+ // we only need to do the channelz bookkeeping for clients here. The channelz
+ // bookkeeping for server channels occurs in src/core/lib/surface/server.cc
+ if (channelz_enabled && channel->is_client) {
channel->channelz_channel = channel_node_create_func(
- channel, channel_tracer_max_nodes, is_top_level_channel);
- channel->channelz_channel->trace()->AddTraceEvent(
+ channel, channel_tracer_max_nodes, !internal_channel);
+ channel->channelz_channel->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel created"));
}
@@ -427,6 +428,9 @@ void grpc_channel_internal_unref(grpc_channel* c REF_ARG) {
static void destroy_channel(void* arg, grpc_error* error) {
grpc_channel* channel = static_cast<grpc_channel*>(arg);
if (channel->channelz_channel != nullptr) {
+ channel->channelz_channel->AddTraceEvent(
+ grpc_core::channelz::ChannelTrace::Severity::Info,
+ grpc_slice_from_static_string("Channel destroyed"));
channel->channelz_channel->MarkChannelDestroyed();
channel->channelz_channel.reset();
}
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index 0769d9e4f6..c2cf450e94 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -1364,9 +1364,11 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) {
}
cqd->shutdown_called = true;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ gpr_mu_unlock(cq->mu);
cq_finish_shutdown_callback(cq);
+ } else {
+ gpr_mu_unlock(cq->mu);
}
- gpr_mu_unlock(cq->mu);
GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
}
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index 521825ca69..5fa58ffdec 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -222,6 +222,8 @@ struct grpc_server {
/** when did we print the last shutdown progress message */
gpr_timespec last_shutdown_message_time;
+
+ grpc_core::RefCountedPtr<grpc_core::channelz::ServerNode> channelz_server;
};
#define SERVER_FROM_CALL_ELEM(elem) \
@@ -367,6 +369,7 @@ static void server_ref(grpc_server* server) {
static void server_delete(grpc_server* server) {
registered_method* rm;
size_t i;
+ server->channelz_server.reset();
grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
@@ -796,6 +799,7 @@ static void accept_stream(void* cd, grpc_transport* transport,
args.channel = chand->channel;
args.server_transport_data = transport_server_data;
args.send_deadline = GRPC_MILLIS_INF_FUTURE;
+ args.server = chand->server;
grpc_call* call;
grpc_error* error = grpc_call_create(&args, &call);
grpc_call_element* elem =
@@ -960,6 +964,7 @@ void grpc_server_register_completion_queue(grpc_server* server,
}
grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
+ grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
grpc_server* server =
@@ -976,6 +981,20 @@ grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
server->channel_args = grpc_channel_args_copy(args);
+ const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ);
+ if (grpc_channel_arg_get_bool(arg, false)) {
+ arg = grpc_channel_args_find(args,
+ GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE);
+ size_t trace_events_per_node =
+ grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
+ server->channelz_server =
+ grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>(
+ trace_events_per_node);
+ server->channelz_server->AddTraceEvent(
+ grpc_core::channelz::ChannelTrace::Severity::Info,
+ grpc_slice_from_static_string("Server created"));
+ }
+
return server;
}
@@ -1478,3 +1497,11 @@ int grpc_server_has_open_connections(grpc_server* server) {
gpr_mu_unlock(&server->mu_global);
return r;
}
+
+grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
+ grpc_server* server) {
+ if (server == nullptr) {
+ return nullptr;
+ }
+ return server->channelz_server.get();
+}
diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h
index c617cc223e..0196743ff9 100644
--- a/src/core/lib/surface/server.h
+++ b/src/core/lib/surface/server.h
@@ -23,6 +23,7 @@
#include <grpc/grpc.h>
#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/transport.h"
@@ -46,6 +47,9 @@ void grpc_server_setup_transport(grpc_server* server, grpc_transport* transport,
grpc_pollset* accepting_pollset,
const grpc_channel_args* args);
+grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
+ grpc_server* server);
+
const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server);
int grpc_server_has_open_connections(grpc_server* server);
diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc
index b5268add0d..17e8026096 100644
--- a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc
+++ b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc
@@ -24,6 +24,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/tsi/alts/handshaker/alts_handshaker_service_api.h"
const int kHandshakerClientOpNum = 4;
@@ -109,7 +110,7 @@ static grpc_byte_buffer* get_serialized_start_client(alts_tsi_event* event) {
if (ok) {
buffer = grpc_raw_byte_buffer_create(&slice, 1 /* number of slices */);
}
- grpc_slice_unref(slice);
+ grpc_slice_unref_internal(slice);
gpr_free(target_name);
grpc_gcp_handshaker_req_destroy(req);
return buffer;
@@ -157,7 +158,7 @@ static grpc_byte_buffer* get_serialized_start_server(
if (ok) {
buffer = grpc_raw_byte_buffer_create(&req_slice, 1 /* number of slices */);
}
- grpc_slice_unref(req_slice);
+ grpc_slice_unref_internal(req_slice);
grpc_gcp_handshaker_req_destroy(req);
return buffer;
}
@@ -195,7 +196,7 @@ static grpc_byte_buffer* get_serialized_next(grpc_slice* bytes_received) {
if (ok) {
buffer = grpc_raw_byte_buffer_create(&req_slice, 1 /* number of slices */);
}
- grpc_slice_unref(req_slice);
+ grpc_slice_unref_internal(req_slice);
grpc_gcp_handshaker_req_destroy(req);
return buffer;
}
@@ -258,7 +259,7 @@ alts_handshaker_client* alts_grpc_handshaker_client_create(
grpc_slice_from_static_string(ALTS_SERVICE_METHOD), &slice,
gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
client->base.vtable = &vtable;
- grpc_slice_unref(slice);
+ grpc_slice_unref_internal(slice);
return &client->base;
}
diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc b/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc
index e0e4184686..d63d3538c5 100644
--- a/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc
+++ b/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc
@@ -20,6 +20,8 @@
#include "src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.h"
+#include "src/core/lib/slice/slice_internal.h"
+
void add_repeated_field(repeated_field** head, const void* data) {
repeated_field* field =
static_cast<repeated_field*>(gpr_zalloc(sizeof(*field)));
@@ -67,7 +69,7 @@ grpc_slice* create_slice(const char* data, size_t size) {
void destroy_slice(grpc_slice* slice) {
if (slice != nullptr) {
- grpc_slice_unref(*slice);
+ grpc_slice_unref_internal(*slice);
gpr_free(slice);
}
}
diff --git a/src/core/tsi/alts/handshaker/alts_tsi_event.cc b/src/core/tsi/alts/handshaker/alts_tsi_event.cc
index ec0bf12b95..cb36d5ebd1 100644
--- a/src/core/tsi/alts/handshaker/alts_tsi_event.cc
+++ b/src/core/tsi/alts/handshaker/alts_tsi_event.cc
@@ -24,6 +24,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include "src/core/lib/slice/slice_internal.h"
+
tsi_result alts_tsi_event_create(alts_tsi_handshaker* handshaker,
tsi_handshaker_on_next_done_cb cb,
void* user_data,
@@ -66,8 +68,8 @@ void alts_tsi_event_destroy(alts_tsi_event* event) {
grpc_byte_buffer_destroy(event->recv_buffer);
grpc_metadata_array_destroy(&event->initial_metadata);
grpc_metadata_array_destroy(&event->trailing_metadata);
- grpc_slice_unref(event->details);
- grpc_slice_unref(event->target_name);
+ grpc_slice_unref_internal(event->details);
+ grpc_slice_unref_internal(event->target_name);
grpc_alts_credentials_options_destroy(event->options);
gpr_free(event);
}
diff --git a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc
index 1df1021bb1..34608a3de1 100644
--- a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc
+++ b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc
@@ -31,6 +31,7 @@
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gprpp/thd.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/tsi/alts/frame_protector/alts_frame_protector.h"
#include "src/core/tsi/alts/handshaker/alts_handshaker_client.h"
#include "src/core/tsi/alts/handshaker/alts_tsi_utils.h"
@@ -182,7 +183,7 @@ static void handshaker_result_destroy(tsi_handshaker_result* self) {
gpr_free(result->peer_identity);
gpr_free(result->key_data);
gpr_free(result->unused_bytes);
- grpc_slice_unref(result->rpc_versions);
+ grpc_slice_unref_internal(result->rpc_versions);
gpr_free(result);
}
@@ -269,12 +270,12 @@ static tsi_result handshaker_next(
handshaker->has_sent_start_message = true;
} else {
if (!GRPC_SLICE_IS_EMPTY(handshaker->recv_bytes)) {
- grpc_slice_unref(handshaker->recv_bytes);
+ grpc_slice_unref_internal(handshaker->recv_bytes);
}
handshaker->recv_bytes = grpc_slice_ref(slice);
ok = alts_handshaker_client_next(handshaker->client, event, &slice);
}
- grpc_slice_unref(slice);
+ grpc_slice_unref_internal(slice);
if (ok != TSI_OK) {
gpr_log(GPR_ERROR, "Failed to schedule ALTS handshaker requests");
return ok;
@@ -299,8 +300,8 @@ static void handshaker_destroy(tsi_handshaker* self) {
alts_tsi_handshaker* handshaker =
reinterpret_cast<alts_tsi_handshaker*>(self);
alts_handshaker_client_destroy(handshaker->client);
- grpc_slice_unref(handshaker->recv_bytes);
- grpc_slice_unref(handshaker->target_name);
+ grpc_slice_unref_internal(handshaker->recv_bytes);
+ grpc_slice_unref_internal(handshaker->target_name);
grpc_alts_credentials_options_destroy(handshaker->options);
gpr_free(handshaker->buffer);
gpr_free(handshaker);
diff --git a/src/core/tsi/alts/handshaker/alts_tsi_utils.cc b/src/core/tsi/alts/handshaker/alts_tsi_utils.cc
index d9b5e6c945..1747f1ad04 100644
--- a/src/core/tsi/alts/handshaker/alts_tsi_utils.cc
+++ b/src/core/tsi/alts/handshaker/alts_tsi_utils.cc
@@ -22,6 +22,8 @@
#include <grpc/byte_buffer_reader.h>
+#include "src/core/lib/slice/slice_internal.h"
+
tsi_result alts_tsi_utils_convert_to_tsi_result(grpc_status_code code) {
switch (code) {
case GRPC_STATUS_OK:
@@ -47,7 +49,7 @@ grpc_gcp_handshaker_resp* alts_tsi_utils_deserialize_response(
grpc_slice slice = grpc_byte_buffer_reader_readall(&bbr);
grpc_gcp_handshaker_resp* resp = grpc_gcp_handshaker_resp_create();
bool ok = grpc_gcp_handshaker_resp_decode(slice, resp);
- grpc_slice_unref(slice);
+ grpc_slice_unref_internal(slice);
grpc_byte_buffer_reader_destroy(&bbr);
if (!ok) {
grpc_gcp_handshaker_resp_destroy(resp);
diff --git a/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc b/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc
index d4fd88d1e2..e7890903d5 100644
--- a/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc
+++ b/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc
@@ -61,7 +61,7 @@ static tsi_result alts_grpc_privacy_integrity_protect(
if (status != GRPC_STATUS_OK) {
gpr_log(GPR_ERROR, "Failed to protect, %s", error_details);
gpr_free(error_details);
- grpc_slice_unref(protected_slice);
+ grpc_slice_unref_internal(protected_slice);
return TSI_INTERNAL_ERROR;
}
grpc_slice_buffer_add(protected_slices, protected_slice);
@@ -106,7 +106,7 @@ static tsi_result alts_grpc_privacy_integrity_unprotect(
if (status != GRPC_STATUS_OK) {
gpr_log(GPR_ERROR, "Failed to unprotect, %s", error_details);
gpr_free(error_details);
- grpc_slice_unref(unprotected_slice);
+ grpc_slice_unref_internal(unprotected_slice);
return TSI_INTERNAL_ERROR;
}
grpc_slice_buffer_reset_and_unref_internal(&rp->header_sb);
diff --git a/src/core/tsi/ssl/session_cache/ssl_session_cache.cc b/src/core/tsi/ssl/session_cache/ssl_session_cache.cc
index ce74fde343..f9184bcc34 100644
--- a/src/core/tsi/ssl/session_cache/ssl_session_cache.cc
+++ b/src/core/tsi/ssl/session_cache/ssl_session_cache.cc
@@ -19,6 +19,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/mutex_lock.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/tsi/ssl/session_cache/ssl_session.h"
#include "src/core/tsi/ssl/session_cache/ssl_session_cache.h"
@@ -53,7 +54,7 @@ class SslSessionLRUCache::Node {
SetSession(std::move(session));
}
- ~Node() { grpc_slice_unref(key_); }
+ ~Node() { grpc_slice_unref_internal(key_); }
// Not copyable nor movable.
Node(const Node&) = delete;