diff options
author | Vizerai <jsking@google.com> | 2018-04-13 18:19:21 -0700 |
---|---|---|
committer | Vizerai <jsking@google.com> | 2018-05-10 11:41:15 -0700 |
commit | 41e4cedb7012a55376322b142d74eae5e86b95e3 (patch) | |
tree | e88e094d1381ab459e25f12328d20a2945b09660 /src/cpp/ext | |
parent | 082ddc563ea71a6b7a700070ec60095bfb65d88f (diff) |
Adding opencensus grpc plugin.
Rebasing to merge commits.
Diffstat (limited to 'src/cpp/ext')
-rw-r--r-- | src/cpp/ext/filters/census/channel_filter.cc | 30 | ||||
-rw-r--r-- | src/cpp/ext/filters/census/channel_filter.h | 36 | ||||
-rw-r--r-- | src/cpp/ext/filters/census/client_filter.cc | 163 | ||||
-rw-r--r-- | src/cpp/ext/filters/census/client_filter.h | 104 | ||||
-rw-r--r-- | src/cpp/ext/filters/census/context.cc | 132 | ||||
-rw-r--r-- | src/cpp/ext/filters/census/context.h | 126 | ||||
-rw-r--r-- | src/cpp/ext/filters/census/grpc_context.cc | 38 | ||||
-rw-r--r-- | src/cpp/ext/filters/census/grpc_plugin.cc | 130 | ||||
-rw-r--r-- | src/cpp/ext/filters/census/grpc_plugin.h | 125 | ||||
-rw-r--r-- | src/cpp/ext/filters/census/measures.cc | 129 | ||||
-rw-r--r-- | src/cpp/ext/filters/census/measures.h | 46 | ||||
-rw-r--r-- | src/cpp/ext/filters/census/rpc_encoding.cc | 39 | ||||
-rw-r--r-- | src/cpp/ext/filters/census/rpc_encoding.h | 284 | ||||
-rw-r--r-- | src/cpp/ext/filters/census/server_filter.cc | 198 | ||||
-rw-r--r-- | src/cpp/ext/filters/census/server_filter.h | 101 | ||||
-rw-r--r-- | src/cpp/ext/filters/census/views.cc | 491 |
16 files changed, 2172 insertions, 0 deletions
diff --git a/src/cpp/ext/filters/census/channel_filter.cc b/src/cpp/ext/filters/census/channel_filter.cc new file mode 100644 index 0000000000..4ac684d277 --- /dev/null +++ b/src/cpp/ext/filters/census/channel_filter.cc @@ -0,0 +1,30 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/cpp/ext/filters/census/channel_filter.h" + +namespace grpc { + +grpc_error* CensusChannelData::Init(grpc_channel_element* elem, + grpc_channel_element_args* args) { + return GRPC_ERROR_NONE; +} + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/channel_filter.h b/src/cpp/ext/filters/census/channel_filter.h new file mode 100644 index 0000000000..0b7c682681 --- /dev/null +++ b/src/cpp/ext/filters/census/channel_filter.h @@ -0,0 +1,36 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H + +#include <grpc/support/port_platform.h> + +#include "src/cpp/ext/filters/census/context.h" + +namespace grpc { + +class CensusChannelData : public ChannelData { + public: + grpc_error* Init(grpc_channel_element* elem, + grpc_channel_element_args* args) override; +}; + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H */ diff --git a/src/cpp/ext/filters/census/client_filter.cc b/src/cpp/ext/filters/census/client_filter.cc new file mode 100644 index 0000000000..293f4b1c25 --- /dev/null +++ b/src/cpp/ext/filters/census/client_filter.cc @@ -0,0 +1,163 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/cpp/ext/filters/census/client_filter.h" + +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "opencensus/stats/stats.h" +#include "src/core/lib/surface/call.h" +#include "src/cpp/ext/filters/census/grpc_plugin.h" +#include "src/cpp/ext/filters/census/measures.h" + +namespace grpc { + +constexpr uint32_t CensusClientCallData::kMaxTraceContextLen; +constexpr uint32_t CensusClientCallData::kMaxTagsLen; + +namespace { + +void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) { + if (b->idx.named.grpc_server_stats_bin != nullptr) { + ServerStatsDeserialize( + reinterpret_cast<const char*>(GRPC_SLICE_START_PTR( + GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md))), + GRPC_SLICE_LENGTH(GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md)), + elapsed_time); + grpc_metadata_batch_remove(b, b->idx.named.grpc_server_stats_bin); + } +} + +} // namespace + +void CensusClientCallData::OnDoneRecvTrailingMetadataCb(void* user_data, + grpc_error* error) { + grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data); + CensusClientCallData* calld = + reinterpret_cast<CensusClientCallData*>(elem->call_data); + GPR_ASSERT(calld != nullptr); + if (error == GRPC_ERROR_NONE) { + GPR_ASSERT(calld->recv_trailing_metadata_ != nullptr); + FilterTrailingMetadata(calld->recv_trailing_metadata_, + &calld->elapsed_time_); + } + GRPC_CLOSURE_RUN(calld->initial_on_done_recv_trailing_metadata_, + GRPC_ERROR_REF(error)); +} + +void CensusClientCallData::OnDoneRecvMessageCb(void* user_data, + grpc_error* error) { + grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data); + CensusClientCallData* calld = + reinterpret_cast<CensusClientCallData*>(elem->call_data); + CensusChannelData* channeld = + reinterpret_cast<CensusChannelData*>(elem->channel_data); + GPR_ASSERT(calld != nullptr); + GPR_ASSERT(channeld != nullptr); + // Stream messages are no longer valid after receiving trailing metadata. + if ((*calld->recv_message_) != nullptr) { + calld->recv_message_count_++; + } + GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error)); +} + +void CensusClientCallData::StartTransportStreamOpBatch( + grpc_call_element* elem, TransportStreamOpBatch* op) { + if (op->send_initial_metadata() != nullptr) { + census_context* ctxt = op->get_census_context(); + GenerateClientContext( + qualified_method_, &context_, + (ctxt == nullptr) ? nullptr : reinterpret_cast<CensusContext*>(ctxt)); + size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf_, + kMaxTraceContextLen); + if (tracing_len > 0) { + GRPC_LOG_IF_ERROR( + "census grpc_filter", + grpc_metadata_batch_add_tail( + op->send_initial_metadata()->batch(), &tracing_bin_, + grpc_mdelem_from_slices( + GRPC_MDSTR_GRPC_TRACE_BIN, + grpc_slice_from_copied_buffer(tracing_buf_, tracing_len)))); + } + grpc_slice tags = grpc_empty_slice(); + // TODO: Add in tagging serialization. + size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags); + if (encoded_tags_len > 0) { + GRPC_LOG_IF_ERROR( + "census grpc_filter", + grpc_metadata_batch_add_tail( + op->send_initial_metadata()->batch(), &stats_bin_, + grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags))); + } + } + + if (op->send_message() != nullptr) { + ++sent_message_count_; + } + if (op->recv_message() != nullptr) { + recv_message_ = op->op()->payload->recv_message.recv_message; + initial_on_done_recv_message_ = + op->op()->payload->recv_message.recv_message_ready; + op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_; + } + if (op->recv_trailing_metadata() != nullptr) { + recv_trailing_metadata_ = op->recv_trailing_metadata()->batch(); + initial_on_done_recv_trailing_metadata_ = op->on_complete(); + op->set_on_complete(&on_done_recv_trailing_metadata_); + } + // Call next op. + grpc_call_next_op(elem, op->op()); +} + +grpc_error* CensusClientCallData::Init(grpc_call_element* elem, + const grpc_call_element_args* args) { + path_ = grpc_slice_ref_internal(args->path); + start_time_ = absl::Now(); + method_ = GetMethod(&path_); + qualified_method_ = absl::StrCat("Sent.", method_); + GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_done_recv_trailing_metadata_, + OnDoneRecvTrailingMetadataCb, elem, + grpc_schedule_on_exec_ctx); + return GRPC_ERROR_NONE; +} + +void CensusClientCallData::Destroy(grpc_call_element* elem, + const grpc_call_final_info* final_info, + grpc_closure* then_call_closure) { + const uint64_t request_size = GetOutgoingDataSize(final_info); + const uint64_t response_size = GetIncomingDataSize(final_info); + double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_); + ::opencensus::stats::Record( + {{RpcClientSentBytesPerRpc(), static_cast<double>(request_size)}, + {RpcClientReceivedBytesPerRpc(), static_cast<double>(response_size)}, + {RpcClientRoundtripLatency(), latency_ms}, + {RpcClientServerLatency(), + ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time_))}, + {RpcClientSentMessagesPerRpc(), sent_message_count_}, + {RpcClientReceivedMessagesPerRpc(), recv_message_count_}}, + {{ClientMethodTagKey(), method_}, + {ClientStatusTagKey(), StatusCodeToString(final_info->final_status)}}); + grpc_slice_unref_internal(path_); + context_.EndSpan(); +} + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/client_filter.h b/src/cpp/ext/filters/census/client_filter.h new file mode 100644 index 0000000000..851022873f --- /dev/null +++ b/src/cpp/ext/filters/census/client_filter.h @@ -0,0 +1,104 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CLIENT_FILTER_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CLIENT_FILTER_H + +#include <grpc/support/port_platform.h> + +#include "absl/strings/string_view.h" +#include "absl/time/time.h" +#include "src/cpp/ext/filters/census/channel_filter.h" +#include "src/cpp/ext/filters/census/context.h" + +namespace grpc { + +// A CallData class will be created for every grpc call within a channel. It is +// used to store data and methods specific to that call. CensusClientCallData is +// thread-compatible, however typically only 1 thread should be interacting with +// a call at a time. +class CensusClientCallData : public CallData { + public: + // Maximum size of trace context is sent on the wire. + static constexpr uint32_t kMaxTraceContextLen = 64; + // Maximum size of tags that are sent on the wire. + static constexpr uint32_t kMaxTagsLen = 2048; + + CensusClientCallData() + : recv_trailing_metadata_(nullptr), + initial_on_done_recv_trailing_metadata_(nullptr), + initial_on_done_recv_message_(nullptr), + elapsed_time_(0), + recv_message_(nullptr), + recv_message_count_(0), + sent_message_count_(0) { + memset(&stats_bin_, 0, sizeof(grpc_linked_mdelem)); + memset(&tracing_bin_, 0, sizeof(grpc_linked_mdelem)); + memset(&path_, 0, sizeof(grpc_slice)); + memset(&on_done_recv_trailing_metadata_, 0, sizeof(grpc_closure)); + memset(&on_done_recv_message_, 0, sizeof(grpc_closure)); + } + + grpc_error* Init(grpc_call_element* elem, + const grpc_call_element_args* args) override; + + void Destroy(grpc_call_element* elem, const grpc_call_final_info* final_info, + grpc_closure* then_call_closure) override; + + void StartTransportStreamOpBatch(grpc_call_element* elem, + TransportStreamOpBatch* op) override; + + static void OnDoneRecvTrailingMetadataCb(void* user_data, grpc_error* error); + + static void OnDoneSendInitialMetadataCb(void* user_data, grpc_error* error); + + static void OnDoneRecvMessageCb(void* user_data, grpc_error* error); + + private: + CensusContext context_; + // Metadata elements for tracing and census stats data. + grpc_linked_mdelem stats_bin_; + grpc_linked_mdelem tracing_bin_; + // Client method. + absl::string_view method_; + std::string qualified_method_; + grpc_slice path_; + // The recv trailing metadata callbacks. + grpc_metadata_batch* recv_trailing_metadata_; + grpc_closure* initial_on_done_recv_trailing_metadata_; + grpc_closure on_done_recv_trailing_metadata_; + // recv message + grpc_closure* initial_on_done_recv_message_; + grpc_closure on_done_recv_message_; + // Start time (for measuring latency). + absl::Time start_time_; + // Server elapsed time in nanoseconds. + uint64_t elapsed_time_; + // The received message--may be null. + grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message_; + // Number of messages in this RPC. + uint64_t recv_message_count_; + uint64_t sent_message_count_; + // Buffer needed for grpc_slice to reference when adding trace context + // metatdata to outgoing message. + char tracing_buf_[kMaxTraceContextLen]; +}; + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CLIENT_FILTER_H */ diff --git a/src/cpp/ext/filters/census/context.cc b/src/cpp/ext/filters/census/context.cc new file mode 100644 index 0000000000..4b3250236d --- /dev/null +++ b/src/cpp/ext/filters/census/context.cc @@ -0,0 +1,132 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/cpp/ext/filters/census/context.h" + +namespace grpc { + +using ::opencensus::trace::Span; +using ::opencensus::trace::SpanContext; + +void GenerateServerContext(absl::string_view tracing, absl::string_view stats, + absl::string_view primary_role, + absl::string_view method, CensusContext* context) { + GrpcTraceContext trace_ctxt; + TraceContextEncoding::Decode(tracing, &trace_ctxt); + SpanContext parent_ctx = trace_ctxt.ToSpanContext(); + new (context) CensusContext(method, parent_ctx); +} + +void GenerateClientContext(absl::string_view method, CensusContext* ctxt, + CensusContext* parent_ctxt) { + if (parent_ctxt != nullptr) { + SpanContext span_ctxt = parent_ctxt->Context(); + Span span = parent_ctxt->Span(); + if (span_ctxt.IsValid()) { + new (ctxt) CensusContext(method, &span); + return; + } + } + new (ctxt) CensusContext(method); +} + +size_t TraceContextSerialize(const ::opencensus::trace::SpanContext& context, + char* tracing_buf, size_t tracing_buf_size) { + GrpcTraceContext trace_ctxt(context); + return TraceContextEncoding::Encode(trace_ctxt, tracing_buf, + tracing_buf_size); +} + +size_t StatsContextSerialize(size_t max_tags_len, grpc_slice* tags) { + // TODO: Add implementation. Waiting on stats tagging to be added. + return 0; +} + +size_t ServerStatsSerialize(uint64_t server_elapsed_time, char* buf, + size_t buf_size) { + return RpcServerStatsEncoding::Encode(server_elapsed_time, buf, buf_size); +} + +size_t ServerStatsDeserialize(const char* buf, size_t buf_size, + uint64_t* server_elapsed_time) { + return RpcServerStatsEncoding::Decode(absl::string_view(buf, buf_size), + server_elapsed_time); +} + +uint64_t GetIncomingDataSize(const grpc_call_final_info* final_info) { + return final_info->stats.transport_stream_stats.incoming.data_bytes; +} + +uint64_t GetOutgoingDataSize(const grpc_call_final_info* final_info) { + return final_info->stats.transport_stream_stats.outgoing.data_bytes; +} + +SpanContext SpanContextFromCensusContext(const census_context* ctxt) { + return reinterpret_cast<const CensusContext*>(ctxt)->Context(); +} + +Span SpanFromCensusContext(const census_context* ctxt) { + return reinterpret_cast<const CensusContext*>(ctxt)->Span(); +} + +absl::string_view StatusCodeToString(grpc_status_code code) { + switch (code) { + case GRPC_STATUS_OK: + return "OK"; + case GRPC_STATUS_CANCELLED: + return "CANCELLED"; + case GRPC_STATUS_UNKNOWN: + return "UNKNOWN"; + case GRPC_STATUS_INVALID_ARGUMENT: + return "INVALID_ARGUMENT"; + case GRPC_STATUS_DEADLINE_EXCEEDED: + return "DEADLINE_EXCEEDED"; + case GRPC_STATUS_NOT_FOUND: + return "NOT_FOUND"; + case GRPC_STATUS_ALREADY_EXISTS: + return "ALREADY_EXISTS"; + case GRPC_STATUS_PERMISSION_DENIED: + return "PERMISSION_DENIED"; + case GRPC_STATUS_UNAUTHENTICATED: + return "UNAUTHENTICATED"; + case GRPC_STATUS_RESOURCE_EXHAUSTED: + return "RESOURCE_EXHAUSTED"; + case GRPC_STATUS_FAILED_PRECONDITION: + return "FAILED_PRECONDITION"; + case GRPC_STATUS_ABORTED: + return "ABORTED"; + case GRPC_STATUS_OUT_OF_RANGE: + return "OUT_OF_RANGE"; + case GRPC_STATUS_UNIMPLEMENTED: + return "UNIMPLEMENTED"; + case GRPC_STATUS_INTERNAL: + return "INTERNAL"; + case GRPC_STATUS_UNAVAILABLE: + return "UNAVAILABLE"; + case GRPC_STATUS_DATA_LOSS: + return "DATA_LOSS"; + default: + // gRPC wants users of this enum to include a default branch so that + // adding values is not a breaking change. + return "UNKNOWN_STATUS"; + } +} + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/context.h b/src/cpp/ext/filters/census/context.h new file mode 100644 index 0000000000..1643fdd11b --- /dev/null +++ b/src/cpp/ext/filters/census/context.h @@ -0,0 +1,126 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CONTEXT_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CONTEXT_H + +#include <grpc/support/port_platform.h> + +#include <grpc/status.h> +#include "absl/memory/memory.h" +#include "absl/strings/string_view.h" +#include "absl/strings/strip.h" +#include "opencensus/trace/span.h" +#include "opencensus/trace/span_context.h" +#include "opencensus/trace/trace_params.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/cpp/common/channel_filter.h" +#include "src/cpp/ext/filters/census/rpc_encoding.h" + +// This is needed because grpc has hardcoded CensusContext with a +// forward declaration of 'struct census_context;' +struct census_context; + +namespace grpc { + +// Thread compatible. +class CensusContext { + public: + CensusContext() : span_(::opencensus::trace::Span::BlankSpan()) {} + + explicit CensusContext(absl::string_view name) + : span_(::opencensus::trace::Span::StartSpan(name)) {} + + CensusContext(absl::string_view name, const ::opencensus::trace::Span* parent) + : span_(::opencensus::trace::Span::StartSpan(name, parent)) {} + + CensusContext(absl::string_view name, + const ::opencensus::trace::SpanContext& parent_ctxt) + : span_(::opencensus::trace::Span::StartSpanWithRemoteParent( + name, parent_ctxt)) {} + + ::opencensus::trace::SpanContext Context() const { return span_.context(); } + ::opencensus::trace::Span Span() const { return span_; } + void EndSpan() { span_.End(); } + + private: + ::opencensus::trace::Span span_; +}; + +// Serializes the outgoing trace context. Field IDs are 1 byte followed by +// field data. A 1 byte version ID is always encoded first. +size_t TraceContextSerialize(const ::opencensus::trace::SpanContext& context, + char* tracing_buf, size_t tracing_buf_size); + +// Serializes the outgoing stats context. Field IDs are 1 byte followed by +// field data. A 1 byte version ID is always encoded first. Tags are directly +// serialized into the given grpc_slice. +size_t StatsContextSerialize(size_t max_tags_len, grpc_slice* tags); + +// Serialize outgoing server stats. Returns the number of bytes serialized. +size_t ServerStatsSerialize(uint64_t server_elapsed_time, char* buf, + size_t buf_size); + +// Deserialize incoming server stats. Returns the number of bytes deserialized. +size_t ServerStatsDeserialize(const char* buf, size_t buf_size, + uint64_t* server_elapsed_time); + +// Deserialize the incoming SpanContext and generate a new server context based +// on that. This new span will never be a root span. This should only be called +// with a blank CensusContext as it overwrites it. +void GenerateServerContext(absl::string_view tracing, absl::string_view stats, + absl::string_view primary_role, + absl::string_view method, CensusContext* context); + +// Creates a new client context that is by default a new root context. +// If the current context is the default context then the newly created +// span automatically becomes a root span. This should only be called with a +// blank CensusContext as it overwrites it. +void GenerateClientContext(absl::string_view method, CensusContext* ctxt, + CensusContext* parent_ctx); + +// Returns the incoming data size from the grpc call final info. +uint64_t GetIncomingDataSize(const grpc_call_final_info* final_info); + +// Returns the outgoing data size from the grpc call final info. +uint64_t GetOutgoingDataSize(const grpc_call_final_info* final_info); + +// These helper functions return the SpanContext and Span, respectively +// associated with the census_context* stored by grpc. The user will need to +// call this for manual propagation of tracing data. +::opencensus::trace::SpanContext SpanContextFromCensusContext( + const census_context* ctxt); +::opencensus::trace::Span SpanFromCensusContext(const census_context* ctxt); + +// Returns a string representation of the StatusCode enum. +absl::string_view StatusCodeToString(grpc_status_code code); + +inline absl::string_view GetMethod(const grpc_slice* path) { + if (GRPC_SLICE_IS_EMPTY(*path)) { + return ""; + } + // Check for leading '/' and trim it if present. + return absl::StripPrefix(absl::string_view(reinterpret_cast<const char*>( + GRPC_SLICE_START_PTR(*path)), + GRPC_SLICE_LENGTH(*path)), + "/"); +} + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CONTEXT_H */ diff --git a/src/cpp/ext/filters/census/grpc_context.cc b/src/cpp/ext/filters/census/grpc_context.cc new file mode 100644 index 0000000000..599a798dda --- /dev/null +++ b/src/cpp/ext/filters/census/grpc_context.cc @@ -0,0 +1,38 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include <grpc/census.h> +#include <grpc/grpc.h> +#include "src/core/lib/surface/api_trace.h" +#include "src/core/lib/surface/call.h" + +void grpc_census_call_set_context(grpc_call* call, census_context* context) { + GRPC_API_TRACE("grpc_census_call_set_context(call=%p, census_context=%p)", 2, + (call, context)); + if (context != nullptr) { + grpc_call_context_set(call, GRPC_CONTEXT_TRACING, context, nullptr); + } +} + +census_context* grpc_census_call_get_context(grpc_call* call) { + GRPC_API_TRACE("grpc_census_call_get_context(call=%p)", 1, (call)); + return static_cast<census_context*>( + grpc_call_context_get(call, GRPC_CONTEXT_TRACING)); +} diff --git a/src/cpp/ext/filters/census/grpc_plugin.cc b/src/cpp/ext/filters/census/grpc_plugin.cc new file mode 100644 index 0000000000..f978ed3bf5 --- /dev/null +++ b/src/cpp/ext/filters/census/grpc_plugin.cc @@ -0,0 +1,130 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/cpp/ext/filters/census/grpc_plugin.h" + +#include <grpcpp/server_context.h> + +#include "opencensus/trace/span.h" +#include "src/cpp/ext/filters/census/channel_filter.h" +#include "src/cpp/ext/filters/census/client_filter.h" +#include "src/cpp/ext/filters/census/measures.h" +#include "src/cpp/ext/filters/census/server_filter.h" + +namespace grpc { + +void RegisterOpenCensusPlugin() { + RegisterChannelFilter<CensusChannelData, CensusClientCallData>( + "opencensus_client", GRPC_CLIENT_CHANNEL, INT_MAX /* priority */, + nullptr /* condition function */); + RegisterChannelFilter<CensusChannelData, CensusServerCallData>( + "opencensus_server", GRPC_SERVER_CHANNEL, INT_MAX /* priority */, + nullptr /* condition function */); + + // Access measures to ensure they are initialized. Otherwise, creating a view + // before the first RPC would cause an error. + RpcClientSentBytesPerRpc(); + RpcClientReceivedBytesPerRpc(); + RpcClientRoundtripLatency(); + RpcClientServerLatency(); + RpcClientSentMessagesPerRpc(); + RpcClientReceivedMessagesPerRpc(); + + RpcServerSentBytesPerRpc(); + RpcServerReceivedBytesPerRpc(); + RpcServerServerLatency(); + RpcServerSentMessagesPerRpc(); + RpcServerReceivedMessagesPerRpc(); +} + +::opencensus::trace::Span GetSpanFromServerContext(ServerContext* context) { + return reinterpret_cast<const CensusContext*>(context->census_context()) + ->Span(); +} + +// These measure definitions should be kept in sync across opencensus +// implementations--see +// https://github.com/census-instrumentation/opencensus-java/blob/master/contrib/grpc_metrics/src/main/java/io/opencensus/contrib/grpc/metrics/RpcMeasureConstants.java. +::opencensus::stats::TagKey ClientMethodTagKey() { + static const auto method_tag_key = + ::opencensus::stats::TagKey::Register("grpc_client_method"); + return method_tag_key; +} + +::opencensus::stats::TagKey ClientStatusTagKey() { + static const auto status_tag_key = + ::opencensus::stats::TagKey::Register("grpc_client_status"); + return status_tag_key; +} + +::opencensus::stats::TagKey ServerMethodTagKey() { + static const auto method_tag_key = + ::opencensus::stats::TagKey::Register("grpc_server_method"); + return method_tag_key; +} + +::opencensus::stats::TagKey ServerStatusTagKey() { + static const auto status_tag_key = + ::opencensus::stats::TagKey::Register("grpc_server_status"); + return status_tag_key; +} + +// Client +ABSL_CONST_INIT const absl::string_view + kRpcClientSentMessagesPerRpcMeasureName = + "grpc.io/client/sent_messages_per_rpc"; + +ABSL_CONST_INIT const absl::string_view kRpcClientSentBytesPerRpcMeasureName = + "grpc.io/client/sent_bytes_per_rpc"; + +ABSL_CONST_INIT const absl::string_view + kRpcClientReceivedMessagesPerRpcMeasureName = + "grpc.io/client/received_messages_per_rpc"; + +ABSL_CONST_INIT const absl::string_view + kRpcClientReceivedBytesPerRpcMeasureName = + "grpc.io/client/received_bytes_per_rpc"; + +ABSL_CONST_INIT const absl::string_view kRpcClientRoundtripLatencyMeasureName = + "grpc.io/client/roundtrip_latency"; + +ABSL_CONST_INIT const absl::string_view kRpcClientServerLatencyMeasureName = + "grpc.io/client/server_latency"; + +// Server +ABSL_CONST_INIT const absl::string_view + kRpcServerSentMessagesPerRpcMeasureName = + "grpc.io/server/sent_messages_per_rpc"; + +ABSL_CONST_INIT const absl::string_view kRpcServerSentBytesPerRpcMeasureName = + "grpc.io/server/sent_bytes_per_rpc"; + +ABSL_CONST_INIT const absl::string_view + kRpcServerReceivedMessagesPerRpcMeasureName = + "grpc.io/server/received_messages_per_rpc"; + +ABSL_CONST_INIT const absl::string_view + kRpcServerReceivedBytesPerRpcMeasureName = + "grpc.io/server/received_bytes_per_rpc"; + +ABSL_CONST_INIT const absl::string_view kRpcServerServerLatencyMeasureName = + "grpc.io/server/server_latency"; + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/grpc_plugin.h b/src/cpp/ext/filters/census/grpc_plugin.h new file mode 100644 index 0000000000..107db18de9 --- /dev/null +++ b/src/cpp/ext/filters/census/grpc_plugin.h @@ -0,0 +1,125 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_GRPC_PLUGIN_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_GRPC_PLUGIN_H + +#include <grpc/support/port_platform.h> + +#include "absl/strings/string_view.h" +#include "opencensus/stats/stats.h" +#include "opencensus/trace/span.h" + +namespace grpc { +class ServerContext; +} + +namespace grpc { + +// Registers the OpenCensus plugin with gRPC, so that it will be used for future +// RPCs. This must be called before any views are created on the measures +// defined below. +void RegisterOpenCensusPlugin(); + +// RPC stats definitions, defined by +// https://github.com/census-instrumentation/opencensus-specs/blob/master/stats/gRPC.md + +// Registers the cumulative gRPC views so that they will be exported by any +// registered stats exporter. +// For on-task stats, construct a View using the ViewDescriptors below. +void RegisterGrpcViewsForExport(); + +// Returns the tracing Span for the current RPC. +::opencensus::trace::Span GetSpanFromServerContext(ServerContext* context); + +// The tag keys set when recording RPC stats. +::opencensus::stats::TagKey ClientMethodTagKey(); +::opencensus::stats::TagKey ClientStatusTagKey(); +::opencensus::stats::TagKey ServerMethodTagKey(); +::opencensus::stats::TagKey ServerStatusTagKey(); + +// Names of measures used by the plugin--users can create views on these +// measures but should not record data for them. +extern const absl::string_view kRpcClientSentMessagesPerRpcMeasureName; +extern const absl::string_view kRpcClientSentBytesPerRpcMeasureName; +extern const absl::string_view kRpcClientReceivedMessagesPerRpcMeasureName; +extern const absl::string_view kRpcClientReceivedBytesPerRpcMeasureName; +extern const absl::string_view kRpcClientRoundtripLatencyMeasureName; +extern const absl::string_view kRpcClientServerLatencyMeasureName; + +extern const absl::string_view kRpcServerSentMessagesPerRpcMeasureName; +extern const absl::string_view kRpcServerSentBytesPerRpcMeasureName; +extern const absl::string_view kRpcServerReceivedMessagesPerRpcMeasureName; +extern const absl::string_view kRpcServerReceivedBytesPerRpcMeasureName; +extern const absl::string_view kRpcServerServerLatencyMeasureName; + +// Canonical gRPC view definitions. +const ::opencensus::stats::ViewDescriptor& ClientSentMessagesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& ClientSentBytesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& +ClientReceivedMessagesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& +ClientReceivedBytesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& ClientRoundtripLatencyCumulative(); +const ::opencensus::stats::ViewDescriptor& ClientServerLatencyCumulative(); +const ::opencensus::stats::ViewDescriptor& ClientCompletedRpcsCumulative(); + +const ::opencensus::stats::ViewDescriptor& ServerSentBytesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& +ServerReceivedBytesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& ServerServerLatencyCumulative(); +const ::opencensus::stats::ViewDescriptor& ServerStartedCountCumulative(); +const ::opencensus::stats::ViewDescriptor& ServerCompletedRpcsCumulative(); +const ::opencensus::stats::ViewDescriptor& ServerSentMessagesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& +ServerReceivedMessagesPerRpcCumulative(); + +const ::opencensus::stats::ViewDescriptor& ClientSentMessagesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ClientSentBytesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ClientReceivedMessagesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ClientReceivedBytesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ClientRoundtripLatencyMinute(); +const ::opencensus::stats::ViewDescriptor& ClientServerLatencyMinute(); +const ::opencensus::stats::ViewDescriptor& ClientCompletedRpcsMinute(); + +const ::opencensus::stats::ViewDescriptor& ServerSentMessagesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ServerSentBytesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ServerReceivedMessagesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ServerReceivedBytesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ServerServerLatencyMinute(); +const ::opencensus::stats::ViewDescriptor& ServerCompletedRpcsMinute(); + +const ::opencensus::stats::ViewDescriptor& ClientSentMessagesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ClientSentBytesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ClientReceivedMessagesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ClientReceivedBytesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ClientRoundtripLatencyHour(); +const ::opencensus::stats::ViewDescriptor& ClientServerLatencyHour(); +const ::opencensus::stats::ViewDescriptor& ClientCompletedRpcsHour(); + +const ::opencensus::stats::ViewDescriptor& ServerSentMessagesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ServerSentBytesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ServerReceivedMessagesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ServerReceivedBytesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ServerServerLatencyHour(); +const ::opencensus::stats::ViewDescriptor& ServerStartedCountHour(); +const ::opencensus::stats::ViewDescriptor& ServerCompletedRpcsHour(); + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_GRPC_PLUGIN_H */ diff --git a/src/cpp/ext/filters/census/measures.cc b/src/cpp/ext/filters/census/measures.cc new file mode 100644 index 0000000000..b522fae09a --- /dev/null +++ b/src/cpp/ext/filters/census/measures.cc @@ -0,0 +1,129 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/cpp/ext/filters/census/measures.h" + +#include "opencensus/stats/stats.h" +#include "src/cpp/ext/filters/census/grpc_plugin.h" + +namespace grpc { + +using ::opencensus::stats::MeasureDouble; +using ::opencensus::stats::MeasureInt64; + +// These measure definitions should be kept in sync across opencensus +// implementations--see +// https://github.com/census-instrumentation/opencensus-java/blob/master/contrib/grpc_metrics/src/main/java/io/opencensus/contrib/grpc/metrics/RpcMeasureConstants.java. + +namespace { + +// Unit constants +constexpr char kUnitBytes[] = "By"; +constexpr char kUnitMilliseconds[] = "ms"; +constexpr char kCount[] = "1"; + +} // namespace + +// Client +MeasureDouble RpcClientSentBytesPerRpc() { + static const auto measure = MeasureDouble::Register( + kRpcClientSentBytesPerRpcMeasureName, + "Total bytes sent across all request messages per RPC", kUnitBytes); + return measure; +} + +MeasureDouble RpcClientReceivedBytesPerRpc() { + static const auto measure = MeasureDouble::Register( + kRpcClientReceivedBytesPerRpcMeasureName, + "Total bytes received across all response messages per RPC", kUnitBytes); + return measure; +} + +MeasureDouble RpcClientRoundtripLatency() { + static const auto measure = MeasureDouble::Register( + kRpcClientRoundtripLatencyMeasureName, + "Time between first byte of request sent to last byte of response " + "received, or terminal error", + kUnitMilliseconds); + return measure; +} + +MeasureDouble RpcClientServerLatency() { + static const auto measure = MeasureDouble::Register( + kRpcClientServerLatencyMeasureName, + "Time between first byte of request received to last byte of response " + "sent, or terminal error (propagated from the server)", + kUnitMilliseconds); + return measure; +} + +MeasureInt64 RpcClientSentMessagesPerRpc() { + static const auto measure = + MeasureInt64::Register(kRpcClientSentMessagesPerRpcMeasureName, + "Number of messages sent per RPC", kCount); + return measure; +} + +MeasureInt64 RpcClientReceivedMessagesPerRpc() { + static const auto measure = + MeasureInt64::Register(kRpcClientReceivedMessagesPerRpcMeasureName, + "Number of messages received per RPC", kCount); + return measure; +} + +// Server +MeasureDouble RpcServerSentBytesPerRpc() { + static const auto measure = MeasureDouble::Register( + kRpcServerSentBytesPerRpcMeasureName, + "Total bytes sent across all messages per RPC", kUnitBytes); + return measure; +} + +MeasureDouble RpcServerReceivedBytesPerRpc() { + static const auto measure = MeasureDouble::Register( + kRpcServerReceivedBytesPerRpcMeasureName, + "Total bytes received across all messages per RPC", kUnitBytes); + return measure; +} + +MeasureDouble RpcServerServerLatency() { + static const auto measure = MeasureDouble::Register( + kRpcServerServerLatencyMeasureName, + "Time between first byte of request received to last byte of response " + "sent, or terminal error", + kUnitMilliseconds); + return measure; +} + +MeasureInt64 RpcServerSentMessagesPerRpc() { + static const auto measure = + MeasureInt64::Register(kRpcServerSentMessagesPerRpcMeasureName, + "Number of messages sent per RPC", kCount); + return measure; +} + +MeasureInt64 RpcServerReceivedMessagesPerRpc() { + static const auto measure = + MeasureInt64::Register(kRpcServerReceivedMessagesPerRpcMeasureName, + "Number of messages received per RPC", kCount); + return measure; +} + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/measures.h b/src/cpp/ext/filters/census/measures.h new file mode 100644 index 0000000000..8f8e72ace2 --- /dev/null +++ b/src/cpp/ext/filters/census/measures.h @@ -0,0 +1,46 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_MEASURES_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_MEASURES_H + +#include <grpc/support/port_platform.h> + +#include "opencensus/stats/stats.h" +#include "src/cpp/ext/filters/census/grpc_plugin.h" + +namespace grpc { + +::opencensus::stats::MeasureInt64 RpcClientSentMessagesPerRpc(); +::opencensus::stats::MeasureDouble RpcClientSentBytesPerRpc(); +::opencensus::stats::MeasureInt64 RpcClientReceivedMessagesPerRpc(); +::opencensus::stats::MeasureDouble RpcClientReceivedBytesPerRpc(); +::opencensus::stats::MeasureDouble RpcClientRoundtripLatency(); +::opencensus::stats::MeasureDouble RpcClientServerLatency(); +::opencensus::stats::MeasureInt64 RpcClientCompletedRpcs(); + +::opencensus::stats::MeasureInt64 RpcServerSentMessagesPerRpc(); +::opencensus::stats::MeasureDouble RpcServerSentBytesPerRpc(); +::opencensus::stats::MeasureInt64 RpcServerReceivedMessagesPerRpc(); +::opencensus::stats::MeasureDouble RpcServerReceivedBytesPerRpc(); +::opencensus::stats::MeasureDouble RpcServerServerLatency(); +::opencensus::stats::MeasureInt64 RpcServerCompletedRpcs(); + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_MEASURES_H */ diff --git a/src/cpp/ext/filters/census/rpc_encoding.cc b/src/cpp/ext/filters/census/rpc_encoding.cc new file mode 100644 index 0000000000..45a66d9dc8 --- /dev/null +++ b/src/cpp/ext/filters/census/rpc_encoding.cc @@ -0,0 +1,39 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/cpp/ext/filters/census/rpc_encoding.h" + +namespace grpc { + +constexpr size_t TraceContextEncoding::kGrpcTraceContextSize; +constexpr size_t TraceContextEncoding::kEncodeDecodeFailure; +constexpr size_t TraceContextEncoding::kVersionIdSize; +constexpr size_t TraceContextEncoding::kFieldIdSize; +constexpr size_t TraceContextEncoding::kVersionIdOffset; +constexpr size_t TraceContextEncoding::kVersionId; + +constexpr size_t RpcServerStatsEncoding::kRpcServerStatsSize; +constexpr size_t RpcServerStatsEncoding::kEncodeDecodeFailure; +constexpr size_t RpcServerStatsEncoding::kVersionIdSize; +constexpr size_t RpcServerStatsEncoding::kFieldIdSize; +constexpr size_t RpcServerStatsEncoding::kVersionIdOffset; +constexpr size_t RpcServerStatsEncoding::kVersionId; + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/rpc_encoding.h b/src/cpp/ext/filters/census/rpc_encoding.h new file mode 100644 index 0000000000..ffffa60c46 --- /dev/null +++ b/src/cpp/ext/filters/census/rpc_encoding.h @@ -0,0 +1,284 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_RPC_ENCODING_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_RPC_ENCODING_H + +#include <grpc/support/port_platform.h> + +#include <string.h> + +#include "absl/base/internal/endian.h" +#include "absl/strings/string_view.h" +#include "opencensus/trace/span_context.h" +#include "opencensus/trace/span_id.h" +#include "opencensus/trace/trace_id.h" + +namespace grpc { + +// TODO: Rename to GrpcTraceContextV0. +struct GrpcTraceContext { + GrpcTraceContext() {} + + explicit GrpcTraceContext(const ::opencensus::trace::SpanContext& ctx) { + ctx.trace_id().CopyTo(trace_id); + ctx.span_id().CopyTo(span_id); + ctx.trace_options().CopyTo(trace_options); + } + + ::opencensus::trace::SpanContext ToSpanContext() const { + return ::opencensus::trace::SpanContext( + ::opencensus::trace::TraceId(trace_id), + ::opencensus::trace::SpanId(span_id), + ::opencensus::trace::TraceOptions(trace_options)); + } + + // TODO: For performance: + // uint8_t version; + // uint8_t trace_id_field_id; + uint8_t trace_id[::opencensus::trace::TraceId::kSize]; + // uint8_t span_id_field_id; + uint8_t span_id[::opencensus::trace::SpanId::kSize]; + // uint8_t trace_options_field_id; + uint8_t trace_options[::opencensus::trace::TraceOptions::kSize]; +}; + +// TraceContextEncoding encapsulates the logic for encoding and decoding of +// trace contexts. +class TraceContextEncoding { + public: + // Size of encoded GrpcTraceContext. (16 + 8 + 1 + 4) + static constexpr size_t kGrpcTraceContextSize = 29; + // Error value. + static constexpr size_t kEncodeDecodeFailure = 0; + + // Deserializes a GrpcTraceContext from the incoming buffer. Returns the + // number of bytes deserialized from the buffer. If the incoming buffer is + // empty or the encoding version is not supported it will return 0 bytes, + // currently only version 0 is supported. If an unknown field ID is + // encountered it will return immediately without parsing the rest of the + // buffer. Inlined for performance reasons. + static size_t Decode(absl::string_view buf, GrpcTraceContext* tc) { + if (buf.empty()) { + return kEncodeDecodeFailure; + } + uint8_t version = buf[kVersionIdOffset]; + // TODO: Support other versions later. Only support version 0 for + // now. + if (version != kVersionId) { + return kEncodeDecodeFailure; + } + + size_t pos = kVersionIdSize; + while (pos < buf.size()) { + size_t bytes_read = + ParseField(absl::string_view(&buf[pos], buf.size() - pos), tc); + if (bytes_read == 0) { + break; + } else { + pos += bytes_read; + } + } + return pos; + } + + // Serializes a GrpcTraceContext into the provided buffer. Returns the number + // of bytes serialized into the buffer. If the buffer is not of sufficient + // size (it must be at least kGrpcTraceContextSize bytes) it will drop + // everything and return 0 bytes serialized. Inlined for performance reasons. + static size_t Encode(const GrpcTraceContext& tc, char* buf, size_t buf_size) { + if (buf_size < kGrpcTraceContextSize) { + return kEncodeDecodeFailure; + } + buf[kVersionIdOffset] = kVersionId; + buf[kTraceIdOffset] = kTraceIdField; + memcpy(&buf[kTraceIdOffset + 1], tc.trace_id, + opencensus::trace::TraceId::kSize); + buf[kSpanIdOffset] = kSpanIdField; + memcpy(&buf[kSpanIdOffset + 1], tc.span_id, + opencensus::trace::SpanId::kSize); + buf[kTraceOptionsOffset] = kTraceOptionsField; + memcpy(&buf[kTraceOptionsOffset + 1], tc.trace_options, + opencensus::trace::TraceOptions::kSize); + return kGrpcTraceContextSize; + } + + private: + // Parses the next field from the incoming buffer and stores the parsed value + // in a GrpcTraceContext struct. If it does not recognize the field ID it + // will return 0, otherwise it returns the number of bytes read. + static size_t ParseField(absl::string_view buf, GrpcTraceContext* tc) { + // TODO: Add support for multi-byte field IDs. + if (buf.empty()) { + return 0; + } + // Field ID is always the first byte in a field. + uint32_t field_id = buf[0]; + size_t bytes_read = kFieldIdSize; + switch (field_id) { + case kTraceIdField: + bytes_read += kTraceIdSize; + if (bytes_read > buf.size()) { + return 0; + } + memcpy(tc->trace_id, &buf[kFieldIdSize], + opencensus::trace::TraceId::kSize); + break; + case kSpanIdField: + bytes_read += kSpanIdSize; + if (bytes_read > buf.size()) { + return 0; + } + memcpy(tc->span_id, &buf[kFieldIdSize], + opencensus::trace::SpanId::kSize); + break; + case kTraceOptionsField: + bytes_read += kTraceOptionsSize; + if (bytes_read > buf.size()) { + return 0; + } + memcpy(tc->trace_options, &buf[kFieldIdSize], + opencensus::trace::TraceOptions::kSize); + break; + default: // Invalid field ID + return 0; + } + + return bytes_read; + } + + // Size of Version ID. + static constexpr size_t kVersionIdSize = 1; + // Size of Field ID. + static constexpr size_t kFieldIdSize = 1; + + // Offset and value for currently supported version ID. + static constexpr size_t kVersionIdOffset = 0; + static constexpr size_t kVersionId = 0; + + // Fixed Field ID values: + enum FieldIdValue { + kTraceIdField = 0, + kSpanIdField = 1, + kTraceOptionsField = 2, + }; + + // Field data sizes in bytes + enum FieldSize { + kTraceIdSize = 16, + kSpanIdSize = 8, + kTraceOptionsSize = 1, + }; + + // Fixed size offsets for field ID start positions during encoding. Field + // data immediately follows. + enum FieldIdOffset { + kTraceIdOffset = kVersionIdSize, + kSpanIdOffset = kTraceIdOffset + kFieldIdSize + kTraceIdSize, + kTraceOptionsOffset = kSpanIdOffset + kFieldIdSize + kSpanIdSize, + }; + + TraceContextEncoding() = delete; + TraceContextEncoding(const TraceContextEncoding&) = delete; + TraceContextEncoding(TraceContextEncoding&&) = delete; + TraceContextEncoding operator=(const TraceContextEncoding&) = delete; + TraceContextEncoding operator=(TraceContextEncoding&&) = delete; +}; + +// TODO: This may not be needed. Check to see if opencensus requires +// a trailing server response. +// RpcServerStatsEncoding encapsulates the logic for encoding and decoding of +// rpc server stats messages. Rpc server stats consists of a uint64_t time +// value (server latency in nanoseconds). +class RpcServerStatsEncoding { + public: + // Size of encoded RPC server stats. + static constexpr size_t kRpcServerStatsSize = 10; + // Error value. + static constexpr size_t kEncodeDecodeFailure = 0; + + // Deserializes rpc server stats from the incoming 'buf' into *time. Returns + // number of bytes decoded. If the buffer is of insufficient size (it must be + // at least kRpcServerStatsSize bytes) or the encoding version or field ID are + // unrecognized, *time will be set to 0 and it will return + // kEncodeDecodeFailure. Inlined for performance reasons. + static size_t Decode(absl::string_view buf, uint64_t* time) { + if (buf.size() < kRpcServerStatsSize) { + *time = 0; + return kEncodeDecodeFailure; + } + + uint8_t version = buf[kVersionIdOffset]; + uint32_t fieldID = buf[kServerElapsedTimeOffset]; + if (version != kVersionId || fieldID != kServerElapsedTimeField) { + *time = 0; + return kEncodeDecodeFailure; + } + *time = absl::little_endian::Load64( + &buf[kServerElapsedTimeOffset + kFieldIdSize]); + return kRpcServerStatsSize; + } + + // Serializes rpc server stats into the provided buffer. It returns the + // number of bytes written to the buffer. If the buffer is smaller than + // kRpcServerStatsSize bytes it will return kEncodeDecodeFailure. Inlined for + // performance reasons. + static size_t Encode(uint64_t time, char* buf, size_t buf_size) { + if (buf_size < kRpcServerStatsSize) { + return kEncodeDecodeFailure; + } + + buf[kVersionIdOffset] = kVersionId; + buf[kServerElapsedTimeOffset] = kServerElapsedTimeField; + absl::little_endian::Store64(&buf[kServerElapsedTimeOffset + kFieldIdSize], + time); + return kRpcServerStatsSize; + } + + private: + // Size of Version ID. + static constexpr size_t kVersionIdSize = 1; + // Size of Field ID. + static constexpr size_t kFieldIdSize = 1; + + // Offset and value for currently supported version ID. + static constexpr size_t kVersionIdOffset = 0; + static constexpr size_t kVersionId = 0; + + enum FieldIdValue { + kServerElapsedTimeField = 0, + }; + + enum FieldSize { + kServerElapsedTimeSize = 8, + }; + + enum FieldIdOffset { + kServerElapsedTimeOffset = kVersionIdSize, + }; + + RpcServerStatsEncoding() = delete; + RpcServerStatsEncoding(const RpcServerStatsEncoding&) = delete; + RpcServerStatsEncoding(RpcServerStatsEncoding&&) = delete; + RpcServerStatsEncoding operator=(const RpcServerStatsEncoding&) = delete; + RpcServerStatsEncoding operator=(RpcServerStatsEncoding&&) = delete; +}; + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_RPC_ENCODING_H */ diff --git a/src/cpp/ext/filters/census/server_filter.cc b/src/cpp/ext/filters/census/server_filter.cc new file mode 100644 index 0000000000..c7c62eefe5 --- /dev/null +++ b/src/cpp/ext/filters/census/server_filter.cc @@ -0,0 +1,198 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/cpp/ext/filters/census/server_filter.h" + +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" +#include "opencensus/stats/stats.h" +#include "src/core/lib/surface/call.h" +#include "src/cpp/ext/filters/census/grpc_plugin.h" +#include "src/cpp/ext/filters/census/measures.h" + +namespace grpc { + +constexpr uint32_t CensusServerCallData::kMaxServerStatsLen; + +namespace { + +// server metadata elements +struct ServerMetadataElements { + grpc_slice path; + grpc_slice tracing_slice; + grpc_slice census_proto; +}; + +void FilterInitialMetadata(grpc_metadata_batch* b, + ServerMetadataElements* sml) { + if (b->idx.named.path != nullptr) { + sml->path = grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.path->md)); + } + if (b->idx.named.grpc_trace_bin != nullptr) { + sml->tracing_slice = + grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_trace_bin->md)); + grpc_metadata_batch_remove(b, b->idx.named.grpc_trace_bin); + } + if (b->idx.named.grpc_tags_bin != nullptr) { + sml->census_proto = + grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_tags_bin->md)); + grpc_metadata_batch_remove(b, b->idx.named.grpc_tags_bin); + } +} + +} // namespace + +void CensusServerCallData::OnDoneRecvMessageCb(void* user_data, + grpc_error* error) { + grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data); + CensusServerCallData* calld = + reinterpret_cast<CensusServerCallData*>(elem->call_data); + CensusChannelData* channeld = + reinterpret_cast<CensusChannelData*>(elem->channel_data); + GPR_ASSERT(calld != nullptr); + GPR_ASSERT(channeld != nullptr); + // Stream messages are no longer valid after receiving trailing metadata. + if ((*calld->recv_message_) != nullptr) { + ++calld->recv_message_count_; + } + GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error)); +} + +void CensusServerCallData::OnDoneRecvInitialMetadataCb(void* user_data, + grpc_error* error) { + grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data); + CensusServerCallData* calld = + reinterpret_cast<CensusServerCallData*>(elem->call_data); + GPR_ASSERT(calld != nullptr); + if (error == GRPC_ERROR_NONE) { + grpc_metadata_batch* initial_metadata = calld->recv_initial_metadata_; + GPR_ASSERT(initial_metadata != nullptr); + ServerMetadataElements sml; + sml.path = grpc_empty_slice(); + sml.tracing_slice = grpc_empty_slice(); + sml.census_proto = grpc_empty_slice(); + FilterInitialMetadata(initial_metadata, &sml); + calld->path_ = grpc_slice_ref_internal(sml.path); + calld->method_ = GetMethod(&calld->path_); + calld->qualified_method_ = StrCat("Recv.", calld->method_); + const char* tracing_str = + GRPC_SLICE_IS_EMPTY(sml.tracing_slice) + ? "" + : reinterpret_cast<const char*>( + GRPC_SLICE_START_PTR(sml.tracing_slice)); + size_t tracing_str_len = GRPC_SLICE_IS_EMPTY(sml.tracing_slice) + ? 0 + : GRPC_SLICE_LENGTH(sml.tracing_slice); + const char* census_str = GRPC_SLICE_IS_EMPTY(sml.census_proto) + ? "" + : reinterpret_cast<const char*>( + GRPC_SLICE_START_PTR(sml.census_proto)); + size_t census_str_len = GRPC_SLICE_IS_EMPTY(sml.census_proto) + ? 0 + : GRPC_SLICE_LENGTH(sml.census_proto); + + GenerateServerContext(absl::string_view(tracing_str, tracing_str_len), + absl::string_view(census_str, census_str_len), + /*primary_role*/ "", calld->qualified_method_, + &calld->context_); + + grpc_slice_unref_internal(sml.tracing_slice); + grpc_slice_unref_internal(sml.census_proto); + grpc_slice_unref_internal(sml.path); + grpc_census_call_set_context( + calld->gc_, reinterpret_cast<census_context*>(&calld->context_)); + } + GRPC_CLOSURE_RUN(calld->initial_on_done_recv_initial_metadata_, + GRPC_ERROR_REF(error)); +} + +void CensusServerCallData::StartTransportStreamOpBatch( + grpc_call_element* elem, TransportStreamOpBatch* op) { + if (op->recv_initial_metadata() != nullptr) { + // substitute our callback for the op callback + recv_initial_metadata_ = op->recv_initial_metadata()->batch(); + initial_on_done_recv_initial_metadata_ = op->recv_initial_metadata_ready(); + op->set_recv_initial_metadata_ready(&on_done_recv_initial_metadata_); + } + if (op->send_message() != nullptr) { + ++sent_message_count_; + } + if (op->recv_message() != nullptr) { + recv_message_ = op->op()->payload->recv_message.recv_message; + initial_on_done_recv_message_ = + op->op()->payload->recv_message.recv_message_ready; + op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_; + } + // We need to record the time when the trailing metadata was sent to mark the + // completeness of the request. + if (op->send_trailing_metadata() != nullptr) { + elapsed_time_ = absl::Now() - start_time_; + size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_), + stats_buf_, kMaxServerStatsLen); + if (len > 0) { + GRPC_LOG_IF_ERROR( + "census grpc_filter", + grpc_metadata_batch_add_tail( + op->send_trailing_metadata()->batch(), &census_bin_, + grpc_mdelem_from_slices( + GRPC_MDSTR_GRPC_SERVER_STATS_BIN, + grpc_slice_from_copied_buffer(stats_buf_, len)))); + } + } + // Call next op. + grpc_call_next_op(elem, op->op()); +} + +grpc_error* CensusServerCallData::Init(grpc_call_element* elem, + const grpc_call_element_args* args) { + start_time_ = absl::Now(); + gc_ = + grpc_call_from_top_element(grpc_call_stack_element(args->call_stack, 0)); + GRPC_CLOSURE_INIT(&on_done_recv_initial_metadata_, + OnDoneRecvInitialMetadataCb, elem, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem, + grpc_schedule_on_exec_ctx); + auth_context_ = grpc_call_auth_context(gc_); + return GRPC_ERROR_NONE; +} + +void CensusServerCallData::Destroy(grpc_call_element* elem, + const grpc_call_final_info* final_info, + grpc_closure* then_call_closure) { + const uint64_t request_size = GetOutgoingDataSize(final_info); + const uint64_t response_size = GetIncomingDataSize(final_info); + double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_); + grpc_auth_context_release(auth_context_); + ::opencensus::stats::Record( + {{RpcServerSentBytesPerRpc(), static_cast<double>(response_size)}, + {RpcServerReceivedBytesPerRpc(), static_cast<double>(request_size)}, + {RpcServerServerLatency(), elapsed_time_ms}, + {RpcServerSentMessagesPerRpc(), sent_message_count_}, + {RpcServerReceivedMessagesPerRpc(), recv_message_count_}}, + {{ServerMethodTagKey(), method_}, + {ServerStatusTagKey(), StatusCodeToString(final_info->final_status)}}); + grpc_slice_unref_internal(path_); + context_.EndSpan(); +} + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/server_filter.h b/src/cpp/ext/filters/census/server_filter.h new file mode 100644 index 0000000000..e393ed3283 --- /dev/null +++ b/src/cpp/ext/filters/census/server_filter.h @@ -0,0 +1,101 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H + +#include <grpc/support/port_platform.h> + +#include "absl/strings/string_view.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" +#include "include/grpc/grpc_security.h" +#include "src/cpp/ext/filters/census/channel_filter.h" +#include "src/cpp/ext/filters/census/context.h" + +namespace grpc { + +// A CallData class will be created for every grpc call within a channel. It is +// used to store data and methods specific to that call. CensusServerCallData is +// thread-compatible, however typically only 1 thread should be interacting with +// a call at a time. +class CensusServerCallData : public CallData { + public: + // Maximum size of server stats that are sent on the wire. + static constexpr uint32_t kMaxServerStatsLen = 16; + + CensusServerCallData() + : gc_(nullptr), + auth_context_(nullptr), + recv_initial_metadata_(nullptr), + initial_on_done_recv_initial_metadata_(nullptr), + initial_on_done_recv_message_(nullptr), + recv_message_(nullptr), + recv_message_count_(0), + sent_message_count_(0) { + memset(&census_bin_, 0, sizeof(grpc_linked_mdelem)); + memset(&path_, 0, sizeof(grpc_slice)); + memset(&on_done_recv_initial_metadata_, 0, sizeof(grpc_closure)); + memset(&on_done_recv_message_, 0, sizeof(grpc_closure)); + } + + grpc_error* Init(grpc_call_element* elem, + const grpc_call_element_args* args) override; + + void Destroy(grpc_call_element* elem, const grpc_call_final_info* final_info, + grpc_closure* then_call_closure) override; + + void StartTransportStreamOpBatch(grpc_call_element* elem, + TransportStreamOpBatch* op) override; + + static void OnDoneRecvInitialMetadataCb(void* user_data, grpc_error* error); + + static void OnDoneRecvMessageCb(void* user_data, grpc_error* error); + + private: + CensusContext context_; + // server method + absl::string_view method_; + std::string qualified_method_; + grpc_slice path_; + // Pointer to the grpc_call element + grpc_call* gc_; + // Authorization context for the call. + grpc_auth_context* auth_context_; + // Metadata element for census stats. + grpc_linked_mdelem census_bin_; + // recv callback + grpc_metadata_batch* recv_initial_metadata_; + grpc_closure* initial_on_done_recv_initial_metadata_; + grpc_closure on_done_recv_initial_metadata_; + // recv message + grpc_closure* initial_on_done_recv_message_; + grpc_closure on_done_recv_message_; + absl::Time start_time_; + absl::Duration elapsed_time_; + grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message_; + uint64_t recv_message_count_; + uint64_t sent_message_count_; + // Buffer needed for grpc_slice to reference it when adding metatdata to + // response. + char stats_buf_[kMaxServerStatsLen]; +}; + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H */ diff --git a/src/cpp/ext/filters/census/views.cc b/src/cpp/ext/filters/census/views.cc new file mode 100644 index 0000000000..602b8f4d25 --- /dev/null +++ b/src/cpp/ext/filters/census/views.cc @@ -0,0 +1,491 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/cpp/ext/filters/census/grpc_plugin.h" + +#include "absl/time/time.h" +#include "opencensus/stats/internal/aggregation_window.h" +#include "opencensus/stats/internal/set_aggregation_window.h" +#include "opencensus/stats/stats.h" + +namespace grpc { + +using ::opencensus::stats::Aggregation; +using ::opencensus::stats::AggregationWindow; +using ::opencensus::stats::BucketBoundaries; +using ::opencensus::stats::ViewDescriptor; + +// These measure definitions should be kept in sync across opencensus +// implementations. + +namespace { + +Aggregation BytesDistributionAggregation() { + return Aggregation::Distribution(BucketBoundaries::Explicit( + {0, 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, + 67108864, 268435456, 1073741824, 4294967296})); +} + +Aggregation MillisDistributionAggregation() { + return Aggregation::Distribution(BucketBoundaries::Explicit( + {0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, + 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, + 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, + 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000})); +} + +Aggregation CountDistributionAggregation() { + return Aggregation::Distribution(BucketBoundaries::Exponential(17, 1.0, 2.0)); +} + +ViewDescriptor MinuteDescriptor() { + auto descriptor = ViewDescriptor(); + SetAggregationWindow(AggregationWindow::Interval(absl::Minutes(1)), + &descriptor); + return descriptor; +} + +ViewDescriptor HourDescriptor() { + auto descriptor = ViewDescriptor(); + SetAggregationWindow(AggregationWindow::Interval(absl::Hours(1)), + &descriptor); + return descriptor; +} + +} // namespace + +void RegisterGrpcViewsForExport() { + ClientSentMessagesPerRpcCumulative().RegisterForExport(); + ClientSentBytesPerRpcCumulative().RegisterForExport(); + ClientReceivedMessagesPerRpcCumulative().RegisterForExport(); + ClientReceivedBytesPerRpcCumulative().RegisterForExport(); + ClientRoundtripLatencyCumulative().RegisterForExport(); + ClientServerLatencyCumulative().RegisterForExport(); + + ServerSentMessagesPerRpcCumulative().RegisterForExport(); + ServerSentBytesPerRpcCumulative().RegisterForExport(); + ServerReceivedMessagesPerRpcCumulative().RegisterForExport(); + ServerReceivedBytesPerRpcCumulative().RegisterForExport(); + ServerServerLatencyCumulative().RegisterForExport(); +} + +// client cumulative +const ViewDescriptor& ClientSentBytesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/sent_bytes_per_rpc/cumulative") + .set_measure(kRpcClientSentBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientReceivedBytesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/received_bytes_per_rpc/cumulative") + .set_measure(kRpcClientReceivedBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientRoundtripLatencyCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/roundtrip_latency/cumulative") + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientServerLatencyCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/server_latency/cumulative") + .set_measure(kRpcClientServerLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientCompletedRpcsCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/completed_rpcs/cumulative") + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_aggregation(Aggregation::Count()) + .add_column(ClientMethodTagKey()) + .add_column(ClientStatusTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientSentMessagesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/received_messages_per_rpc/cumulative") + .set_measure(kRpcClientSentMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientReceivedMessagesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/sent_messages_per_rpc/cumulative") + .set_measure(kRpcClientReceivedMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +// server cumulative +const ViewDescriptor& ServerSentBytesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/server/received_bytes_per_rpc/cumulative") + .set_measure(kRpcServerSentBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerReceivedBytesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/server/sent_bytes_per_rpc/cumulative") + .set_measure(kRpcServerReceivedBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerServerLatencyCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/server/elapsed_time/cumulative") + .set_measure(kRpcServerServerLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerCompletedRpcsCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/server/completed_rpcs/cumulative") + .set_measure(kRpcServerServerLatencyMeasureName) + .set_aggregation(Aggregation::Count()) + .add_column(ServerMethodTagKey()) + .add_column(ServerStatusTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerSentMessagesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/server/received_messages_per_rpc/cumulative") + .set_measure(kRpcServerSentMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerReceivedMessagesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/server/sent_messages_per_rpc/cumulative") + .set_measure(kRpcServerReceivedMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +// client minute +const ViewDescriptor& ClientSentBytesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/sent_bytes_per_rpc/minute") + .set_measure(kRpcClientSentBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientReceivedBytesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/received_bytes_per_rpc/minute") + .set_measure(kRpcClientReceivedBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientRoundtripLatencyMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/roundtrip_latency/minute") + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientServerLatencyMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/server_latency/minute") + .set_measure(kRpcClientServerLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientCompletedRpcsMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/completed_rpcs/minute") + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_aggregation(Aggregation::Count()) + .add_column(ClientMethodTagKey()) + .add_column(ClientStatusTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientSentMessagesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/sent_messages_per_rpc/minute") + .set_measure(kRpcClientSentMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientReceivedMessagesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/received_messages_per_rpc/minute") + .set_measure(kRpcClientReceivedMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +// server minute +const ViewDescriptor& ServerSentBytesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/server/sent_bytes_per_rpc/minute") + .set_measure(kRpcServerSentBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerReceivedBytesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/server/received_bytes_per_rpc/minute") + .set_measure(kRpcServerReceivedBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerServerLatencyMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/server/server_latency/minute") + .set_measure(kRpcServerServerLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerCompletedRpcsMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/server/completed_rpcs/minute") + .set_measure(kRpcServerServerLatencyMeasureName) + .set_aggregation(Aggregation::Count()) + .add_column(ServerMethodTagKey()) + .add_column(ServerStatusTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerSentMessagesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/server/sent_messages_per_rpc/minute") + .set_measure(kRpcServerSentMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerReceivedMessagesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/server/received_messages_per_rpc/minute") + .set_measure(kRpcServerReceivedMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +// client hour +const ViewDescriptor& ClientSentBytesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/sent_bytes_per_rpc/hour") + .set_measure(kRpcClientSentBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientReceivedBytesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/received_bytes_per_rpc/hour") + .set_measure(kRpcClientReceivedBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientRoundtripLatencyHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/roundtrip_latency/hour") + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientServerLatencyHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/server_latency/hour") + .set_measure(kRpcClientServerLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientCompletedRpcsHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/completed_rpcs/hour") + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_aggregation(Aggregation::Count()) + .add_column(ClientMethodTagKey()) + .add_column(ClientStatusTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientSentMessagesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/sent_messages_per_rpc/hour") + .set_measure(kRpcClientSentMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientReceivedMessagesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/received_messages_per_rpc/hour") + .set_measure(kRpcClientReceivedMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +// server hour +const ViewDescriptor& ServerSentBytesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/server/sent_bytes_per_rpc/hour") + .set_measure(kRpcServerSentBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerReceivedBytesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/server/received_bytes_per_rpc/hour") + .set_measure(kRpcServerReceivedBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerServerLatencyHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/server/server_latency/hour") + .set_measure(kRpcServerServerLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerCompletedRpcsHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/server/completed_rpcs/hour") + .set_measure(kRpcServerServerLatencyMeasureName) + .set_aggregation(Aggregation::Count()) + .add_column(ServerMethodTagKey()) + .add_column(ServerStatusTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerSentMessagesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/server/sent_messages_per_rpc/hour") + .set_measure(kRpcServerSentMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerReceivedMessagesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/server/received_messages_per_rpc/hour") + .set_measure(kRpcServerReceivedMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +} // namespace grpc |