aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/ext/filters/census/server_filter.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/ext/filters/census/server_filter.cc')
-rw-r--r--src/cpp/ext/filters/census/server_filter.cc198
1 files changed, 198 insertions, 0 deletions
diff --git a/src/cpp/ext/filters/census/server_filter.cc b/src/cpp/ext/filters/census/server_filter.cc
new file mode 100644
index 0000000000..c7c62eefe5
--- /dev/null
+++ b/src/cpp/ext/filters/census/server_filter.cc
@@ -0,0 +1,198 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/ext/filters/census/server_filter.h"
+
+#include "absl/strings/str_cat.h"
+#include "absl/strings/string_view.h"
+#include "absl/time/clock.h"
+#include "absl/time/time.h"
+#include "opencensus/stats/stats.h"
+#include "src/core/lib/surface/call.h"
+#include "src/cpp/ext/filters/census/grpc_plugin.h"
+#include "src/cpp/ext/filters/census/measures.h"
+
+namespace grpc {
+
+constexpr uint32_t CensusServerCallData::kMaxServerStatsLen;
+
+namespace {
+
+// server metadata elements
+struct ServerMetadataElements {
+ grpc_slice path;
+ grpc_slice tracing_slice;
+ grpc_slice census_proto;
+};
+
+void FilterInitialMetadata(grpc_metadata_batch* b,
+ ServerMetadataElements* sml) {
+ if (b->idx.named.path != nullptr) {
+ sml->path = grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.path->md));
+ }
+ if (b->idx.named.grpc_trace_bin != nullptr) {
+ sml->tracing_slice =
+ grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_trace_bin->md));
+ grpc_metadata_batch_remove(b, b->idx.named.grpc_trace_bin);
+ }
+ if (b->idx.named.grpc_tags_bin != nullptr) {
+ sml->census_proto =
+ grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_tags_bin->md));
+ grpc_metadata_batch_remove(b, b->idx.named.grpc_tags_bin);
+ }
+}
+
+} // namespace
+
+void CensusServerCallData::OnDoneRecvMessageCb(void* user_data,
+ grpc_error* error) {
+ grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
+ CensusServerCallData* calld =
+ reinterpret_cast<CensusServerCallData*>(elem->call_data);
+ CensusChannelData* channeld =
+ reinterpret_cast<CensusChannelData*>(elem->channel_data);
+ GPR_ASSERT(calld != nullptr);
+ GPR_ASSERT(channeld != nullptr);
+ // Stream messages are no longer valid after receiving trailing metadata.
+ if ((*calld->recv_message_) != nullptr) {
+ ++calld->recv_message_count_;
+ }
+ GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error));
+}
+
+void CensusServerCallData::OnDoneRecvInitialMetadataCb(void* user_data,
+ grpc_error* error) {
+ grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
+ CensusServerCallData* calld =
+ reinterpret_cast<CensusServerCallData*>(elem->call_data);
+ GPR_ASSERT(calld != nullptr);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_metadata_batch* initial_metadata = calld->recv_initial_metadata_;
+ GPR_ASSERT(initial_metadata != nullptr);
+ ServerMetadataElements sml;
+ sml.path = grpc_empty_slice();
+ sml.tracing_slice = grpc_empty_slice();
+ sml.census_proto = grpc_empty_slice();
+ FilterInitialMetadata(initial_metadata, &sml);
+ calld->path_ = grpc_slice_ref_internal(sml.path);
+ calld->method_ = GetMethod(&calld->path_);
+ calld->qualified_method_ = StrCat("Recv.", calld->method_);
+ const char* tracing_str =
+ GRPC_SLICE_IS_EMPTY(sml.tracing_slice)
+ ? ""
+ : reinterpret_cast<const char*>(
+ GRPC_SLICE_START_PTR(sml.tracing_slice));
+ size_t tracing_str_len = GRPC_SLICE_IS_EMPTY(sml.tracing_slice)
+ ? 0
+ : GRPC_SLICE_LENGTH(sml.tracing_slice);
+ const char* census_str = GRPC_SLICE_IS_EMPTY(sml.census_proto)
+ ? ""
+ : reinterpret_cast<const char*>(
+ GRPC_SLICE_START_PTR(sml.census_proto));
+ size_t census_str_len = GRPC_SLICE_IS_EMPTY(sml.census_proto)
+ ? 0
+ : GRPC_SLICE_LENGTH(sml.census_proto);
+
+ GenerateServerContext(absl::string_view(tracing_str, tracing_str_len),
+ absl::string_view(census_str, census_str_len),
+ /*primary_role*/ "", calld->qualified_method_,
+ &calld->context_);
+
+ grpc_slice_unref_internal(sml.tracing_slice);
+ grpc_slice_unref_internal(sml.census_proto);
+ grpc_slice_unref_internal(sml.path);
+ grpc_census_call_set_context(
+ calld->gc_, reinterpret_cast<census_context*>(&calld->context_));
+ }
+ GRPC_CLOSURE_RUN(calld->initial_on_done_recv_initial_metadata_,
+ GRPC_ERROR_REF(error));
+}
+
+void CensusServerCallData::StartTransportStreamOpBatch(
+ grpc_call_element* elem, TransportStreamOpBatch* op) {
+ if (op->recv_initial_metadata() != nullptr) {
+ // substitute our callback for the op callback
+ recv_initial_metadata_ = op->recv_initial_metadata()->batch();
+ initial_on_done_recv_initial_metadata_ = op->recv_initial_metadata_ready();
+ op->set_recv_initial_metadata_ready(&on_done_recv_initial_metadata_);
+ }
+ if (op->send_message() != nullptr) {
+ ++sent_message_count_;
+ }
+ if (op->recv_message() != nullptr) {
+ recv_message_ = op->op()->payload->recv_message.recv_message;
+ initial_on_done_recv_message_ =
+ op->op()->payload->recv_message.recv_message_ready;
+ op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_;
+ }
+ // We need to record the time when the trailing metadata was sent to mark the
+ // completeness of the request.
+ if (op->send_trailing_metadata() != nullptr) {
+ elapsed_time_ = absl::Now() - start_time_;
+ size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_),
+ stats_buf_, kMaxServerStatsLen);
+ if (len > 0) {
+ GRPC_LOG_IF_ERROR(
+ "census grpc_filter",
+ grpc_metadata_batch_add_tail(
+ op->send_trailing_metadata()->batch(), &census_bin_,
+ grpc_mdelem_from_slices(
+ GRPC_MDSTR_GRPC_SERVER_STATS_BIN,
+ grpc_slice_from_copied_buffer(stats_buf_, len))));
+ }
+ }
+ // Call next op.
+ grpc_call_next_op(elem, op->op());
+}
+
+grpc_error* CensusServerCallData::Init(grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ start_time_ = absl::Now();
+ gc_ =
+ grpc_call_from_top_element(grpc_call_stack_element(args->call_stack, 0));
+ GRPC_CLOSURE_INIT(&on_done_recv_initial_metadata_,
+ OnDoneRecvInitialMetadataCb, elem,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem,
+ grpc_schedule_on_exec_ctx);
+ auth_context_ = grpc_call_auth_context(gc_);
+ return GRPC_ERROR_NONE;
+}
+
+void CensusServerCallData::Destroy(grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ grpc_closure* then_call_closure) {
+ const uint64_t request_size = GetOutgoingDataSize(final_info);
+ const uint64_t response_size = GetIncomingDataSize(final_info);
+ double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_);
+ grpc_auth_context_release(auth_context_);
+ ::opencensus::stats::Record(
+ {{RpcServerSentBytesPerRpc(), static_cast<double>(response_size)},
+ {RpcServerReceivedBytesPerRpc(), static_cast<double>(request_size)},
+ {RpcServerServerLatency(), elapsed_time_ms},
+ {RpcServerSentMessagesPerRpc(), sent_message_count_},
+ {RpcServerReceivedMessagesPerRpc(), recv_message_count_}},
+ {{ServerMethodTagKey(), method_},
+ {ServerStatusTagKey(), StatusCodeToString(final_info->final_status)}});
+ grpc_slice_unref_internal(path_);
+ context_.EndSpan();
+}
+
+} // namespace grpc