From 41e4cedb7012a55376322b142d74eae5e86b95e3 Mon Sep 17 00:00:00 2001 From: Vizerai Date: Fri, 13 Apr 2018 18:19:21 -0700 Subject: Adding opencensus grpc plugin. Rebasing to merge commits. --- src/cpp/ext/filters/census/client_filter.cc | 163 ++++++++++++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 src/cpp/ext/filters/census/client_filter.cc (limited to 'src/cpp/ext/filters/census/client_filter.cc') diff --git a/src/cpp/ext/filters/census/client_filter.cc b/src/cpp/ext/filters/census/client_filter.cc new file mode 100644 index 0000000000..293f4b1c25 --- /dev/null +++ b/src/cpp/ext/filters/census/client_filter.cc @@ -0,0 +1,163 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include "src/cpp/ext/filters/census/client_filter.h" + +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "opencensus/stats/stats.h" +#include "src/core/lib/surface/call.h" +#include "src/cpp/ext/filters/census/grpc_plugin.h" +#include "src/cpp/ext/filters/census/measures.h" + +namespace grpc { + +constexpr uint32_t CensusClientCallData::kMaxTraceContextLen; +constexpr uint32_t CensusClientCallData::kMaxTagsLen; + +namespace { + +void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) { + if (b->idx.named.grpc_server_stats_bin != nullptr) { + ServerStatsDeserialize( + reinterpret_cast(GRPC_SLICE_START_PTR( + GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md))), + GRPC_SLICE_LENGTH(GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md)), + elapsed_time); + grpc_metadata_batch_remove(b, b->idx.named.grpc_server_stats_bin); + } +} + +} // namespace + +void CensusClientCallData::OnDoneRecvTrailingMetadataCb(void* user_data, + grpc_error* error) { + grpc_call_element* elem = reinterpret_cast(user_data); + CensusClientCallData* calld = + reinterpret_cast(elem->call_data); + GPR_ASSERT(calld != nullptr); + if (error == GRPC_ERROR_NONE) { + GPR_ASSERT(calld->recv_trailing_metadata_ != nullptr); + FilterTrailingMetadata(calld->recv_trailing_metadata_, + &calld->elapsed_time_); + } + GRPC_CLOSURE_RUN(calld->initial_on_done_recv_trailing_metadata_, + GRPC_ERROR_REF(error)); +} + +void CensusClientCallData::OnDoneRecvMessageCb(void* user_data, + grpc_error* error) { + grpc_call_element* elem = reinterpret_cast(user_data); + CensusClientCallData* calld = + reinterpret_cast(elem->call_data); + CensusChannelData* channeld = + reinterpret_cast(elem->channel_data); + GPR_ASSERT(calld != nullptr); + GPR_ASSERT(channeld != nullptr); + // Stream messages are no longer valid after receiving trailing metadata. + if ((*calld->recv_message_) != nullptr) { + calld->recv_message_count_++; + } + GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error)); +} + +void CensusClientCallData::StartTransportStreamOpBatch( + grpc_call_element* elem, TransportStreamOpBatch* op) { + if (op->send_initial_metadata() != nullptr) { + census_context* ctxt = op->get_census_context(); + GenerateClientContext( + qualified_method_, &context_, + (ctxt == nullptr) ? nullptr : reinterpret_cast(ctxt)); + size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf_, + kMaxTraceContextLen); + if (tracing_len > 0) { + GRPC_LOG_IF_ERROR( + "census grpc_filter", + grpc_metadata_batch_add_tail( + op->send_initial_metadata()->batch(), &tracing_bin_, + grpc_mdelem_from_slices( + GRPC_MDSTR_GRPC_TRACE_BIN, + grpc_slice_from_copied_buffer(tracing_buf_, tracing_len)))); + } + grpc_slice tags = grpc_empty_slice(); + // TODO: Add in tagging serialization. + size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags); + if (encoded_tags_len > 0) { + GRPC_LOG_IF_ERROR( + "census grpc_filter", + grpc_metadata_batch_add_tail( + op->send_initial_metadata()->batch(), &stats_bin_, + grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags))); + } + } + + if (op->send_message() != nullptr) { + ++sent_message_count_; + } + if (op->recv_message() != nullptr) { + recv_message_ = op->op()->payload->recv_message.recv_message; + initial_on_done_recv_message_ = + op->op()->payload->recv_message.recv_message_ready; + op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_; + } + if (op->recv_trailing_metadata() != nullptr) { + recv_trailing_metadata_ = op->recv_trailing_metadata()->batch(); + initial_on_done_recv_trailing_metadata_ = op->on_complete(); + op->set_on_complete(&on_done_recv_trailing_metadata_); + } + // Call next op. + grpc_call_next_op(elem, op->op()); +} + +grpc_error* CensusClientCallData::Init(grpc_call_element* elem, + const grpc_call_element_args* args) { + path_ = grpc_slice_ref_internal(args->path); + start_time_ = absl::Now(); + method_ = GetMethod(&path_); + qualified_method_ = absl::StrCat("Sent.", method_); + GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_done_recv_trailing_metadata_, + OnDoneRecvTrailingMetadataCb, elem, + grpc_schedule_on_exec_ctx); + return GRPC_ERROR_NONE; +} + +void CensusClientCallData::Destroy(grpc_call_element* elem, + const grpc_call_final_info* final_info, + grpc_closure* then_call_closure) { + const uint64_t request_size = GetOutgoingDataSize(final_info); + const uint64_t response_size = GetIncomingDataSize(final_info); + double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_); + ::opencensus::stats::Record( + {{RpcClientSentBytesPerRpc(), static_cast(request_size)}, + {RpcClientReceivedBytesPerRpc(), static_cast(response_size)}, + {RpcClientRoundtripLatency(), latency_ms}, + {RpcClientServerLatency(), + ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time_))}, + {RpcClientSentMessagesPerRpc(), sent_message_count_}, + {RpcClientReceivedMessagesPerRpc(), recv_message_count_}}, + {{ClientMethodTagKey(), method_}, + {ClientStatusTagKey(), StatusCodeToString(final_info->final_status)}}); + grpc_slice_unref_internal(path_); + context_.EndSpan(); +} + +} // namespace grpc -- cgit v1.2.3