aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/ext
diff options
context:
space:
mode:
authorGravatar Vizerai <jsking@google.com>2018-04-13 18:19:21 -0700
committerGravatar Vizerai <jsking@google.com>2018-05-10 11:41:15 -0700
commit41e4cedb7012a55376322b142d74eae5e86b95e3 (patch)
treee88e094d1381ab459e25f12328d20a2945b09660 /src/cpp/ext
parent082ddc563ea71a6b7a700070ec60095bfb65d88f (diff)
Adding opencensus grpc plugin.
Rebasing to merge commits.
Diffstat (limited to 'src/cpp/ext')
-rw-r--r--src/cpp/ext/filters/census/channel_filter.cc30
-rw-r--r--src/cpp/ext/filters/census/channel_filter.h36
-rw-r--r--src/cpp/ext/filters/census/client_filter.cc163
-rw-r--r--src/cpp/ext/filters/census/client_filter.h104
-rw-r--r--src/cpp/ext/filters/census/context.cc132
-rw-r--r--src/cpp/ext/filters/census/context.h126
-rw-r--r--src/cpp/ext/filters/census/grpc_context.cc38
-rw-r--r--src/cpp/ext/filters/census/grpc_plugin.cc130
-rw-r--r--src/cpp/ext/filters/census/grpc_plugin.h125
-rw-r--r--src/cpp/ext/filters/census/measures.cc129
-rw-r--r--src/cpp/ext/filters/census/measures.h46
-rw-r--r--src/cpp/ext/filters/census/rpc_encoding.cc39
-rw-r--r--src/cpp/ext/filters/census/rpc_encoding.h284
-rw-r--r--src/cpp/ext/filters/census/server_filter.cc198
-rw-r--r--src/cpp/ext/filters/census/server_filter.h101
-rw-r--r--src/cpp/ext/filters/census/views.cc491
16 files changed, 2172 insertions, 0 deletions
diff --git a/src/cpp/ext/filters/census/channel_filter.cc b/src/cpp/ext/filters/census/channel_filter.cc
new file mode 100644
index 0000000000..4ac684d277
--- /dev/null
+++ b/src/cpp/ext/filters/census/channel_filter.cc
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/ext/filters/census/channel_filter.h"
+
+namespace grpc {
+
+grpc_error* CensusChannelData::Init(grpc_channel_element* elem,
+ grpc_channel_element_args* args) {
+ return GRPC_ERROR_NONE;
+}
+
+} // namespace grpc
diff --git a/src/cpp/ext/filters/census/channel_filter.h b/src/cpp/ext/filters/census/channel_filter.h
new file mode 100644
index 0000000000..0b7c682681
--- /dev/null
+++ b/src/cpp/ext/filters/census/channel_filter.h
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H
+#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/ext/filters/census/context.h"
+
+namespace grpc {
+
+class CensusChannelData : public ChannelData {
+ public:
+ grpc_error* Init(grpc_channel_element* elem,
+ grpc_channel_element_args* args) override;
+};
+
+} // namespace grpc
+
+#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H */
diff --git a/src/cpp/ext/filters/census/client_filter.cc b/src/cpp/ext/filters/census/client_filter.cc
new file mode 100644
index 0000000000..293f4b1c25
--- /dev/null
+++ b/src/cpp/ext/filters/census/client_filter.cc
@@ -0,0 +1,163 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/ext/filters/census/client_filter.h"
+
+#include "absl/strings/str_cat.h"
+#include "absl/strings/string_view.h"
+#include "opencensus/stats/stats.h"
+#include "src/core/lib/surface/call.h"
+#include "src/cpp/ext/filters/census/grpc_plugin.h"
+#include "src/cpp/ext/filters/census/measures.h"
+
+namespace grpc {
+
+constexpr uint32_t CensusClientCallData::kMaxTraceContextLen;
+constexpr uint32_t CensusClientCallData::kMaxTagsLen;
+
+namespace {
+
+void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) {
+ if (b->idx.named.grpc_server_stats_bin != nullptr) {
+ ServerStatsDeserialize(
+ reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(
+ GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md))),
+ GRPC_SLICE_LENGTH(GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md)),
+ elapsed_time);
+ grpc_metadata_batch_remove(b, b->idx.named.grpc_server_stats_bin);
+ }
+}
+
+} // namespace
+
+void CensusClientCallData::OnDoneRecvTrailingMetadataCb(void* user_data,
+ grpc_error* error) {
+ grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
+ CensusClientCallData* calld =
+ reinterpret_cast<CensusClientCallData*>(elem->call_data);
+ GPR_ASSERT(calld != nullptr);
+ if (error == GRPC_ERROR_NONE) {
+ GPR_ASSERT(calld->recv_trailing_metadata_ != nullptr);
+ FilterTrailingMetadata(calld->recv_trailing_metadata_,
+ &calld->elapsed_time_);
+ }
+ GRPC_CLOSURE_RUN(calld->initial_on_done_recv_trailing_metadata_,
+ GRPC_ERROR_REF(error));
+}
+
+void CensusClientCallData::OnDoneRecvMessageCb(void* user_data,
+ grpc_error* error) {
+ grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
+ CensusClientCallData* calld =
+ reinterpret_cast<CensusClientCallData*>(elem->call_data);
+ CensusChannelData* channeld =
+ reinterpret_cast<CensusChannelData*>(elem->channel_data);
+ GPR_ASSERT(calld != nullptr);
+ GPR_ASSERT(channeld != nullptr);
+ // Stream messages are no longer valid after receiving trailing metadata.
+ if ((*calld->recv_message_) != nullptr) {
+ calld->recv_message_count_++;
+ }
+ GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error));
+}
+
+void CensusClientCallData::StartTransportStreamOpBatch(
+ grpc_call_element* elem, TransportStreamOpBatch* op) {
+ if (op->send_initial_metadata() != nullptr) {
+ census_context* ctxt = op->get_census_context();
+ GenerateClientContext(
+ qualified_method_, &context_,
+ (ctxt == nullptr) ? nullptr : reinterpret_cast<CensusContext*>(ctxt));
+ size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf_,
+ kMaxTraceContextLen);
+ if (tracing_len > 0) {
+ GRPC_LOG_IF_ERROR(
+ "census grpc_filter",
+ grpc_metadata_batch_add_tail(
+ op->send_initial_metadata()->batch(), &tracing_bin_,
+ grpc_mdelem_from_slices(
+ GRPC_MDSTR_GRPC_TRACE_BIN,
+ grpc_slice_from_copied_buffer(tracing_buf_, tracing_len))));
+ }
+ grpc_slice tags = grpc_empty_slice();
+ // TODO: Add in tagging serialization.
+ size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
+ if (encoded_tags_len > 0) {
+ GRPC_LOG_IF_ERROR(
+ "census grpc_filter",
+ grpc_metadata_batch_add_tail(
+ op->send_initial_metadata()->batch(), &stats_bin_,
+ grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags)));
+ }
+ }
+
+ if (op->send_message() != nullptr) {
+ ++sent_message_count_;
+ }
+ if (op->recv_message() != nullptr) {
+ recv_message_ = op->op()->payload->recv_message.recv_message;
+ initial_on_done_recv_message_ =
+ op->op()->payload->recv_message.recv_message_ready;
+ op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_;
+ }
+ if (op->recv_trailing_metadata() != nullptr) {
+ recv_trailing_metadata_ = op->recv_trailing_metadata()->batch();
+ initial_on_done_recv_trailing_metadata_ = op->on_complete();
+ op->set_on_complete(&on_done_recv_trailing_metadata_);
+ }
+ // Call next op.
+ grpc_call_next_op(elem, op->op());
+}
+
+grpc_error* CensusClientCallData::Init(grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ path_ = grpc_slice_ref_internal(args->path);
+ start_time_ = absl::Now();
+ method_ = GetMethod(&path_);
+ qualified_method_ = absl::StrCat("Sent.", method_);
+ GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&on_done_recv_trailing_metadata_,
+ OnDoneRecvTrailingMetadataCb, elem,
+ grpc_schedule_on_exec_ctx);
+ return GRPC_ERROR_NONE;
+}
+
+void CensusClientCallData::Destroy(grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ grpc_closure* then_call_closure) {
+ const uint64_t request_size = GetOutgoingDataSize(final_info);
+ const uint64_t response_size = GetIncomingDataSize(final_info);
+ double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_);
+ ::opencensus::stats::Record(
+ {{RpcClientSentBytesPerRpc(), static_cast<double>(request_size)},
+ {RpcClientReceivedBytesPerRpc(), static_cast<double>(response_size)},
+ {RpcClientRoundtripLatency(), latency_ms},
+ {RpcClientServerLatency(),
+ ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time_))},
+ {RpcClientSentMessagesPerRpc(), sent_message_count_},
+ {RpcClientReceivedMessagesPerRpc(), recv_message_count_}},
+ {{ClientMethodTagKey(), method_},
+ {ClientStatusTagKey(), StatusCodeToString(final_info->final_status)}});
+ grpc_slice_unref_internal(path_);
+ context_.EndSpan();
+}
+
+} // namespace grpc
diff --git a/src/cpp/ext/filters/census/client_filter.h b/src/cpp/ext/filters/census/client_filter.h
new file mode 100644
index 0000000000..851022873f
--- /dev/null
+++ b/src/cpp/ext/filters/census/client_filter.h
@@ -0,0 +1,104 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CLIENT_FILTER_H
+#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CLIENT_FILTER_H
+
+#include <grpc/support/port_platform.h>
+
+#include "absl/strings/string_view.h"
+#include "absl/time/time.h"
+#include "src/cpp/ext/filters/census/channel_filter.h"
+#include "src/cpp/ext/filters/census/context.h"
+
+namespace grpc {
+
+// A CallData class will be created for every grpc call within a channel. It is
+// used to store data and methods specific to that call. CensusClientCallData is
+// thread-compatible, however typically only 1 thread should be interacting with
+// a call at a time.
+class CensusClientCallData : public CallData {
+ public:
+ // Maximum size of trace context is sent on the wire.
+ static constexpr uint32_t kMaxTraceContextLen = 64;
+ // Maximum size of tags that are sent on the wire.
+ static constexpr uint32_t kMaxTagsLen = 2048;
+
+ CensusClientCallData()
+ : recv_trailing_metadata_(nullptr),
+ initial_on_done_recv_trailing_metadata_(nullptr),
+ initial_on_done_recv_message_(nullptr),
+ elapsed_time_(0),
+ recv_message_(nullptr),
+ recv_message_count_(0),
+ sent_message_count_(0) {
+ memset(&stats_bin_, 0, sizeof(grpc_linked_mdelem));
+ memset(&tracing_bin_, 0, sizeof(grpc_linked_mdelem));
+ memset(&path_, 0, sizeof(grpc_slice));
+ memset(&on_done_recv_trailing_metadata_, 0, sizeof(grpc_closure));
+ memset(&on_done_recv_message_, 0, sizeof(grpc_closure));
+ }
+
+ grpc_error* Init(grpc_call_element* elem,
+ const grpc_call_element_args* args) override;
+
+ void Destroy(grpc_call_element* elem, const grpc_call_final_info* final_info,
+ grpc_closure* then_call_closure) override;
+
+ void StartTransportStreamOpBatch(grpc_call_element* elem,
+ TransportStreamOpBatch* op) override;
+
+ static void OnDoneRecvTrailingMetadataCb(void* user_data, grpc_error* error);
+
+ static void OnDoneSendInitialMetadataCb(void* user_data, grpc_error* error);
+
+ static void OnDoneRecvMessageCb(void* user_data, grpc_error* error);
+
+ private:
+ CensusContext context_;
+ // Metadata elements for tracing and census stats data.
+ grpc_linked_mdelem stats_bin_;
+ grpc_linked_mdelem tracing_bin_;
+ // Client method.
+ absl::string_view method_;
+ std::string qualified_method_;
+ grpc_slice path_;
+ // The recv trailing metadata callbacks.
+ grpc_metadata_batch* recv_trailing_metadata_;
+ grpc_closure* initial_on_done_recv_trailing_metadata_;
+ grpc_closure on_done_recv_trailing_metadata_;
+ // recv message
+ grpc_closure* initial_on_done_recv_message_;
+ grpc_closure on_done_recv_message_;
+ // Start time (for measuring latency).
+ absl::Time start_time_;
+ // Server elapsed time in nanoseconds.
+ uint64_t elapsed_time_;
+ // The received message--may be null.
+ grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message_;
+ // Number of messages in this RPC.
+ uint64_t recv_message_count_;
+ uint64_t sent_message_count_;
+ // Buffer needed for grpc_slice to reference when adding trace context
+ // metatdata to outgoing message.
+ char tracing_buf_[kMaxTraceContextLen];
+};
+
+} // namespace grpc
+
+#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CLIENT_FILTER_H */
diff --git a/src/cpp/ext/filters/census/context.cc b/src/cpp/ext/filters/census/context.cc
new file mode 100644
index 0000000000..4b3250236d
--- /dev/null
+++ b/src/cpp/ext/filters/census/context.cc
@@ -0,0 +1,132 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/ext/filters/census/context.h"
+
+namespace grpc {
+
+using ::opencensus::trace::Span;
+using ::opencensus::trace::SpanContext;
+
+void GenerateServerContext(absl::string_view tracing, absl::string_view stats,
+ absl::string_view primary_role,
+ absl::string_view method, CensusContext* context) {
+ GrpcTraceContext trace_ctxt;
+ TraceContextEncoding::Decode(tracing, &trace_ctxt);
+ SpanContext parent_ctx = trace_ctxt.ToSpanContext();
+ new (context) CensusContext(method, parent_ctx);
+}
+
+void GenerateClientContext(absl::string_view method, CensusContext* ctxt,
+ CensusContext* parent_ctxt) {
+ if (parent_ctxt != nullptr) {
+ SpanContext span_ctxt = parent_ctxt->Context();
+ Span span = parent_ctxt->Span();
+ if (span_ctxt.IsValid()) {
+ new (ctxt) CensusContext(method, &span);
+ return;
+ }
+ }
+ new (ctxt) CensusContext(method);
+}
+
+size_t TraceContextSerialize(const ::opencensus::trace::SpanContext& context,
+ char* tracing_buf, size_t tracing_buf_size) {
+ GrpcTraceContext trace_ctxt(context);
+ return TraceContextEncoding::Encode(trace_ctxt, tracing_buf,
+ tracing_buf_size);
+}
+
+size_t StatsContextSerialize(size_t max_tags_len, grpc_slice* tags) {
+ // TODO: Add implementation. Waiting on stats tagging to be added.
+ return 0;
+}
+
+size_t ServerStatsSerialize(uint64_t server_elapsed_time, char* buf,
+ size_t buf_size) {
+ return RpcServerStatsEncoding::Encode(server_elapsed_time, buf, buf_size);
+}
+
+size_t ServerStatsDeserialize(const char* buf, size_t buf_size,
+ uint64_t* server_elapsed_time) {
+ return RpcServerStatsEncoding::Decode(absl::string_view(buf, buf_size),
+ server_elapsed_time);
+}
+
+uint64_t GetIncomingDataSize(const grpc_call_final_info* final_info) {
+ return final_info->stats.transport_stream_stats.incoming.data_bytes;
+}
+
+uint64_t GetOutgoingDataSize(const grpc_call_final_info* final_info) {
+ return final_info->stats.transport_stream_stats.outgoing.data_bytes;
+}
+
+SpanContext SpanContextFromCensusContext(const census_context* ctxt) {
+ return reinterpret_cast<const CensusContext*>(ctxt)->Context();
+}
+
+Span SpanFromCensusContext(const census_context* ctxt) {
+ return reinterpret_cast<const CensusContext*>(ctxt)->Span();
+}
+
+absl::string_view StatusCodeToString(grpc_status_code code) {
+ switch (code) {
+ case GRPC_STATUS_OK:
+ return "OK";
+ case GRPC_STATUS_CANCELLED:
+ return "CANCELLED";
+ case GRPC_STATUS_UNKNOWN:
+ return "UNKNOWN";
+ case GRPC_STATUS_INVALID_ARGUMENT:
+ return "INVALID_ARGUMENT";
+ case GRPC_STATUS_DEADLINE_EXCEEDED:
+ return "DEADLINE_EXCEEDED";
+ case GRPC_STATUS_NOT_FOUND:
+ return "NOT_FOUND";
+ case GRPC_STATUS_ALREADY_EXISTS:
+ return "ALREADY_EXISTS";
+ case GRPC_STATUS_PERMISSION_DENIED:
+ return "PERMISSION_DENIED";
+ case GRPC_STATUS_UNAUTHENTICATED:
+ return "UNAUTHENTICATED";
+ case GRPC_STATUS_RESOURCE_EXHAUSTED:
+ return "RESOURCE_EXHAUSTED";
+ case GRPC_STATUS_FAILED_PRECONDITION:
+ return "FAILED_PRECONDITION";
+ case GRPC_STATUS_ABORTED:
+ return "ABORTED";
+ case GRPC_STATUS_OUT_OF_RANGE:
+ return "OUT_OF_RANGE";
+ case GRPC_STATUS_UNIMPLEMENTED:
+ return "UNIMPLEMENTED";
+ case GRPC_STATUS_INTERNAL:
+ return "INTERNAL";
+ case GRPC_STATUS_UNAVAILABLE:
+ return "UNAVAILABLE";
+ case GRPC_STATUS_DATA_LOSS:
+ return "DATA_LOSS";
+ default:
+ // gRPC wants users of this enum to include a default branch so that
+ // adding values is not a breaking change.
+ return "UNKNOWN_STATUS";
+ }
+}
+
+} // namespace grpc
diff --git a/src/cpp/ext/filters/census/context.h b/src/cpp/ext/filters/census/context.h
new file mode 100644
index 0000000000..1643fdd11b
--- /dev/null
+++ b/src/cpp/ext/filters/census/context.h
@@ -0,0 +1,126 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CONTEXT_H
+#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CONTEXT_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpc/status.h>
+#include "absl/memory/memory.h"
+#include "absl/strings/string_view.h"
+#include "absl/strings/strip.h"
+#include "opencensus/trace/span.h"
+#include "opencensus/trace/span_context.h"
+#include "opencensus/trace/trace_params.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/cpp/common/channel_filter.h"
+#include "src/cpp/ext/filters/census/rpc_encoding.h"
+
+// This is needed because grpc has hardcoded CensusContext with a
+// forward declaration of 'struct census_context;'
+struct census_context;
+
+namespace grpc {
+
+// Thread compatible.
+class CensusContext {
+ public:
+ CensusContext() : span_(::opencensus::trace::Span::BlankSpan()) {}
+
+ explicit CensusContext(absl::string_view name)
+ : span_(::opencensus::trace::Span::StartSpan(name)) {}
+
+ CensusContext(absl::string_view name, const ::opencensus::trace::Span* parent)
+ : span_(::opencensus::trace::Span::StartSpan(name, parent)) {}
+
+ CensusContext(absl::string_view name,
+ const ::opencensus::trace::SpanContext& parent_ctxt)
+ : span_(::opencensus::trace::Span::StartSpanWithRemoteParent(
+ name, parent_ctxt)) {}
+
+ ::opencensus::trace::SpanContext Context() const { return span_.context(); }
+ ::opencensus::trace::Span Span() const { return span_; }
+ void EndSpan() { span_.End(); }
+
+ private:
+ ::opencensus::trace::Span span_;
+};
+
+// Serializes the outgoing trace context. Field IDs are 1 byte followed by
+// field data. A 1 byte version ID is always encoded first.
+size_t TraceContextSerialize(const ::opencensus::trace::SpanContext& context,
+ char* tracing_buf, size_t tracing_buf_size);
+
+// Serializes the outgoing stats context. Field IDs are 1 byte followed by
+// field data. A 1 byte version ID is always encoded first. Tags are directly
+// serialized into the given grpc_slice.
+size_t StatsContextSerialize(size_t max_tags_len, grpc_slice* tags);
+
+// Serialize outgoing server stats. Returns the number of bytes serialized.
+size_t ServerStatsSerialize(uint64_t server_elapsed_time, char* buf,
+ size_t buf_size);
+
+// Deserialize incoming server stats. Returns the number of bytes deserialized.
+size_t ServerStatsDeserialize(const char* buf, size_t buf_size,
+ uint64_t* server_elapsed_time);
+
+// Deserialize the incoming SpanContext and generate a new server context based
+// on that. This new span will never be a root span. This should only be called
+// with a blank CensusContext as it overwrites it.
+void GenerateServerContext(absl::string_view tracing, absl::string_view stats,
+ absl::string_view primary_role,
+ absl::string_view method, CensusContext* context);
+
+// Creates a new client context that is by default a new root context.
+// If the current context is the default context then the newly created
+// span automatically becomes a root span. This should only be called with a
+// blank CensusContext as it overwrites it.
+void GenerateClientContext(absl::string_view method, CensusContext* ctxt,
+ CensusContext* parent_ctx);
+
+// Returns the incoming data size from the grpc call final info.
+uint64_t GetIncomingDataSize(const grpc_call_final_info* final_info);
+
+// Returns the outgoing data size from the grpc call final info.
+uint64_t GetOutgoingDataSize(const grpc_call_final_info* final_info);
+
+// These helper functions return the SpanContext and Span, respectively
+// associated with the census_context* stored by grpc. The user will need to
+// call this for manual propagation of tracing data.
+::opencensus::trace::SpanContext SpanContextFromCensusContext(
+ const census_context* ctxt);
+::opencensus::trace::Span SpanFromCensusContext(const census_context* ctxt);
+
+// Returns a string representation of the StatusCode enum.
+absl::string_view StatusCodeToString(grpc_status_code code);
+
+inline absl::string_view GetMethod(const grpc_slice* path) {
+ if (GRPC_SLICE_IS_EMPTY(*path)) {
+ return "";
+ }
+ // Check for leading '/' and trim it if present.
+ return absl::StripPrefix(absl::string_view(reinterpret_cast<const char*>(
+ GRPC_SLICE_START_PTR(*path)),
+ GRPC_SLICE_LENGTH(*path)),
+ "/");
+}
+
+} // namespace grpc
+
+#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CONTEXT_H */
diff --git a/src/cpp/ext/filters/census/grpc_context.cc b/src/cpp/ext/filters/census/grpc_context.cc
new file mode 100644
index 0000000000..599a798dda
--- /dev/null
+++ b/src/cpp/ext/filters/census/grpc_context.cc
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include <grpc/census.h>
+#include <grpc/grpc.h>
+#include "src/core/lib/surface/api_trace.h"
+#include "src/core/lib/surface/call.h"
+
+void grpc_census_call_set_context(grpc_call* call, census_context* context) {
+ GRPC_API_TRACE("grpc_census_call_set_context(call=%p, census_context=%p)", 2,
+ (call, context));
+ if (context != nullptr) {
+ grpc_call_context_set(call, GRPC_CONTEXT_TRACING, context, nullptr);
+ }
+}
+
+census_context* grpc_census_call_get_context(grpc_call* call) {
+ GRPC_API_TRACE("grpc_census_call_get_context(call=%p)", 1, (call));
+ return static_cast<census_context*>(
+ grpc_call_context_get(call, GRPC_CONTEXT_TRACING));
+}
diff --git a/src/cpp/ext/filters/census/grpc_plugin.cc b/src/cpp/ext/filters/census/grpc_plugin.cc
new file mode 100644
index 0000000000..f978ed3bf5
--- /dev/null
+++ b/src/cpp/ext/filters/census/grpc_plugin.cc
@@ -0,0 +1,130 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/ext/filters/census/grpc_plugin.h"
+
+#include <grpcpp/server_context.h>
+
+#include "opencensus/trace/span.h"
+#include "src/cpp/ext/filters/census/channel_filter.h"
+#include "src/cpp/ext/filters/census/client_filter.h"
+#include "src/cpp/ext/filters/census/measures.h"
+#include "src/cpp/ext/filters/census/server_filter.h"
+
+namespace grpc {
+
+void RegisterOpenCensusPlugin() {
+ RegisterChannelFilter<CensusChannelData, CensusClientCallData>(
+ "opencensus_client", GRPC_CLIENT_CHANNEL, INT_MAX /* priority */,
+ nullptr /* condition function */);
+ RegisterChannelFilter<CensusChannelData, CensusServerCallData>(
+ "opencensus_server", GRPC_SERVER_CHANNEL, INT_MAX /* priority */,
+ nullptr /* condition function */);
+
+ // Access measures to ensure they are initialized. Otherwise, creating a view
+ // before the first RPC would cause an error.
+ RpcClientSentBytesPerRpc();
+ RpcClientReceivedBytesPerRpc();
+ RpcClientRoundtripLatency();
+ RpcClientServerLatency();
+ RpcClientSentMessagesPerRpc();
+ RpcClientReceivedMessagesPerRpc();
+
+ RpcServerSentBytesPerRpc();
+ RpcServerReceivedBytesPerRpc();
+ RpcServerServerLatency();
+ RpcServerSentMessagesPerRpc();
+ RpcServerReceivedMessagesPerRpc();
+}
+
+::opencensus::trace::Span GetSpanFromServerContext(ServerContext* context) {
+ return reinterpret_cast<const CensusContext*>(context->census_context())
+ ->Span();
+}
+
+// These measure definitions should be kept in sync across opencensus
+// implementations--see
+// https://github.com/census-instrumentation/opencensus-java/blob/master/contrib/grpc_metrics/src/main/java/io/opencensus/contrib/grpc/metrics/RpcMeasureConstants.java.
+::opencensus::stats::TagKey ClientMethodTagKey() {
+ static const auto method_tag_key =
+ ::opencensus::stats::TagKey::Register("grpc_client_method");
+ return method_tag_key;
+}
+
+::opencensus::stats::TagKey ClientStatusTagKey() {
+ static const auto status_tag_key =
+ ::opencensus::stats::TagKey::Register("grpc_client_status");
+ return status_tag_key;
+}
+
+::opencensus::stats::TagKey ServerMethodTagKey() {
+ static const auto method_tag_key =
+ ::opencensus::stats::TagKey::Register("grpc_server_method");
+ return method_tag_key;
+}
+
+::opencensus::stats::TagKey ServerStatusTagKey() {
+ static const auto status_tag_key =
+ ::opencensus::stats::TagKey::Register("grpc_server_status");
+ return status_tag_key;
+}
+
+// Client
+ABSL_CONST_INIT const absl::string_view
+ kRpcClientSentMessagesPerRpcMeasureName =
+ "grpc.io/client/sent_messages_per_rpc";
+
+ABSL_CONST_INIT const absl::string_view kRpcClientSentBytesPerRpcMeasureName =
+ "grpc.io/client/sent_bytes_per_rpc";
+
+ABSL_CONST_INIT const absl::string_view
+ kRpcClientReceivedMessagesPerRpcMeasureName =
+ "grpc.io/client/received_messages_per_rpc";
+
+ABSL_CONST_INIT const absl::string_view
+ kRpcClientReceivedBytesPerRpcMeasureName =
+ "grpc.io/client/received_bytes_per_rpc";
+
+ABSL_CONST_INIT const absl::string_view kRpcClientRoundtripLatencyMeasureName =
+ "grpc.io/client/roundtrip_latency";
+
+ABSL_CONST_INIT const absl::string_view kRpcClientServerLatencyMeasureName =
+ "grpc.io/client/server_latency";
+
+// Server
+ABSL_CONST_INIT const absl::string_view
+ kRpcServerSentMessagesPerRpcMeasureName =
+ "grpc.io/server/sent_messages_per_rpc";
+
+ABSL_CONST_INIT const absl::string_view kRpcServerSentBytesPerRpcMeasureName =
+ "grpc.io/server/sent_bytes_per_rpc";
+
+ABSL_CONST_INIT const absl::string_view
+ kRpcServerReceivedMessagesPerRpcMeasureName =
+ "grpc.io/server/received_messages_per_rpc";
+
+ABSL_CONST_INIT const absl::string_view
+ kRpcServerReceivedBytesPerRpcMeasureName =
+ "grpc.io/server/received_bytes_per_rpc";
+
+ABSL_CONST_INIT const absl::string_view kRpcServerServerLatencyMeasureName =
+ "grpc.io/server/server_latency";
+
+} // namespace grpc
diff --git a/src/cpp/ext/filters/census/grpc_plugin.h b/src/cpp/ext/filters/census/grpc_plugin.h
new file mode 100644
index 0000000000..107db18de9
--- /dev/null
+++ b/src/cpp/ext/filters/census/grpc_plugin.h
@@ -0,0 +1,125 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_GRPC_PLUGIN_H
+#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_GRPC_PLUGIN_H
+
+#include <grpc/support/port_platform.h>
+
+#include "absl/strings/string_view.h"
+#include "opencensus/stats/stats.h"
+#include "opencensus/trace/span.h"
+
+namespace grpc {
+class ServerContext;
+}
+
+namespace grpc {
+
+// Registers the OpenCensus plugin with gRPC, so that it will be used for future
+// RPCs. This must be called before any views are created on the measures
+// defined below.
+void RegisterOpenCensusPlugin();
+
+// RPC stats definitions, defined by
+// https://github.com/census-instrumentation/opencensus-specs/blob/master/stats/gRPC.md
+
+// Registers the cumulative gRPC views so that they will be exported by any
+// registered stats exporter.
+// For on-task stats, construct a View using the ViewDescriptors below.
+void RegisterGrpcViewsForExport();
+
+// Returns the tracing Span for the current RPC.
+::opencensus::trace::Span GetSpanFromServerContext(ServerContext* context);
+
+// The tag keys set when recording RPC stats.
+::opencensus::stats::TagKey ClientMethodTagKey();
+::opencensus::stats::TagKey ClientStatusTagKey();
+::opencensus::stats::TagKey ServerMethodTagKey();
+::opencensus::stats::TagKey ServerStatusTagKey();
+
+// Names of measures used by the plugin--users can create views on these
+// measures but should not record data for them.
+extern const absl::string_view kRpcClientSentMessagesPerRpcMeasureName;
+extern const absl::string_view kRpcClientSentBytesPerRpcMeasureName;
+extern const absl::string_view kRpcClientReceivedMessagesPerRpcMeasureName;
+extern const absl::string_view kRpcClientReceivedBytesPerRpcMeasureName;
+extern const absl::string_view kRpcClientRoundtripLatencyMeasureName;
+extern const absl::string_view kRpcClientServerLatencyMeasureName;
+
+extern const absl::string_view kRpcServerSentMessagesPerRpcMeasureName;
+extern const absl::string_view kRpcServerSentBytesPerRpcMeasureName;
+extern const absl::string_view kRpcServerReceivedMessagesPerRpcMeasureName;
+extern const absl::string_view kRpcServerReceivedBytesPerRpcMeasureName;
+extern const absl::string_view kRpcServerServerLatencyMeasureName;
+
+// Canonical gRPC view definitions.
+const ::opencensus::stats::ViewDescriptor& ClientSentMessagesPerRpcCumulative();
+const ::opencensus::stats::ViewDescriptor& ClientSentBytesPerRpcCumulative();
+const ::opencensus::stats::ViewDescriptor&
+ClientReceivedMessagesPerRpcCumulative();
+const ::opencensus::stats::ViewDescriptor&
+ClientReceivedBytesPerRpcCumulative();
+const ::opencensus::stats::ViewDescriptor& ClientRoundtripLatencyCumulative();
+const ::opencensus::stats::ViewDescriptor& ClientServerLatencyCumulative();
+const ::opencensus::stats::ViewDescriptor& ClientCompletedRpcsCumulative();
+
+const ::opencensus::stats::ViewDescriptor& ServerSentBytesPerRpcCumulative();
+const ::opencensus::stats::ViewDescriptor&
+ServerReceivedBytesPerRpcCumulative();
+const ::opencensus::stats::ViewDescriptor& ServerServerLatencyCumulative();
+const ::opencensus::stats::ViewDescriptor& ServerStartedCountCumulative();
+const ::opencensus::stats::ViewDescriptor& ServerCompletedRpcsCumulative();
+const ::opencensus::stats::ViewDescriptor& ServerSentMessagesPerRpcCumulative();
+const ::opencensus::stats::ViewDescriptor&
+ServerReceivedMessagesPerRpcCumulative();
+
+const ::opencensus::stats::ViewDescriptor& ClientSentMessagesPerRpcMinute();
+const ::opencensus::stats::ViewDescriptor& ClientSentBytesPerRpcMinute();
+const ::opencensus::stats::ViewDescriptor& ClientReceivedMessagesPerRpcMinute();
+const ::opencensus::stats::ViewDescriptor& ClientReceivedBytesPerRpcMinute();
+const ::opencensus::stats::ViewDescriptor& ClientRoundtripLatencyMinute();
+const ::opencensus::stats::ViewDescriptor& ClientServerLatencyMinute();
+const ::opencensus::stats::ViewDescriptor& ClientCompletedRpcsMinute();
+
+const ::opencensus::stats::ViewDescriptor& ServerSentMessagesPerRpcMinute();
+const ::opencensus::stats::ViewDescriptor& ServerSentBytesPerRpcMinute();
+const ::opencensus::stats::ViewDescriptor& ServerReceivedMessagesPerRpcMinute();
+const ::opencensus::stats::ViewDescriptor& ServerReceivedBytesPerRpcMinute();
+const ::opencensus::stats::ViewDescriptor& ServerServerLatencyMinute();
+const ::opencensus::stats::ViewDescriptor& ServerCompletedRpcsMinute();
+
+const ::opencensus::stats::ViewDescriptor& ClientSentMessagesPerRpcHour();
+const ::opencensus::stats::ViewDescriptor& ClientSentBytesPerRpcHour();
+const ::opencensus::stats::ViewDescriptor& ClientReceivedMessagesPerRpcHour();
+const ::opencensus::stats::ViewDescriptor& ClientReceivedBytesPerRpcHour();
+const ::opencensus::stats::ViewDescriptor& ClientRoundtripLatencyHour();
+const ::opencensus::stats::ViewDescriptor& ClientServerLatencyHour();
+const ::opencensus::stats::ViewDescriptor& ClientCompletedRpcsHour();
+
+const ::opencensus::stats::ViewDescriptor& ServerSentMessagesPerRpcHour();
+const ::opencensus::stats::ViewDescriptor& ServerSentBytesPerRpcHour();
+const ::opencensus::stats::ViewDescriptor& ServerReceivedMessagesPerRpcHour();
+const ::opencensus::stats::ViewDescriptor& ServerReceivedBytesPerRpcHour();
+const ::opencensus::stats::ViewDescriptor& ServerServerLatencyHour();
+const ::opencensus::stats::ViewDescriptor& ServerStartedCountHour();
+const ::opencensus::stats::ViewDescriptor& ServerCompletedRpcsHour();
+
+} // namespace grpc
+
+#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_GRPC_PLUGIN_H */
diff --git a/src/cpp/ext/filters/census/measures.cc b/src/cpp/ext/filters/census/measures.cc
new file mode 100644
index 0000000000..b522fae09a
--- /dev/null
+++ b/src/cpp/ext/filters/census/measures.cc
@@ -0,0 +1,129 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/ext/filters/census/measures.h"
+
+#include "opencensus/stats/stats.h"
+#include "src/cpp/ext/filters/census/grpc_plugin.h"
+
+namespace grpc {
+
+using ::opencensus::stats::MeasureDouble;
+using ::opencensus::stats::MeasureInt64;
+
+// These measure definitions should be kept in sync across opencensus
+// implementations--see
+// https://github.com/census-instrumentation/opencensus-java/blob/master/contrib/grpc_metrics/src/main/java/io/opencensus/contrib/grpc/metrics/RpcMeasureConstants.java.
+
+namespace {
+
+// Unit constants
+constexpr char kUnitBytes[] = "By";
+constexpr char kUnitMilliseconds[] = "ms";
+constexpr char kCount[] = "1";
+
+} // namespace
+
+// Client
+MeasureDouble RpcClientSentBytesPerRpc() {
+ static const auto measure = MeasureDouble::Register(
+ kRpcClientSentBytesPerRpcMeasureName,
+ "Total bytes sent across all request messages per RPC", kUnitBytes);
+ return measure;
+}
+
+MeasureDouble RpcClientReceivedBytesPerRpc() {
+ static const auto measure = MeasureDouble::Register(
+ kRpcClientReceivedBytesPerRpcMeasureName,
+ "Total bytes received across all response messages per RPC", kUnitBytes);
+ return measure;
+}
+
+MeasureDouble RpcClientRoundtripLatency() {
+ static const auto measure = MeasureDouble::Register(
+ kRpcClientRoundtripLatencyMeasureName,
+ "Time between first byte of request sent to last byte of response "
+ "received, or terminal error",
+ kUnitMilliseconds);
+ return measure;
+}
+
+MeasureDouble RpcClientServerLatency() {
+ static const auto measure = MeasureDouble::Register(
+ kRpcClientServerLatencyMeasureName,
+ "Time between first byte of request received to last byte of response "
+ "sent, or terminal error (propagated from the server)",
+ kUnitMilliseconds);
+ return measure;
+}
+
+MeasureInt64 RpcClientSentMessagesPerRpc() {
+ static const auto measure =
+ MeasureInt64::Register(kRpcClientSentMessagesPerRpcMeasureName,
+ "Number of messages sent per RPC", kCount);
+ return measure;
+}
+
+MeasureInt64 RpcClientReceivedMessagesPerRpc() {
+ static const auto measure =
+ MeasureInt64::Register(kRpcClientReceivedMessagesPerRpcMeasureName,
+ "Number of messages received per RPC", kCount);
+ return measure;
+}
+
+// Server
+MeasureDouble RpcServerSentBytesPerRpc() {
+ static const auto measure = MeasureDouble::Register(
+ kRpcServerSentBytesPerRpcMeasureName,
+ "Total bytes sent across all messages per RPC", kUnitBytes);
+ return measure;
+}
+
+MeasureDouble RpcServerReceivedBytesPerRpc() {
+ static const auto measure = MeasureDouble::Register(
+ kRpcServerReceivedBytesPerRpcMeasureName,
+ "Total bytes received across all messages per RPC", kUnitBytes);
+ return measure;
+}
+
+MeasureDouble RpcServerServerLatency() {
+ static const auto measure = MeasureDouble::Register(
+ kRpcServerServerLatencyMeasureName,
+ "Time between first byte of request received to last byte of response "
+ "sent, or terminal error",
+ kUnitMilliseconds);
+ return measure;
+}
+
+MeasureInt64 RpcServerSentMessagesPerRpc() {
+ static const auto measure =
+ MeasureInt64::Register(kRpcServerSentMessagesPerRpcMeasureName,
+ "Number of messages sent per RPC", kCount);
+ return measure;
+}
+
+MeasureInt64 RpcServerReceivedMessagesPerRpc() {
+ static const auto measure =
+ MeasureInt64::Register(kRpcServerReceivedMessagesPerRpcMeasureName,
+ "Number of messages received per RPC", kCount);
+ return measure;
+}
+
+} // namespace grpc
diff --git a/src/cpp/ext/filters/census/measures.h b/src/cpp/ext/filters/census/measures.h
new file mode 100644
index 0000000000..8f8e72ace2
--- /dev/null
+++ b/src/cpp/ext/filters/census/measures.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_MEASURES_H
+#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_MEASURES_H
+
+#include <grpc/support/port_platform.h>
+
+#include "opencensus/stats/stats.h"
+#include "src/cpp/ext/filters/census/grpc_plugin.h"
+
+namespace grpc {
+
+::opencensus::stats::MeasureInt64 RpcClientSentMessagesPerRpc();
+::opencensus::stats::MeasureDouble RpcClientSentBytesPerRpc();
+::opencensus::stats::MeasureInt64 RpcClientReceivedMessagesPerRpc();
+::opencensus::stats::MeasureDouble RpcClientReceivedBytesPerRpc();
+::opencensus::stats::MeasureDouble RpcClientRoundtripLatency();
+::opencensus::stats::MeasureDouble RpcClientServerLatency();
+::opencensus::stats::MeasureInt64 RpcClientCompletedRpcs();
+
+::opencensus::stats::MeasureInt64 RpcServerSentMessagesPerRpc();
+::opencensus::stats::MeasureDouble RpcServerSentBytesPerRpc();
+::opencensus::stats::MeasureInt64 RpcServerReceivedMessagesPerRpc();
+::opencensus::stats::MeasureDouble RpcServerReceivedBytesPerRpc();
+::opencensus::stats::MeasureDouble RpcServerServerLatency();
+::opencensus::stats::MeasureInt64 RpcServerCompletedRpcs();
+
+} // namespace grpc
+
+#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_MEASURES_H */
diff --git a/src/cpp/ext/filters/census/rpc_encoding.cc b/src/cpp/ext/filters/census/rpc_encoding.cc
new file mode 100644
index 0000000000..45a66d9dc8
--- /dev/null
+++ b/src/cpp/ext/filters/census/rpc_encoding.cc
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/ext/filters/census/rpc_encoding.h"
+
+namespace grpc {
+
+constexpr size_t TraceContextEncoding::kGrpcTraceContextSize;
+constexpr size_t TraceContextEncoding::kEncodeDecodeFailure;
+constexpr size_t TraceContextEncoding::kVersionIdSize;
+constexpr size_t TraceContextEncoding::kFieldIdSize;
+constexpr size_t TraceContextEncoding::kVersionIdOffset;
+constexpr size_t TraceContextEncoding::kVersionId;
+
+constexpr size_t RpcServerStatsEncoding::kRpcServerStatsSize;
+constexpr size_t RpcServerStatsEncoding::kEncodeDecodeFailure;
+constexpr size_t RpcServerStatsEncoding::kVersionIdSize;
+constexpr size_t RpcServerStatsEncoding::kFieldIdSize;
+constexpr size_t RpcServerStatsEncoding::kVersionIdOffset;
+constexpr size_t RpcServerStatsEncoding::kVersionId;
+
+} // namespace grpc
diff --git a/src/cpp/ext/filters/census/rpc_encoding.h b/src/cpp/ext/filters/census/rpc_encoding.h
new file mode 100644
index 0000000000..ffffa60c46
--- /dev/null
+++ b/src/cpp/ext/filters/census/rpc_encoding.h
@@ -0,0 +1,284 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_RPC_ENCODING_H
+#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_RPC_ENCODING_H
+
+#include <grpc/support/port_platform.h>
+
+#include <string.h>
+
+#include "absl/base/internal/endian.h"
+#include "absl/strings/string_view.h"
+#include "opencensus/trace/span_context.h"
+#include "opencensus/trace/span_id.h"
+#include "opencensus/trace/trace_id.h"
+
+namespace grpc {
+
+// TODO: Rename to GrpcTraceContextV0.
+struct GrpcTraceContext {
+ GrpcTraceContext() {}
+
+ explicit GrpcTraceContext(const ::opencensus::trace::SpanContext& ctx) {
+ ctx.trace_id().CopyTo(trace_id);
+ ctx.span_id().CopyTo(span_id);
+ ctx.trace_options().CopyTo(trace_options);
+ }
+
+ ::opencensus::trace::SpanContext ToSpanContext() const {
+ return ::opencensus::trace::SpanContext(
+ ::opencensus::trace::TraceId(trace_id),
+ ::opencensus::trace::SpanId(span_id),
+ ::opencensus::trace::TraceOptions(trace_options));
+ }
+
+ // TODO: For performance:
+ // uint8_t version;
+ // uint8_t trace_id_field_id;
+ uint8_t trace_id[::opencensus::trace::TraceId::kSize];
+ // uint8_t span_id_field_id;
+ uint8_t span_id[::opencensus::trace::SpanId::kSize];
+ // uint8_t trace_options_field_id;
+ uint8_t trace_options[::opencensus::trace::TraceOptions::kSize];
+};
+
+// TraceContextEncoding encapsulates the logic for encoding and decoding of
+// trace contexts.
+class TraceContextEncoding {
+ public:
+ // Size of encoded GrpcTraceContext. (16 + 8 + 1 + 4)
+ static constexpr size_t kGrpcTraceContextSize = 29;
+ // Error value.
+ static constexpr size_t kEncodeDecodeFailure = 0;
+
+ // Deserializes a GrpcTraceContext from the incoming buffer. Returns the
+ // number of bytes deserialized from the buffer. If the incoming buffer is
+ // empty or the encoding version is not supported it will return 0 bytes,
+ // currently only version 0 is supported. If an unknown field ID is
+ // encountered it will return immediately without parsing the rest of the
+ // buffer. Inlined for performance reasons.
+ static size_t Decode(absl::string_view buf, GrpcTraceContext* tc) {
+ if (buf.empty()) {
+ return kEncodeDecodeFailure;
+ }
+ uint8_t version = buf[kVersionIdOffset];
+ // TODO: Support other versions later. Only support version 0 for
+ // now.
+ if (version != kVersionId) {
+ return kEncodeDecodeFailure;
+ }
+
+ size_t pos = kVersionIdSize;
+ while (pos < buf.size()) {
+ size_t bytes_read =
+ ParseField(absl::string_view(&buf[pos], buf.size() - pos), tc);
+ if (bytes_read == 0) {
+ break;
+ } else {
+ pos += bytes_read;
+ }
+ }
+ return pos;
+ }
+
+ // Serializes a GrpcTraceContext into the provided buffer. Returns the number
+ // of bytes serialized into the buffer. If the buffer is not of sufficient
+ // size (it must be at least kGrpcTraceContextSize bytes) it will drop
+ // everything and return 0 bytes serialized. Inlined for performance reasons.
+ static size_t Encode(const GrpcTraceContext& tc, char* buf, size_t buf_size) {
+ if (buf_size < kGrpcTraceContextSize) {
+ return kEncodeDecodeFailure;
+ }
+ buf[kVersionIdOffset] = kVersionId;
+ buf[kTraceIdOffset] = kTraceIdField;
+ memcpy(&buf[kTraceIdOffset + 1], tc.trace_id,
+ opencensus::trace::TraceId::kSize);
+ buf[kSpanIdOffset] = kSpanIdField;
+ memcpy(&buf[kSpanIdOffset + 1], tc.span_id,
+ opencensus::trace::SpanId::kSize);
+ buf[kTraceOptionsOffset] = kTraceOptionsField;
+ memcpy(&buf[kTraceOptionsOffset + 1], tc.trace_options,
+ opencensus::trace::TraceOptions::kSize);
+ return kGrpcTraceContextSize;
+ }
+
+ private:
+ // Parses the next field from the incoming buffer and stores the parsed value
+ // in a GrpcTraceContext struct. If it does not recognize the field ID it
+ // will return 0, otherwise it returns the number of bytes read.
+ static size_t ParseField(absl::string_view buf, GrpcTraceContext* tc) {
+ // TODO: Add support for multi-byte field IDs.
+ if (buf.empty()) {
+ return 0;
+ }
+ // Field ID is always the first byte in a field.
+ uint32_t field_id = buf[0];
+ size_t bytes_read = kFieldIdSize;
+ switch (field_id) {
+ case kTraceIdField:
+ bytes_read += kTraceIdSize;
+ if (bytes_read > buf.size()) {
+ return 0;
+ }
+ memcpy(tc->trace_id, &buf[kFieldIdSize],
+ opencensus::trace::TraceId::kSize);
+ break;
+ case kSpanIdField:
+ bytes_read += kSpanIdSize;
+ if (bytes_read > buf.size()) {
+ return 0;
+ }
+ memcpy(tc->span_id, &buf[kFieldIdSize],
+ opencensus::trace::SpanId::kSize);
+ break;
+ case kTraceOptionsField:
+ bytes_read += kTraceOptionsSize;
+ if (bytes_read > buf.size()) {
+ return 0;
+ }
+ memcpy(tc->trace_options, &buf[kFieldIdSize],
+ opencensus::trace::TraceOptions::kSize);
+ break;
+ default: // Invalid field ID
+ return 0;
+ }
+
+ return bytes_read;
+ }
+
+ // Size of Version ID.
+ static constexpr size_t kVersionIdSize = 1;
+ // Size of Field ID.
+ static constexpr size_t kFieldIdSize = 1;
+
+ // Offset and value for currently supported version ID.
+ static constexpr size_t kVersionIdOffset = 0;
+ static constexpr size_t kVersionId = 0;
+
+ // Fixed Field ID values:
+ enum FieldIdValue {
+ kTraceIdField = 0,
+ kSpanIdField = 1,
+ kTraceOptionsField = 2,
+ };
+
+ // Field data sizes in bytes
+ enum FieldSize {
+ kTraceIdSize = 16,
+ kSpanIdSize = 8,
+ kTraceOptionsSize = 1,
+ };
+
+ // Fixed size offsets for field ID start positions during encoding. Field
+ // data immediately follows.
+ enum FieldIdOffset {
+ kTraceIdOffset = kVersionIdSize,
+ kSpanIdOffset = kTraceIdOffset + kFieldIdSize + kTraceIdSize,
+ kTraceOptionsOffset = kSpanIdOffset + kFieldIdSize + kSpanIdSize,
+ };
+
+ TraceContextEncoding() = delete;
+ TraceContextEncoding(const TraceContextEncoding&) = delete;
+ TraceContextEncoding(TraceContextEncoding&&) = delete;
+ TraceContextEncoding operator=(const TraceContextEncoding&) = delete;
+ TraceContextEncoding operator=(TraceContextEncoding&&) = delete;
+};
+
+// TODO: This may not be needed. Check to see if opencensus requires
+// a trailing server response.
+// RpcServerStatsEncoding encapsulates the logic for encoding and decoding of
+// rpc server stats messages. Rpc server stats consists of a uint64_t time
+// value (server latency in nanoseconds).
+class RpcServerStatsEncoding {
+ public:
+ // Size of encoded RPC server stats.
+ static constexpr size_t kRpcServerStatsSize = 10;
+ // Error value.
+ static constexpr size_t kEncodeDecodeFailure = 0;
+
+ // Deserializes rpc server stats from the incoming 'buf' into *time. Returns
+ // number of bytes decoded. If the buffer is of insufficient size (it must be
+ // at least kRpcServerStatsSize bytes) or the encoding version or field ID are
+ // unrecognized, *time will be set to 0 and it will return
+ // kEncodeDecodeFailure. Inlined for performance reasons.
+ static size_t Decode(absl::string_view buf, uint64_t* time) {
+ if (buf.size() < kRpcServerStatsSize) {
+ *time = 0;
+ return kEncodeDecodeFailure;
+ }
+
+ uint8_t version = buf[kVersionIdOffset];
+ uint32_t fieldID = buf[kServerElapsedTimeOffset];
+ if (version != kVersionId || fieldID != kServerElapsedTimeField) {
+ *time = 0;
+ return kEncodeDecodeFailure;
+ }
+ *time = absl::little_endian::Load64(
+ &buf[kServerElapsedTimeOffset + kFieldIdSize]);
+ return kRpcServerStatsSize;
+ }
+
+ // Serializes rpc server stats into the provided buffer. It returns the
+ // number of bytes written to the buffer. If the buffer is smaller than
+ // kRpcServerStatsSize bytes it will return kEncodeDecodeFailure. Inlined for
+ // performance reasons.
+ static size_t Encode(uint64_t time, char* buf, size_t buf_size) {
+ if (buf_size < kRpcServerStatsSize) {
+ return kEncodeDecodeFailure;
+ }
+
+ buf[kVersionIdOffset] = kVersionId;
+ buf[kServerElapsedTimeOffset] = kServerElapsedTimeField;
+ absl::little_endian::Store64(&buf[kServerElapsedTimeOffset + kFieldIdSize],
+ time);
+ return kRpcServerStatsSize;
+ }
+
+ private:
+ // Size of Version ID.
+ static constexpr size_t kVersionIdSize = 1;
+ // Size of Field ID.
+ static constexpr size_t kFieldIdSize = 1;
+
+ // Offset and value for currently supported version ID.
+ static constexpr size_t kVersionIdOffset = 0;
+ static constexpr size_t kVersionId = 0;
+
+ enum FieldIdValue {
+ kServerElapsedTimeField = 0,
+ };
+
+ enum FieldSize {
+ kServerElapsedTimeSize = 8,
+ };
+
+ enum FieldIdOffset {
+ kServerElapsedTimeOffset = kVersionIdSize,
+ };
+
+ RpcServerStatsEncoding() = delete;
+ RpcServerStatsEncoding(const RpcServerStatsEncoding&) = delete;
+ RpcServerStatsEncoding(RpcServerStatsEncoding&&) = delete;
+ RpcServerStatsEncoding operator=(const RpcServerStatsEncoding&) = delete;
+ RpcServerStatsEncoding operator=(RpcServerStatsEncoding&&) = delete;
+};
+
+} // namespace grpc
+
+#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_RPC_ENCODING_H */
diff --git a/src/cpp/ext/filters/census/server_filter.cc b/src/cpp/ext/filters/census/server_filter.cc
new file mode 100644
index 0000000000..c7c62eefe5
--- /dev/null
+++ b/src/cpp/ext/filters/census/server_filter.cc
@@ -0,0 +1,198 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/ext/filters/census/server_filter.h"
+
+#include "absl/strings/str_cat.h"
+#include "absl/strings/string_view.h"
+#include "absl/time/clock.h"
+#include "absl/time/time.h"
+#include "opencensus/stats/stats.h"
+#include "src/core/lib/surface/call.h"
+#include "src/cpp/ext/filters/census/grpc_plugin.h"
+#include "src/cpp/ext/filters/census/measures.h"
+
+namespace grpc {
+
+constexpr uint32_t CensusServerCallData::kMaxServerStatsLen;
+
+namespace {
+
+// server metadata elements
+struct ServerMetadataElements {
+ grpc_slice path;
+ grpc_slice tracing_slice;
+ grpc_slice census_proto;
+};
+
+void FilterInitialMetadata(grpc_metadata_batch* b,
+ ServerMetadataElements* sml) {
+ if (b->idx.named.path != nullptr) {
+ sml->path = grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.path->md));
+ }
+ if (b->idx.named.grpc_trace_bin != nullptr) {
+ sml->tracing_slice =
+ grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_trace_bin->md));
+ grpc_metadata_batch_remove(b, b->idx.named.grpc_trace_bin);
+ }
+ if (b->idx.named.grpc_tags_bin != nullptr) {
+ sml->census_proto =
+ grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_tags_bin->md));
+ grpc_metadata_batch_remove(b, b->idx.named.grpc_tags_bin);
+ }
+}
+
+} // namespace
+
+void CensusServerCallData::OnDoneRecvMessageCb(void* user_data,
+ grpc_error* error) {
+ grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
+ CensusServerCallData* calld =
+ reinterpret_cast<CensusServerCallData*>(elem->call_data);
+ CensusChannelData* channeld =
+ reinterpret_cast<CensusChannelData*>(elem->channel_data);
+ GPR_ASSERT(calld != nullptr);
+ GPR_ASSERT(channeld != nullptr);
+ // Stream messages are no longer valid after receiving trailing metadata.
+ if ((*calld->recv_message_) != nullptr) {
+ ++calld->recv_message_count_;
+ }
+ GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error));
+}
+
+void CensusServerCallData::OnDoneRecvInitialMetadataCb(void* user_data,
+ grpc_error* error) {
+ grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
+ CensusServerCallData* calld =
+ reinterpret_cast<CensusServerCallData*>(elem->call_data);
+ GPR_ASSERT(calld != nullptr);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_metadata_batch* initial_metadata = calld->recv_initial_metadata_;
+ GPR_ASSERT(initial_metadata != nullptr);
+ ServerMetadataElements sml;
+ sml.path = grpc_empty_slice();
+ sml.tracing_slice = grpc_empty_slice();
+ sml.census_proto = grpc_empty_slice();
+ FilterInitialMetadata(initial_metadata, &sml);
+ calld->path_ = grpc_slice_ref_internal(sml.path);
+ calld->method_ = GetMethod(&calld->path_);
+ calld->qualified_method_ = StrCat("Recv.", calld->method_);
+ const char* tracing_str =
+ GRPC_SLICE_IS_EMPTY(sml.tracing_slice)
+ ? ""
+ : reinterpret_cast<const char*>(
+ GRPC_SLICE_START_PTR(sml.tracing_slice));
+ size_t tracing_str_len = GRPC_SLICE_IS_EMPTY(sml.tracing_slice)
+ ? 0
+ : GRPC_SLICE_LENGTH(sml.tracing_slice);
+ const char* census_str = GRPC_SLICE_IS_EMPTY(sml.census_proto)
+ ? ""
+ : reinterpret_cast<const char*>(
+ GRPC_SLICE_START_PTR(sml.census_proto));
+ size_t census_str_len = GRPC_SLICE_IS_EMPTY(sml.census_proto)
+ ? 0
+ : GRPC_SLICE_LENGTH(sml.census_proto);
+
+ GenerateServerContext(absl::string_view(tracing_str, tracing_str_len),
+ absl::string_view(census_str, census_str_len),
+ /*primary_role*/ "", calld->qualified_method_,
+ &calld->context_);
+
+ grpc_slice_unref_internal(sml.tracing_slice);
+ grpc_slice_unref_internal(sml.census_proto);
+ grpc_slice_unref_internal(sml.path);
+ grpc_census_call_set_context(
+ calld->gc_, reinterpret_cast<census_context*>(&calld->context_));
+ }
+ GRPC_CLOSURE_RUN(calld->initial_on_done_recv_initial_metadata_,
+ GRPC_ERROR_REF(error));
+}
+
+void CensusServerCallData::StartTransportStreamOpBatch(
+ grpc_call_element* elem, TransportStreamOpBatch* op) {
+ if (op->recv_initial_metadata() != nullptr) {
+ // substitute our callback for the op callback
+ recv_initial_metadata_ = op->recv_initial_metadata()->batch();
+ initial_on_done_recv_initial_metadata_ = op->recv_initial_metadata_ready();
+ op->set_recv_initial_metadata_ready(&on_done_recv_initial_metadata_);
+ }
+ if (op->send_message() != nullptr) {
+ ++sent_message_count_;
+ }
+ if (op->recv_message() != nullptr) {
+ recv_message_ = op->op()->payload->recv_message.recv_message;
+ initial_on_done_recv_message_ =
+ op->op()->payload->recv_message.recv_message_ready;
+ op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_;
+ }
+ // We need to record the time when the trailing metadata was sent to mark the
+ // completeness of the request.
+ if (op->send_trailing_metadata() != nullptr) {
+ elapsed_time_ = absl::Now() - start_time_;
+ size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_),
+ stats_buf_, kMaxServerStatsLen);
+ if (len > 0) {
+ GRPC_LOG_IF_ERROR(
+ "census grpc_filter",
+ grpc_metadata_batch_add_tail(
+ op->send_trailing_metadata()->batch(), &census_bin_,
+ grpc_mdelem_from_slices(
+ GRPC_MDSTR_GRPC_SERVER_STATS_BIN,
+ grpc_slice_from_copied_buffer(stats_buf_, len))));
+ }
+ }
+ // Call next op.
+ grpc_call_next_op(elem, op->op());
+}
+
+grpc_error* CensusServerCallData::Init(grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ start_time_ = absl::Now();
+ gc_ =
+ grpc_call_from_top_element(grpc_call_stack_element(args->call_stack, 0));
+ GRPC_CLOSURE_INIT(&on_done_recv_initial_metadata_,
+ OnDoneRecvInitialMetadataCb, elem,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem,
+ grpc_schedule_on_exec_ctx);
+ auth_context_ = grpc_call_auth_context(gc_);
+ return GRPC_ERROR_NONE;
+}
+
+void CensusServerCallData::Destroy(grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ grpc_closure* then_call_closure) {
+ const uint64_t request_size = GetOutgoingDataSize(final_info);
+ const uint64_t response_size = GetIncomingDataSize(final_info);
+ double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_);
+ grpc_auth_context_release(auth_context_);
+ ::opencensus::stats::Record(
+ {{RpcServerSentBytesPerRpc(), static_cast<double>(response_size)},
+ {RpcServerReceivedBytesPerRpc(), static_cast<double>(request_size)},
+ {RpcServerServerLatency(), elapsed_time_ms},
+ {RpcServerSentMessagesPerRpc(), sent_message_count_},
+ {RpcServerReceivedMessagesPerRpc(), recv_message_count_}},
+ {{ServerMethodTagKey(), method_},
+ {ServerStatusTagKey(), StatusCodeToString(final_info->final_status)}});
+ grpc_slice_unref_internal(path_);
+ context_.EndSpan();
+}
+
+} // namespace grpc
diff --git a/src/cpp/ext/filters/census/server_filter.h b/src/cpp/ext/filters/census/server_filter.h
new file mode 100644
index 0000000000..e393ed3283
--- /dev/null
+++ b/src/cpp/ext/filters/census/server_filter.h
@@ -0,0 +1,101 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H
+#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H
+
+#include <grpc/support/port_platform.h>
+
+#include "absl/strings/string_view.h"
+#include "absl/time/clock.h"
+#include "absl/time/time.h"
+#include "include/grpc/grpc_security.h"
+#include "src/cpp/ext/filters/census/channel_filter.h"
+#include "src/cpp/ext/filters/census/context.h"
+
+namespace grpc {
+
+// A CallData class will be created for every grpc call within a channel. It is
+// used to store data and methods specific to that call. CensusServerCallData is
+// thread-compatible, however typically only 1 thread should be interacting with
+// a call at a time.
+class CensusServerCallData : public CallData {
+ public:
+ // Maximum size of server stats that are sent on the wire.
+ static constexpr uint32_t kMaxServerStatsLen = 16;
+
+ CensusServerCallData()
+ : gc_(nullptr),
+ auth_context_(nullptr),
+ recv_initial_metadata_(nullptr),
+ initial_on_done_recv_initial_metadata_(nullptr),
+ initial_on_done_recv_message_(nullptr),
+ recv_message_(nullptr),
+ recv_message_count_(0),
+ sent_message_count_(0) {
+ memset(&census_bin_, 0, sizeof(grpc_linked_mdelem));
+ memset(&path_, 0, sizeof(grpc_slice));
+ memset(&on_done_recv_initial_metadata_, 0, sizeof(grpc_closure));
+ memset(&on_done_recv_message_, 0, sizeof(grpc_closure));
+ }
+
+ grpc_error* Init(grpc_call_element* elem,
+ const grpc_call_element_args* args) override;
+
+ void Destroy(grpc_call_element* elem, const grpc_call_final_info* final_info,
+ grpc_closure* then_call_closure) override;
+
+ void StartTransportStreamOpBatch(grpc_call_element* elem,
+ TransportStreamOpBatch* op) override;
+
+ static void OnDoneRecvInitialMetadataCb(void* user_data, grpc_error* error);
+
+ static void OnDoneRecvMessageCb(void* user_data, grpc_error* error);
+
+ private:
+ CensusContext context_;
+ // server method
+ absl::string_view method_;
+ std::string qualified_method_;
+ grpc_slice path_;
+ // Pointer to the grpc_call element
+ grpc_call* gc_;
+ // Authorization context for the call.
+ grpc_auth_context* auth_context_;
+ // Metadata element for census stats.
+ grpc_linked_mdelem census_bin_;
+ // recv callback
+ grpc_metadata_batch* recv_initial_metadata_;
+ grpc_closure* initial_on_done_recv_initial_metadata_;
+ grpc_closure on_done_recv_initial_metadata_;
+ // recv message
+ grpc_closure* initial_on_done_recv_message_;
+ grpc_closure on_done_recv_message_;
+ absl::Time start_time_;
+ absl::Duration elapsed_time_;
+ grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message_;
+ uint64_t recv_message_count_;
+ uint64_t sent_message_count_;
+ // Buffer needed for grpc_slice to reference it when adding metatdata to
+ // response.
+ char stats_buf_[kMaxServerStatsLen];
+};
+
+} // namespace grpc
+
+#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H */
diff --git a/src/cpp/ext/filters/census/views.cc b/src/cpp/ext/filters/census/views.cc
new file mode 100644
index 0000000000..602b8f4d25
--- /dev/null
+++ b/src/cpp/ext/filters/census/views.cc
@@ -0,0 +1,491 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/ext/filters/census/grpc_plugin.h"
+
+#include "absl/time/time.h"
+#include "opencensus/stats/internal/aggregation_window.h"
+#include "opencensus/stats/internal/set_aggregation_window.h"
+#include "opencensus/stats/stats.h"
+
+namespace grpc {
+
+using ::opencensus::stats::Aggregation;
+using ::opencensus::stats::AggregationWindow;
+using ::opencensus::stats::BucketBoundaries;
+using ::opencensus::stats::ViewDescriptor;
+
+// These measure definitions should be kept in sync across opencensus
+// implementations.
+
+namespace {
+
+Aggregation BytesDistributionAggregation() {
+ return Aggregation::Distribution(BucketBoundaries::Explicit(
+ {0, 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216,
+ 67108864, 268435456, 1073741824, 4294967296}));
+}
+
+Aggregation MillisDistributionAggregation() {
+ return Aggregation::Distribution(BucketBoundaries::Explicit(
+ {0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4,
+ 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50,
+ 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650,
+ 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000}));
+}
+
+Aggregation CountDistributionAggregation() {
+ return Aggregation::Distribution(BucketBoundaries::Exponential(17, 1.0, 2.0));
+}
+
+ViewDescriptor MinuteDescriptor() {
+ auto descriptor = ViewDescriptor();
+ SetAggregationWindow(AggregationWindow::Interval(absl::Minutes(1)),
+ &descriptor);
+ return descriptor;
+}
+
+ViewDescriptor HourDescriptor() {
+ auto descriptor = ViewDescriptor();
+ SetAggregationWindow(AggregationWindow::Interval(absl::Hours(1)),
+ &descriptor);
+ return descriptor;
+}
+
+} // namespace
+
+void RegisterGrpcViewsForExport() {
+ ClientSentMessagesPerRpcCumulative().RegisterForExport();
+ ClientSentBytesPerRpcCumulative().RegisterForExport();
+ ClientReceivedMessagesPerRpcCumulative().RegisterForExport();
+ ClientReceivedBytesPerRpcCumulative().RegisterForExport();
+ ClientRoundtripLatencyCumulative().RegisterForExport();
+ ClientServerLatencyCumulative().RegisterForExport();
+
+ ServerSentMessagesPerRpcCumulative().RegisterForExport();
+ ServerSentBytesPerRpcCumulative().RegisterForExport();
+ ServerReceivedMessagesPerRpcCumulative().RegisterForExport();
+ ServerReceivedBytesPerRpcCumulative().RegisterForExport();
+ ServerServerLatencyCumulative().RegisterForExport();
+}
+
+// client cumulative
+const ViewDescriptor& ClientSentBytesPerRpcCumulative() {
+ const static ViewDescriptor descriptor =
+ ViewDescriptor()
+ .set_name("grpc.io/client/sent_bytes_per_rpc/cumulative")
+ .set_measure(kRpcClientSentBytesPerRpcMeasureName)
+ .set_aggregation(BytesDistributionAggregation())
+ .add_column(ClientMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ClientReceivedBytesPerRpcCumulative() {
+ const static ViewDescriptor descriptor =
+ ViewDescriptor()
+ .set_name("grpc.io/client/received_bytes_per_rpc/cumulative")
+ .set_measure(kRpcClientReceivedBytesPerRpcMeasureName)
+ .set_aggregation(BytesDistributionAggregation())
+ .add_column(ClientMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ClientRoundtripLatencyCumulative() {
+ const static ViewDescriptor descriptor =
+ ViewDescriptor()
+ .set_name("grpc.io/client/roundtrip_latency/cumulative")
+ .set_measure(kRpcClientRoundtripLatencyMeasureName)
+ .set_aggregation(MillisDistributionAggregation())
+ .add_column(ClientMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ClientServerLatencyCumulative() {
+ const static ViewDescriptor descriptor =
+ ViewDescriptor()
+ .set_name("grpc.io/client/server_latency/cumulative")
+ .set_measure(kRpcClientServerLatencyMeasureName)
+ .set_aggregation(MillisDistributionAggregation())
+ .add_column(ClientMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ClientCompletedRpcsCumulative() {
+ const static ViewDescriptor descriptor =
+ ViewDescriptor()
+ .set_name("grpc.io/client/completed_rpcs/cumulative")
+ .set_measure(kRpcClientRoundtripLatencyMeasureName)
+ .set_aggregation(Aggregation::Count())
+ .add_column(ClientMethodTagKey())
+ .add_column(ClientStatusTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ClientSentMessagesPerRpcCumulative() {
+ const static ViewDescriptor descriptor =
+ ViewDescriptor()
+ .set_name("grpc.io/client/received_messages_per_rpc/cumulative")
+ .set_measure(kRpcClientSentMessagesPerRpcMeasureName)
+ .set_aggregation(CountDistributionAggregation())
+ .add_column(ClientMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ClientReceivedMessagesPerRpcCumulative() {
+ const static ViewDescriptor descriptor =
+ ViewDescriptor()
+ .set_name("grpc.io/client/sent_messages_per_rpc/cumulative")
+ .set_measure(kRpcClientReceivedMessagesPerRpcMeasureName)
+ .set_aggregation(CountDistributionAggregation())
+ .add_column(ClientMethodTagKey());
+ return descriptor;
+}
+
+// server cumulative
+const ViewDescriptor& ServerSentBytesPerRpcCumulative() {
+ const static ViewDescriptor descriptor =
+ ViewDescriptor()
+ .set_name("grpc.io/server/received_bytes_per_rpc/cumulative")
+ .set_measure(kRpcServerSentBytesPerRpcMeasureName)
+ .set_aggregation(BytesDistributionAggregation())
+ .add_column(ServerMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ServerReceivedBytesPerRpcCumulative() {
+ const static ViewDescriptor descriptor =
+ ViewDescriptor()
+ .set_name("grpc.io/server/sent_bytes_per_rpc/cumulative")
+ .set_measure(kRpcServerReceivedBytesPerRpcMeasureName)
+ .set_aggregation(BytesDistributionAggregation())
+ .add_column(ServerMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ServerServerLatencyCumulative() {
+ const static ViewDescriptor descriptor =
+ ViewDescriptor()
+ .set_name("grpc.io/server/elapsed_time/cumulative")
+ .set_measure(kRpcServerServerLatencyMeasureName)
+ .set_aggregation(MillisDistributionAggregation())
+ .add_column(ServerMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ServerCompletedRpcsCumulative() {
+ const static ViewDescriptor descriptor =
+ ViewDescriptor()
+ .set_name("grpc.io/server/completed_rpcs/cumulative")
+ .set_measure(kRpcServerServerLatencyMeasureName)
+ .set_aggregation(Aggregation::Count())
+ .add_column(ServerMethodTagKey())
+ .add_column(ServerStatusTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ServerSentMessagesPerRpcCumulative() {
+ const static ViewDescriptor descriptor =
+ ViewDescriptor()
+ .set_name("grpc.io/server/received_messages_per_rpc/cumulative")
+ .set_measure(kRpcServerSentMessagesPerRpcMeasureName)
+ .set_aggregation(CountDistributionAggregation())
+ .add_column(ServerMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ServerReceivedMessagesPerRpcCumulative() {
+ const static ViewDescriptor descriptor =
+ ViewDescriptor()
+ .set_name("grpc.io/server/sent_messages_per_rpc/cumulative")
+ .set_measure(kRpcServerReceivedMessagesPerRpcMeasureName)
+ .set_aggregation(CountDistributionAggregation())
+ .add_column(ServerMethodTagKey());
+ return descriptor;
+}
+
+// client minute
+const ViewDescriptor& ClientSentBytesPerRpcMinute() {
+ const static ViewDescriptor descriptor =
+ MinuteDescriptor()
+ .set_name("grpc.io/client/sent_bytes_per_rpc/minute")
+ .set_measure(kRpcClientSentBytesPerRpcMeasureName)
+ .set_aggregation(BytesDistributionAggregation())
+ .add_column(ClientMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ClientReceivedBytesPerRpcMinute() {
+ const static ViewDescriptor descriptor =
+ MinuteDescriptor()
+ .set_name("grpc.io/client/received_bytes_per_rpc/minute")
+ .set_measure(kRpcClientReceivedBytesPerRpcMeasureName)
+ .set_aggregation(BytesDistributionAggregation())
+ .add_column(ClientMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ClientRoundtripLatencyMinute() {
+ const static ViewDescriptor descriptor =
+ MinuteDescriptor()
+ .set_name("grpc.io/client/roundtrip_latency/minute")
+ .set_measure(kRpcClientRoundtripLatencyMeasureName)
+ .set_aggregation(MillisDistributionAggregation())
+ .add_column(ClientMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ClientServerLatencyMinute() {
+ const static ViewDescriptor descriptor =
+ MinuteDescriptor()
+ .set_name("grpc.io/client/server_latency/minute")
+ .set_measure(kRpcClientServerLatencyMeasureName)
+ .set_aggregation(MillisDistributionAggregation())
+ .add_column(ClientMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ClientCompletedRpcsMinute() {
+ const static ViewDescriptor descriptor =
+ MinuteDescriptor()
+ .set_name("grpc.io/client/completed_rpcs/minute")
+ .set_measure(kRpcClientRoundtripLatencyMeasureName)
+ .set_aggregation(Aggregation::Count())
+ .add_column(ClientMethodTagKey())
+ .add_column(ClientStatusTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ClientSentMessagesPerRpcMinute() {
+ const static ViewDescriptor descriptor =
+ MinuteDescriptor()
+ .set_name("grpc.io/client/sent_messages_per_rpc/minute")
+ .set_measure(kRpcClientSentMessagesPerRpcMeasureName)
+ .set_aggregation(CountDistributionAggregation())
+ .add_column(ClientMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ClientReceivedMessagesPerRpcMinute() {
+ const static ViewDescriptor descriptor =
+ MinuteDescriptor()
+ .set_name("grpc.io/client/received_messages_per_rpc/minute")
+ .set_measure(kRpcClientReceivedMessagesPerRpcMeasureName)
+ .set_aggregation(CountDistributionAggregation())
+ .add_column(ClientMethodTagKey());
+ return descriptor;
+}
+
+// server minute
+const ViewDescriptor& ServerSentBytesPerRpcMinute() {
+ const static ViewDescriptor descriptor =
+ MinuteDescriptor()
+ .set_name("grpc.io/server/sent_bytes_per_rpc/minute")
+ .set_measure(kRpcServerSentBytesPerRpcMeasureName)
+ .set_aggregation(BytesDistributionAggregation())
+ .add_column(ServerMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ServerReceivedBytesPerRpcMinute() {
+ const static ViewDescriptor descriptor =
+ MinuteDescriptor()
+ .set_name("grpc.io/server/received_bytes_per_rpc/minute")
+ .set_measure(kRpcServerReceivedBytesPerRpcMeasureName)
+ .set_aggregation(BytesDistributionAggregation())
+ .add_column(ServerMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ServerServerLatencyMinute() {
+ const static ViewDescriptor descriptor =
+ MinuteDescriptor()
+ .set_name("grpc.io/server/server_latency/minute")
+ .set_measure(kRpcServerServerLatencyMeasureName)
+ .set_aggregation(MillisDistributionAggregation())
+ .add_column(ServerMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ServerCompletedRpcsMinute() {
+ const static ViewDescriptor descriptor =
+ MinuteDescriptor()
+ .set_name("grpc.io/server/completed_rpcs/minute")
+ .set_measure(kRpcServerServerLatencyMeasureName)
+ .set_aggregation(Aggregation::Count())
+ .add_column(ServerMethodTagKey())
+ .add_column(ServerStatusTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ServerSentMessagesPerRpcMinute() {
+ const static ViewDescriptor descriptor =
+ MinuteDescriptor()
+ .set_name("grpc.io/server/sent_messages_per_rpc/minute")
+ .set_measure(kRpcServerSentMessagesPerRpcMeasureName)
+ .set_aggregation(CountDistributionAggregation())
+ .add_column(ServerMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ServerReceivedMessagesPerRpcMinute() {
+ const static ViewDescriptor descriptor =
+ MinuteDescriptor()
+ .set_name("grpc.io/server/received_messages_per_rpc/minute")
+ .set_measure(kRpcServerReceivedMessagesPerRpcMeasureName)
+ .set_aggregation(CountDistributionAggregation())
+ .add_column(ServerMethodTagKey());
+ return descriptor;
+}
+
+// client hour
+const ViewDescriptor& ClientSentBytesPerRpcHour() {
+ const static ViewDescriptor descriptor =
+ HourDescriptor()
+ .set_name("grpc.io/client/sent_bytes_per_rpc/hour")
+ .set_measure(kRpcClientSentBytesPerRpcMeasureName)
+ .set_aggregation(BytesDistributionAggregation())
+ .add_column(ClientMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ClientReceivedBytesPerRpcHour() {
+ const static ViewDescriptor descriptor =
+ HourDescriptor()
+ .set_name("grpc.io/client/received_bytes_per_rpc/hour")
+ .set_measure(kRpcClientReceivedBytesPerRpcMeasureName)
+ .set_aggregation(BytesDistributionAggregation())
+ .add_column(ClientMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ClientRoundtripLatencyHour() {
+ const static ViewDescriptor descriptor =
+ HourDescriptor()
+ .set_name("grpc.io/client/roundtrip_latency/hour")
+ .set_measure(kRpcClientRoundtripLatencyMeasureName)
+ .set_aggregation(MillisDistributionAggregation())
+ .add_column(ClientMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ClientServerLatencyHour() {
+ const static ViewDescriptor descriptor =
+ HourDescriptor()
+ .set_name("grpc.io/client/server_latency/hour")
+ .set_measure(kRpcClientServerLatencyMeasureName)
+ .set_aggregation(MillisDistributionAggregation())
+ .add_column(ClientMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ClientCompletedRpcsHour() {
+ const static ViewDescriptor descriptor =
+ HourDescriptor()
+ .set_name("grpc.io/client/completed_rpcs/hour")
+ .set_measure(kRpcClientRoundtripLatencyMeasureName)
+ .set_aggregation(Aggregation::Count())
+ .add_column(ClientMethodTagKey())
+ .add_column(ClientStatusTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ClientSentMessagesPerRpcHour() {
+ const static ViewDescriptor descriptor =
+ HourDescriptor()
+ .set_name("grpc.io/client/sent_messages_per_rpc/hour")
+ .set_measure(kRpcClientSentMessagesPerRpcMeasureName)
+ .set_aggregation(CountDistributionAggregation())
+ .add_column(ClientMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ClientReceivedMessagesPerRpcHour() {
+ const static ViewDescriptor descriptor =
+ HourDescriptor()
+ .set_name("grpc.io/client/received_messages_per_rpc/hour")
+ .set_measure(kRpcClientReceivedMessagesPerRpcMeasureName)
+ .set_aggregation(CountDistributionAggregation())
+ .add_column(ClientMethodTagKey());
+ return descriptor;
+}
+
+// server hour
+const ViewDescriptor& ServerSentBytesPerRpcHour() {
+ const static ViewDescriptor descriptor =
+ HourDescriptor()
+ .set_name("grpc.io/server/sent_bytes_per_rpc/hour")
+ .set_measure(kRpcServerSentBytesPerRpcMeasureName)
+ .set_aggregation(BytesDistributionAggregation())
+ .add_column(ServerMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ServerReceivedBytesPerRpcHour() {
+ const static ViewDescriptor descriptor =
+ HourDescriptor()
+ .set_name("grpc.io/server/received_bytes_per_rpc/hour")
+ .set_measure(kRpcServerReceivedBytesPerRpcMeasureName)
+ .set_aggregation(BytesDistributionAggregation())
+ .add_column(ServerMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ServerServerLatencyHour() {
+ const static ViewDescriptor descriptor =
+ HourDescriptor()
+ .set_name("grpc.io/server/server_latency/hour")
+ .set_measure(kRpcServerServerLatencyMeasureName)
+ .set_aggregation(MillisDistributionAggregation())
+ .add_column(ServerMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ServerCompletedRpcsHour() {
+ const static ViewDescriptor descriptor =
+ HourDescriptor()
+ .set_name("grpc.io/server/completed_rpcs/hour")
+ .set_measure(kRpcServerServerLatencyMeasureName)
+ .set_aggregation(Aggregation::Count())
+ .add_column(ServerMethodTagKey())
+ .add_column(ServerStatusTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ServerSentMessagesPerRpcHour() {
+ const static ViewDescriptor descriptor =
+ HourDescriptor()
+ .set_name("grpc.io/server/sent_messages_per_rpc/hour")
+ .set_measure(kRpcServerSentMessagesPerRpcMeasureName)
+ .set_aggregation(CountDistributionAggregation())
+ .add_column(ServerMethodTagKey());
+ return descriptor;
+}
+
+const ViewDescriptor& ServerReceivedMessagesPerRpcHour() {
+ const static ViewDescriptor descriptor =
+ HourDescriptor()
+ .set_name("grpc.io/server/received_messages_per_rpc/hour")
+ .set_measure(kRpcServerReceivedMessagesPerRpcMeasureName)
+ .set_aggregation(CountDistributionAggregation())
+ .add_column(ServerMethodTagKey());
+ return descriptor;
+}
+
+} // namespace grpc