aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/ext/filters/census/client_filter.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/ext/filters/census/client_filter.cc')
-rw-r--r--src/cpp/ext/filters/census/client_filter.cc163
1 files changed, 163 insertions, 0 deletions
diff --git a/src/cpp/ext/filters/census/client_filter.cc b/src/cpp/ext/filters/census/client_filter.cc
new file mode 100644
index 0000000000..293f4b1c25
--- /dev/null
+++ b/src/cpp/ext/filters/census/client_filter.cc
@@ -0,0 +1,163 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/ext/filters/census/client_filter.h"
+
+#include "absl/strings/str_cat.h"
+#include "absl/strings/string_view.h"
+#include "opencensus/stats/stats.h"
+#include "src/core/lib/surface/call.h"
+#include "src/cpp/ext/filters/census/grpc_plugin.h"
+#include "src/cpp/ext/filters/census/measures.h"
+
+namespace grpc {
+
+constexpr uint32_t CensusClientCallData::kMaxTraceContextLen;
+constexpr uint32_t CensusClientCallData::kMaxTagsLen;
+
+namespace {
+
+void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) {
+ if (b->idx.named.grpc_server_stats_bin != nullptr) {
+ ServerStatsDeserialize(
+ reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(
+ GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md))),
+ GRPC_SLICE_LENGTH(GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md)),
+ elapsed_time);
+ grpc_metadata_batch_remove(b, b->idx.named.grpc_server_stats_bin);
+ }
+}
+
+} // namespace
+
+void CensusClientCallData::OnDoneRecvTrailingMetadataCb(void* user_data,
+ grpc_error* error) {
+ grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
+ CensusClientCallData* calld =
+ reinterpret_cast<CensusClientCallData*>(elem->call_data);
+ GPR_ASSERT(calld != nullptr);
+ if (error == GRPC_ERROR_NONE) {
+ GPR_ASSERT(calld->recv_trailing_metadata_ != nullptr);
+ FilterTrailingMetadata(calld->recv_trailing_metadata_,
+ &calld->elapsed_time_);
+ }
+ GRPC_CLOSURE_RUN(calld->initial_on_done_recv_trailing_metadata_,
+ GRPC_ERROR_REF(error));
+}
+
+void CensusClientCallData::OnDoneRecvMessageCb(void* user_data,
+ grpc_error* error) {
+ grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
+ CensusClientCallData* calld =
+ reinterpret_cast<CensusClientCallData*>(elem->call_data);
+ CensusChannelData* channeld =
+ reinterpret_cast<CensusChannelData*>(elem->channel_data);
+ GPR_ASSERT(calld != nullptr);
+ GPR_ASSERT(channeld != nullptr);
+ // Stream messages are no longer valid after receiving trailing metadata.
+ if ((*calld->recv_message_) != nullptr) {
+ calld->recv_message_count_++;
+ }
+ GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error));
+}
+
+void CensusClientCallData::StartTransportStreamOpBatch(
+ grpc_call_element* elem, TransportStreamOpBatch* op) {
+ if (op->send_initial_metadata() != nullptr) {
+ census_context* ctxt = op->get_census_context();
+ GenerateClientContext(
+ qualified_method_, &context_,
+ (ctxt == nullptr) ? nullptr : reinterpret_cast<CensusContext*>(ctxt));
+ size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf_,
+ kMaxTraceContextLen);
+ if (tracing_len > 0) {
+ GRPC_LOG_IF_ERROR(
+ "census grpc_filter",
+ grpc_metadata_batch_add_tail(
+ op->send_initial_metadata()->batch(), &tracing_bin_,
+ grpc_mdelem_from_slices(
+ GRPC_MDSTR_GRPC_TRACE_BIN,
+ grpc_slice_from_copied_buffer(tracing_buf_, tracing_len))));
+ }
+ grpc_slice tags = grpc_empty_slice();
+ // TODO: Add in tagging serialization.
+ size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
+ if (encoded_tags_len > 0) {
+ GRPC_LOG_IF_ERROR(
+ "census grpc_filter",
+ grpc_metadata_batch_add_tail(
+ op->send_initial_metadata()->batch(), &stats_bin_,
+ grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags)));
+ }
+ }
+
+ if (op->send_message() != nullptr) {
+ ++sent_message_count_;
+ }
+ if (op->recv_message() != nullptr) {
+ recv_message_ = op->op()->payload->recv_message.recv_message;
+ initial_on_done_recv_message_ =
+ op->op()->payload->recv_message.recv_message_ready;
+ op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_;
+ }
+ if (op->recv_trailing_metadata() != nullptr) {
+ recv_trailing_metadata_ = op->recv_trailing_metadata()->batch();
+ initial_on_done_recv_trailing_metadata_ = op->on_complete();
+ op->set_on_complete(&on_done_recv_trailing_metadata_);
+ }
+ // Call next op.
+ grpc_call_next_op(elem, op->op());
+}
+
+grpc_error* CensusClientCallData::Init(grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ path_ = grpc_slice_ref_internal(args->path);
+ start_time_ = absl::Now();
+ method_ = GetMethod(&path_);
+ qualified_method_ = absl::StrCat("Sent.", method_);
+ GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&on_done_recv_trailing_metadata_,
+ OnDoneRecvTrailingMetadataCb, elem,
+ grpc_schedule_on_exec_ctx);
+ return GRPC_ERROR_NONE;
+}
+
+void CensusClientCallData::Destroy(grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ grpc_closure* then_call_closure) {
+ const uint64_t request_size = GetOutgoingDataSize(final_info);
+ const uint64_t response_size = GetIncomingDataSize(final_info);
+ double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_);
+ ::opencensus::stats::Record(
+ {{RpcClientSentBytesPerRpc(), static_cast<double>(request_size)},
+ {RpcClientReceivedBytesPerRpc(), static_cast<double>(response_size)},
+ {RpcClientRoundtripLatency(), latency_ms},
+ {RpcClientServerLatency(),
+ ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time_))},
+ {RpcClientSentMessagesPerRpc(), sent_message_count_},
+ {RpcClientReceivedMessagesPerRpc(), recv_message_count_}},
+ {{ClientMethodTagKey(), method_},
+ {ClientStatusTagKey(), StatusCodeToString(final_info->final_status)}});
+ grpc_slice_unref_internal(path_);
+ context_.EndSpan();
+}
+
+} // namespace grpc