aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r--src/core/lib/channel/channel_trace.cc49
-rw-r--r--src/core/lib/channel/channel_trace.h30
-rw-r--r--src/core/lib/channel/channelz.cc185
-rw-r--r--src/core/lib/channel/channelz.h85
-rw-r--r--src/core/lib/channel/connected_channel.cc9
5 files changed, 314 insertions, 44 deletions
diff --git a/src/core/lib/channel/channel_trace.cc b/src/core/lib/channel/channel_trace.cc
index eb7214b355..0f655d8716 100644
--- a/src/core/lib/channel/channel_trace.cc
+++ b/src/core/lib/channel/channel_trace.cc
@@ -28,7 +28,6 @@
#include <stdlib.h>
#include <string.h>
-#include "src/core/lib/channel/channelz_registry.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
@@ -40,16 +39,17 @@
#include "src/core/lib/transport/error_utils.h"
namespace grpc_core {
+namespace channelz {
ChannelTrace::TraceEvent::TraceEvent(
Severity severity, grpc_slice data,
- RefCountedPtr<ChannelTrace> referenced_tracer, ReferencedType type)
+ RefCountedPtr<ChannelNode> referenced_channel, ReferencedType type)
: severity_(severity),
data_(data),
timestamp_(grpc_millis_to_timespec(grpc_core::ExecCtx::Get()->Now(),
GPR_CLOCK_REALTIME)),
next_(nullptr),
- referenced_tracer_(std::move(referenced_tracer)),
+ referenced_channel_(std::move(referenced_channel)),
referenced_type_(type) {}
ChannelTrace::TraceEvent::TraceEvent(Severity severity, grpc_slice data)
@@ -62,15 +62,13 @@ ChannelTrace::TraceEvent::TraceEvent(Severity severity, grpc_slice data)
ChannelTrace::TraceEvent::~TraceEvent() { grpc_slice_unref_internal(data_); }
ChannelTrace::ChannelTrace(size_t max_events)
- : channel_uuid_(-1),
- num_events_logged_(0),
+ : num_events_logged_(0),
list_size_(0),
max_list_size_(max_events),
head_trace_(nullptr),
tail_trace_(nullptr) {
if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0
gpr_mu_init(&tracer_mu_);
- channel_uuid_ = ChannelzRegistry::Register(this);
time_created_ = grpc_millis_to_timespec(grpc_core::ExecCtx::Get()->Now(),
GPR_CLOCK_REALTIME);
}
@@ -83,12 +81,9 @@ ChannelTrace::~ChannelTrace() {
it = it->next();
Delete<TraceEvent>(to_free);
}
- ChannelzRegistry::Unregister(channel_uuid_);
gpr_mu_destroy(&tracer_mu_);
}
-intptr_t ChannelTrace::GetUuid() const { return channel_uuid_; }
-
void ChannelTrace::AddTraceEventHelper(TraceEvent* new_trace_event) {
++num_events_logged_;
// first event case
@@ -117,20 +112,21 @@ void ChannelTrace::AddTraceEvent(Severity severity, grpc_slice data) {
void ChannelTrace::AddTraceEventReferencingChannel(
Severity severity, grpc_slice data,
- RefCountedPtr<ChannelTrace> referenced_tracer) {
+ RefCountedPtr<ChannelNode> referenced_channel) {
if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0
// create and fill up the new event
- AddTraceEventHelper(
- New<TraceEvent>(severity, data, std::move(referenced_tracer), Channel));
+ AddTraceEventHelper(New<TraceEvent>(
+ severity, data, std::move(referenced_channel), ReferencedType::Channel));
}
void ChannelTrace::AddTraceEventReferencingSubchannel(
Severity severity, grpc_slice data,
- RefCountedPtr<ChannelTrace> referenced_tracer) {
+ RefCountedPtr<ChannelNode> referenced_subchannel) {
if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0
// create and fill up the new event
- AddTraceEventHelper(New<TraceEvent>(
- severity, data, std::move(referenced_tracer), Subchannel));
+ AddTraceEventHelper(New<TraceEvent>(severity, data,
+ std::move(referenced_subchannel),
+ ReferencedType::Subchannel));
}
namespace {
@@ -193,22 +189,24 @@ void ChannelTrace::TraceEvent::RenderTraceEvent(grpc_json* json) const {
json_iterator =
grpc_json_create_child(json_iterator, json, "timestamp",
fmt_time(timestamp_), GRPC_JSON_STRING, true);
- if (referenced_tracer_ != nullptr) {
+ if (referenced_channel_ != nullptr) {
char* uuid_str;
- gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_tracer_->channel_uuid_);
+ gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_channel_->channel_uuid());
grpc_json* child_ref = grpc_json_create_child(
json_iterator, json,
- (referenced_type_ == Channel) ? "channelRef" : "subchannelRef", nullptr,
- GRPC_JSON_OBJECT, false);
+ (referenced_type_ == ReferencedType::Channel) ? "channelRef"
+ : "subchannelRef",
+ nullptr, GRPC_JSON_OBJECT, false);
json_iterator = grpc_json_create_child(
nullptr, child_ref,
- (referenced_type_ == Channel) ? "channelId" : "subchannelId", uuid_str,
- GRPC_JSON_STRING, true);
+ (referenced_type_ == ReferencedType::Channel) ? "channelId"
+ : "subchannelId",
+ uuid_str, GRPC_JSON_STRING, true);
json_iterator = child_ref;
}
}
-char* ChannelTrace::RenderTrace() const {
+grpc_json* ChannelTrace::RenderJSON() const {
if (!max_list_size_)
return nullptr; // tracing is disabled if max_events == 0
grpc_json* json = grpc_json_create(GRPC_JSON_OBJECT);
@@ -219,7 +217,7 @@ char* ChannelTrace::RenderTrace() const {
grpc_json_create_child(json_iterator, json, "numEventsLogged",
num_events_logged_str, GRPC_JSON_STRING, true);
json_iterator =
- grpc_json_create_child(json_iterator, json, "creationTime",
+ grpc_json_create_child(json_iterator, json, "creationTimestamp",
fmt_time(time_created_), GRPC_JSON_STRING, true);
grpc_json* events = grpc_json_create_child(json_iterator, json, "events",
nullptr, GRPC_JSON_ARRAY, false);
@@ -231,9 +229,8 @@ char* ChannelTrace::RenderTrace() const {
it->RenderTraceEvent(json_iterator);
it = it->next();
}
- char* json_str = grpc_json_dump_to_string(json, 0);
- grpc_json_destroy(json);
- return json_str;
+ return json;
}
+} // namespace channelz
} // namespace grpc_core
diff --git a/src/core/lib/channel/channel_trace.h b/src/core/lib/channel/channel_trace.h
index 1df1e585f2..0dd162a777 100644
--- a/src/core/lib/channel/channel_trace.h
+++ b/src/core/lib/channel/channel_trace.h
@@ -28,18 +28,18 @@
#include "src/core/lib/json/json.h"
namespace grpc_core {
+namespace channelz {
+
+class ChannelNode;
// Object used to hold live data for a channel. This data is exposed via the
// channelz service:
// https://github.com/grpc/proposal/blob/master/A14-channelz.md
-class ChannelTrace : public RefCounted<ChannelTrace> {
+class ChannelTrace {
public:
ChannelTrace(size_t max_events);
~ChannelTrace();
- // returns the tracer's uuid
- intptr_t GetUuid() const;
-
enum Severity {
Unset = 0, // never to be used
Info, // we start at 1 to avoid using proto default values
@@ -59,34 +59,30 @@ class ChannelTrace : public RefCounted<ChannelTrace> {
// created a new subchannel, then it would record that with a TraceEvent
// referencing the new subchannel.
//
- // TODO(ncteisen): Once channelz is implemented, the events should reference
- // the overall channelz object, not just the ChannelTrace object.
// TODO(ncteisen): as this call is used more and more throughout the gRPC
// stack, determine if it makes more sense to accept a char* instead of a
// slice.
void AddTraceEventReferencingChannel(
Severity severity, grpc_slice data,
- RefCountedPtr<ChannelTrace> referenced_tracer);
+ RefCountedPtr<ChannelNode> referenced_channel);
void AddTraceEventReferencingSubchannel(
Severity severity, grpc_slice data,
- RefCountedPtr<ChannelTrace> referenced_tracer);
+ RefCountedPtr<ChannelNode> referenced_subchannel);
- // Returns the tracing data rendered as a grpc json string.
- // The string is owned by the caller and must be freed.
- char* RenderTrace() const;
+ // Creates and returns the raw grpc_json object, so a parent channelz
+ // object may incorporate the json before rendering.
+ grpc_json* RenderJSON() const;
private:
// Types of objects that can be references by trace events.
- enum ReferencedType { Channel, Subchannel };
+ enum class ReferencedType { Channel, Subchannel };
// Private class to encapsulate all the data and bookkeeping needed for a
// a trace event.
class TraceEvent {
public:
// Constructor for a TraceEvent that references a different channel.
- // TODO(ncteisen): once channelz is implemented, this should reference the
- // overall channelz object, not just the ChannelTrace object
TraceEvent(Severity severity, grpc_slice data,
- RefCountedPtr<ChannelTrace> referenced_tracer,
+ RefCountedPtr<ChannelNode> referenced_channel,
ReferencedType type);
// Constructor for a TraceEvent that does not reverence a different
@@ -109,7 +105,7 @@ class ChannelTrace : public RefCounted<ChannelTrace> {
gpr_timespec timestamp_;
TraceEvent* next_;
// the tracer object for the (sub)channel that this trace event refers to.
- RefCountedPtr<ChannelTrace> referenced_tracer_;
+ RefCountedPtr<ChannelNode> referenced_channel_;
// the type that the referenced tracer points to. Unused if this trace
// does not point to any channel or subchannel
ReferencedType referenced_type_;
@@ -119,7 +115,6 @@ class ChannelTrace : public RefCounted<ChannelTrace> {
void AddTraceEventHelper(TraceEvent* new_trace_event);
gpr_mu tracer_mu_;
- intptr_t channel_uuid_;
uint64_t num_events_logged_;
size_t list_size_;
size_t max_list_size_;
@@ -128,6 +123,7 @@ class ChannelTrace : public RefCounted<ChannelTrace> {
gpr_timespec time_created_;
};
+} // namespace channelz
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_TRACE_H */
diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc
new file mode 100644
index 0000000000..3550fc0551
--- /dev/null
+++ b/src/core/lib/channel/channelz.cc
@@ -0,0 +1,185 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include "src/core/lib/channel/channelz.h"
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "src/core/lib/channel/channelz_registry.h"
+#include "src/core/lib/channel/status_util.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/error_utils.h"
+
+namespace grpc_core {
+namespace channelz {
+
+namespace {
+
+// TODO(ncteisen): move this function to a common helper location.
+//
+// returns an allocated string that represents tm according to RFC-3339, and,
+// more specifically, follows:
+// https://developers.google.com/protocol-buffers/docs/proto3#json
+//
+// "Uses RFC 3339, where generated output will always be Z-normalized and uses
+// 0, 3, 6 or 9 fractional digits."
+char* fmt_time(gpr_timespec tm) {
+ char time_buffer[35];
+ char ns_buffer[11]; // '.' + 9 digits of precision
+ struct tm* tm_info = localtime((const time_t*)&tm.tv_sec);
+ strftime(time_buffer, sizeof(time_buffer), "%Y-%m-%dT%H:%M:%S", tm_info);
+ snprintf(ns_buffer, 11, ".%09d", tm.tv_nsec);
+ // This loop trims off trailing zeros by inserting a null character that the
+ // right point. We iterate in chunks of three because we want 0, 3, 6, or 9
+ // fractional digits.
+ for (int i = 7; i >= 1; i -= 3) {
+ if (ns_buffer[i] == '0' && ns_buffer[i + 1] == '0' &&
+ ns_buffer[i + 2] == '0') {
+ ns_buffer[i] = '\0';
+ // Edge case in which all fractional digits were 0.
+ if (i == 1) {
+ ns_buffer[0] = '\0';
+ }
+ } else {
+ break;
+ }
+ }
+ char* full_time_str;
+ gpr_asprintf(&full_time_str, "%s%sZ", time_buffer, ns_buffer);
+ return full_time_str;
+}
+
+// TODO(ncteisen); move this to json library
+grpc_json* add_num_str(grpc_json* parent, grpc_json* it, const char* name,
+ int64_t num) {
+ char* num_str;
+ gpr_asprintf(&num_str, "%" PRId64, num);
+ return grpc_json_create_child(it, parent, name, num_str, GRPC_JSON_STRING,
+ true);
+}
+
+} // namespace
+
+ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes)
+ : channel_(channel), target_(nullptr), channel_uuid_(-1) {
+ trace_.Init(channel_tracer_max_nodes);
+ target_ = UniquePtr<char>(grpc_channel_get_target(channel_));
+ channel_uuid_ = ChannelzRegistry::Register(this);
+ gpr_atm_no_barrier_store(&last_call_started_millis_,
+ (gpr_atm)ExecCtx::Get()->Now());
+}
+
+ChannelNode::~ChannelNode() {
+ trace_.Destroy();
+ ChannelzRegistry::Unregister(channel_uuid_);
+}
+
+void ChannelNode::RecordCallStarted() {
+ gpr_atm_no_barrier_fetch_add(&calls_started_, (gpr_atm)1);
+ gpr_atm_no_barrier_store(&last_call_started_millis_,
+ (gpr_atm)ExecCtx::Get()->Now());
+}
+
+grpc_connectivity_state ChannelNode::GetConnectivityState() {
+ if (channel_ == nullptr) {
+ return GRPC_CHANNEL_SHUTDOWN;
+ } else {
+ return grpc_channel_check_connectivity_state(channel_, false);
+ }
+}
+
+char* ChannelNode::RenderJSON() {
+ // We need to track these three json objects to build our object
+ grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
+ grpc_json* json = top_level_json;
+ grpc_json* json_iterator = nullptr;
+ // create and fill the ref child
+ json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr,
+ GRPC_JSON_OBJECT, false);
+ json = json_iterator;
+ json_iterator = nullptr;
+ json_iterator = add_num_str(json, json_iterator, "channelId", channel_uuid_);
+ // reset json iterators to top level object
+ json = top_level_json;
+ json_iterator = nullptr;
+ // create and fill the data child.
+ grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr,
+ GRPC_JSON_OBJECT, false);
+ json = data;
+ json_iterator = nullptr;
+ // create and fill the connectivity state child.
+ grpc_connectivity_state connectivity_state = GetConnectivityState();
+ json_iterator = grpc_json_create_child(json_iterator, json, "state", nullptr,
+ GRPC_JSON_OBJECT, false);
+ json = json_iterator;
+ grpc_json_create_child(nullptr, json, "state",
+ grpc_connectivity_state_name(connectivity_state),
+ GRPC_JSON_STRING, false);
+ // reset the parent to be the data object.
+ json = data;
+ json_iterator = grpc_json_create_child(
+ json_iterator, json, "target", target_.get(), GRPC_JSON_STRING, false);
+ // fill in the channel trace if applicable
+ grpc_json* trace = trace_->RenderJSON();
+ if (trace != nullptr) {
+ // we manuall link up and fill the child since it was created for us in
+ // ChannelTrace::RenderJSON
+ json_iterator = grpc_json_link_child(json, trace, json_iterator);
+ trace->parent = json;
+ trace->value = nullptr;
+ trace->key = "trace";
+ trace->owns_value = false;
+ }
+ // reset the parent to be the data object.
+ json = data;
+ json_iterator = nullptr;
+ // We use -1 as sentinel values since proto default value for integers is
+ // zero, and the confuses the parser into thinking the value weren't present
+ json_iterator =
+ add_num_str(json, json_iterator, "callsStarted", calls_started_);
+ json_iterator =
+ add_num_str(json, json_iterator, "callsSucceeded", calls_succeeded_);
+ json_iterator =
+ add_num_str(json, json_iterator, "callsFailed", calls_failed_);
+ gpr_timespec ts =
+ grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME);
+ json_iterator =
+ grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp",
+ fmt_time(ts), GRPC_JSON_STRING, true);
+ // render and return the over json object
+ char* json_str = grpc_json_dump_to_string(top_level_json, 0);
+ grpc_json_destroy(top_level_json);
+ return json_str;
+}
+
+} // namespace channelz
+} // namespace grpc_core
diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h
new file mode 100644
index 0000000000..2aad1e82f4
--- /dev/null
+++ b/src/core/lib/channel/channelz.h
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_CHANNEL_CHANNELZ_H
+#define GRPC_CORE_LIB_CHANNEL_CHANNELZ_H
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <grpc/grpc.h>
+
+#include "src/core/lib/channel/channel_trace.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/gprpp/ref_counted.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/json/json.h"
+
+namespace grpc_core {
+namespace channelz {
+
+namespace testing {
+class ChannelNodePeer;
+}
+
+class ChannelNode : public RefCounted<ChannelNode> {
+ public:
+ ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes);
+ ~ChannelNode();
+
+ void RecordCallStarted();
+ void RecordCallFailed() {
+ gpr_atm_no_barrier_fetch_add(&calls_failed_, (gpr_atm(1)));
+ }
+ void RecordCallSucceeded() {
+ gpr_atm_no_barrier_fetch_add(&calls_succeeded_, (gpr_atm(1)));
+ }
+
+ char* RenderJSON();
+
+ ChannelTrace* trace() { return trace_.get(); }
+
+ void set_channel_destroyed() {
+ GPR_ASSERT(channel_ != nullptr);
+ channel_ = nullptr;
+ }
+
+ intptr_t channel_uuid() { return channel_uuid_; }
+
+ private:
+ // testing peer friend.
+ friend class testing::ChannelNodePeer;
+
+ // helper for getting connectivity state.
+ grpc_connectivity_state GetConnectivityState();
+
+ grpc_channel* channel_ = nullptr;
+ UniquePtr<char> target_;
+ gpr_atm calls_started_ = 0;
+ gpr_atm calls_succeeded_ = 0;
+ gpr_atm calls_failed_ = 0;
+ gpr_atm last_call_started_millis_ = 0;
+ intptr_t channel_uuid_;
+ ManualConstructor<ChannelTrace> trace_;
+};
+
+} // namespace channelz
+} // namespace grpc_core
+
+#endif /* GRPC_CORE_LIB_CHANNEL_CHANNELZ_H */
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc
index ddd3029402..e2ea334ded 100644
--- a/src/core/lib/channel/connected_channel.cc
+++ b/src/core/lib/channel/connected_channel.cc
@@ -51,6 +51,7 @@ typedef struct connected_channel_call_data {
callback_state on_complete[6]; // Max number of pending batches.
callback_state recv_initial_metadata_ready;
callback_state recv_message_ready;
+ callback_state recv_trailing_metadata_ready;
} call_data;
static void run_in_call_combiner(void* arg, grpc_error* error) {
@@ -111,6 +112,12 @@ static void con_start_transport_stream_op_batch(
intercept_callback(calld, state, false, "recv_message_ready",
&batch->payload->recv_message.recv_message_ready);
}
+ if (batch->recv_trailing_metadata) {
+ callback_state* state = &calld->recv_trailing_metadata_ready;
+ intercept_callback(
+ calld, state, false, "recv_trailing_metadata_ready",
+ &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
+ }
if (batch->cancel_stream) {
// There can be more than one cancellation batch in flight at any
// given time, so we can't just pick out a fixed index into
@@ -121,7 +128,7 @@ static void con_start_transport_stream_op_batch(
static_cast<callback_state*>(gpr_malloc(sizeof(*state)));
intercept_callback(calld, state, true, "on_complete (cancel_stream)",
&batch->on_complete);
- } else {
+ } else if (batch->on_complete != nullptr) {
callback_state* state = get_state_for_batch(calld, batch);
intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
}