aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Juanli Shen <juanlishen@google.com>2018-06-21 22:44:05 -0700
committerGravatar Juanli Shen <juanlishen@google.com>2018-06-22 10:00:48 -0700
commitf5f1d57d7abdd68c9037576729e3e85437aaa833 (patch)
tree9437b97cf60393bf853807ab1e347c1561affa1a /src
parent7a2a8ca4ba2cd505961ac43c656cc0a7a33c7bb0 (diff)
Add load reporting filter
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/filters/load_reporting/registered_opencensus_objects.h108
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_filter.cc430
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_filter.h99
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc71
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_plugin.h61
-rw-r--r--src/core/plugin_registry/grpc_cronet_plugin_registry.cc4
-rw-r--r--src/core/plugin_registry/grpc_plugin_registry.cc4
-rw-r--r--src/core/plugin_registry/grpc_unsecure_plugin_registry.cc4
-rw-r--r--src/cpp/common/channel_filter.h12
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py2
10 files changed, 486 insertions, 309 deletions
diff --git a/src/core/ext/filters/load_reporting/registered_opencensus_objects.h b/src/core/ext/filters/load_reporting/registered_opencensus_objects.h
new file mode 100644
index 0000000000..b62863dc2c
--- /dev/null
+++ b/src/core/ext/filters/load_reporting/registered_opencensus_objects.h
@@ -0,0 +1,108 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_REGISTERED_OPENCENSUS_OBJECTS_H
+#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_REGISTERED_OPENCENSUS_OBJECTS_H
+
+#include <grpc/support/port_platform.h>
+
+#include "opencensus/stats/stats.h"
+
+#include "src/cpp/server/load_reporter/constants.h"
+
+namespace grpc {
+namespace load_reporter {
+
+// Measures.
+
+::opencensus::stats::MeasureInt64 MeasureStartCount() {
+ static const ::opencensus::stats::MeasureInt64 start_count =
+ ::opencensus::stats::MeasureInt64::Register(
+ kMeasureStartCount, kMeasureStartCount, kMeasureStartCount);
+ return start_count;
+}
+
+::opencensus::stats::MeasureInt64 MeasureEndCount() {
+ static const ::opencensus::stats::MeasureInt64 end_count =
+ ::opencensus::stats::MeasureInt64::Register(
+ kMeasureEndCount, kMeasureEndCount, kMeasureEndCount);
+ return end_count;
+}
+
+::opencensus::stats::MeasureInt64 MeasureEndBytesSent() {
+ static const ::opencensus::stats::MeasureInt64 end_bytes_sent =
+ ::opencensus::stats::MeasureInt64::Register(
+ kMeasureEndBytesSent, kMeasureEndBytesSent, kMeasureEndBytesSent);
+ return end_bytes_sent;
+}
+
+::opencensus::stats::MeasureInt64 MeasureEndBytesReceived() {
+ static const ::opencensus::stats::MeasureInt64 end_bytes_received =
+ ::opencensus::stats::MeasureInt64::Register(kMeasureEndBytesReceived,
+ kMeasureEndBytesReceived,
+ kMeasureEndBytesReceived);
+ return end_bytes_received;
+}
+
+::opencensus::stats::MeasureInt64 MeasureEndLatencyMs() {
+ static const ::opencensus::stats::MeasureInt64 end_latency_ms =
+ ::opencensus::stats::MeasureInt64::Register(
+ kMeasureEndLatencyMs, kMeasureEndLatencyMs, kMeasureEndLatencyMs);
+ return end_latency_ms;
+}
+
+::opencensus::stats::MeasureDouble MeasureOtherCallMetric() {
+ static const ::opencensus::stats::MeasureDouble other_call_metric =
+ ::opencensus::stats::MeasureDouble::Register(kMeasureOtherCallMetric,
+ kMeasureOtherCallMetric,
+ kMeasureOtherCallMetric);
+ return other_call_metric;
+}
+
+// Tags.
+
+opencensus::stats::TagKey TagKeyToken() {
+ static const auto token = opencensus::stats::TagKey::Register(kTagKeyToken);
+ return token;
+}
+
+opencensus::stats::TagKey TagKeyHost() {
+ static const auto token = opencensus::stats::TagKey::Register(kTagKeyHost);
+ return token;
+}
+opencensus::stats::TagKey TagKeyUserId() {
+ static const auto token = opencensus::stats::TagKey::Register(kTagKeyUserId);
+ return token;
+}
+
+opencensus::stats::TagKey TagKeyStatus() {
+ static const auto token = opencensus::stats::TagKey::Register(kTagKeyStatus);
+ return token;
+}
+
+opencensus::stats::TagKey TagKeyMetricName() {
+ static const auto token =
+ opencensus::stats::TagKey::Register(kTagKeyMetricName);
+ return token;
+}
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_REGISTERED_OPENCENSUS_OBJECTS_H \
+ */
diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
index a8f70333b2..51e4d795d7 100644
--- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
@@ -18,203 +18,311 @@
#include <grpc/support/port_platform.h>
-#include <string.h>
-
+#include <grpc/grpc_security.h>
#include <grpc/load_reporting.h>
+#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include <grpc/support/sync.h>
+#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/ext/filters/client_channel/uri_parser.h"
+#include "src/core/ext/filters/load_reporting/registered_opencensus_objects.h"
#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
-#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/context.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/sockaddr_posix.h"
+#include "src/core/lib/iomgr/socket_utils.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/surface/call.h"
#include "src/core/lib/transport/static_metadata.h"
-namespace {
-struct call_data {
- intptr_t id; /**< an id unique to the call */
- bool have_trailing_md_string;
- grpc_slice trailing_md_string;
- bool have_initial_md_string;
- grpc_slice initial_md_string;
- bool have_service_method;
- grpc_slice service_method;
-
- /* stores the recv_initial_metadata op's ready closure, which we wrap with our
- * own (on_initial_md_ready) in order to capture the incoming initial metadata
- * */
- grpc_closure* ops_recv_initial_metadata_ready;
-
- /* to get notified of the availability of the incoming initial metadata. */
- grpc_closure on_initial_md_ready;
- grpc_metadata_batch* recv_initial_metadata;
-};
+namespace grpc {
-struct channel_data {
- intptr_t id; /**< an id unique to the channel */
-};
-} // namespace
-
-static void on_initial_md_ready(void* user_data, grpc_error* err) {
- grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
-
- if (err == GRPC_ERROR_NONE) {
- if (calld->recv_initial_metadata->idx.named.path != nullptr) {
- calld->service_method = grpc_slice_ref_internal(
- GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.path->md));
- calld->have_service_method = true;
- } else {
- err = grpc_error_add_child(
- err, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing :path header"));
- }
- if (calld->recv_initial_metadata->idx.named.lb_token != nullptr) {
- calld->initial_md_string = grpc_slice_ref_internal(
- GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.lb_token->md));
- calld->have_initial_md_string = true;
- grpc_metadata_batch_remove(
- calld->recv_initial_metadata,
- calld->recv_initial_metadata->idx.named.lb_token);
+grpc_error* ServerLoadReportingChannelData::Init(
+ grpc_channel_element* /* elem */, grpc_channel_element_args* args) {
+ GPR_ASSERT(!args->is_last);
+ // Find and record the peer_identity.
+ const grpc_auth_context* auth_context =
+ grpc_find_auth_context_in_args(args->channel_args);
+ if (auth_context != nullptr &&
+ grpc_auth_context_peer_is_authenticated(auth_context)) {
+ grpc_auth_property_iterator auth_it =
+ grpc_auth_context_peer_identity(auth_context);
+ const grpc_auth_property* auth_property =
+ grpc_auth_property_iterator_next(&auth_it);
+ if (auth_property != nullptr) {
+ peer_identity_ = auth_property->value;
+ peer_identity_len_ = auth_property->value_length;
}
- } else {
- GRPC_ERROR_REF(err);
}
- GRPC_CLOSURE_RUN(calld->ops_recv_initial_metadata_ready, err);
-}
-
-/* Constructor for call_data */
-static grpc_error* init_call_elem(grpc_call_element* elem,
- const grpc_call_element_args* args) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
- calld->id = (intptr_t)args->call_stack;
- GRPC_CLOSURE_INIT(&calld->on_initial_md_ready, on_initial_md_ready, elem,
- grpc_schedule_on_exec_ctx);
-
- /* TODO(dgq): do something with the data
- channel_data *chand = elem->channel_data;
- grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_CREATION,
- (intptr_t)chand->id,
- (intptr_t)calld->id,
- NULL,
- NULL,
- NULL,
- NULL};
- */
-
return GRPC_ERROR_NONE;
}
-/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element* elem,
- const grpc_call_final_info* final_info,
- grpc_closure* ignored) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
+void ServerLoadReportingCallData::Destroy(
+ grpc_call_element* elem, const grpc_call_final_info* final_info,
+ grpc_closure* then_call_closure) {
+ ServerLoadReportingChannelData* chand =
+ reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
+ // Only record an end if we've recorded its corresponding start, which is
+ // indicated by a non-null client_ip_and_lr_token_. Note that it's possible
+ // that we attempt to record the call end before we have recorded the call
+ // start, because the data needed for recording the start comes from the
+ // initial metadata, which may not be ready before the call finishes.
+ if (client_ip_and_lr_token_ != nullptr) {
+ opencensus::stats::Record(
+ {{::grpc::load_reporter::MeasureEndCount(), 1},
+ {::grpc::load_reporter::MeasureEndBytesSent(),
+ final_info->stats.transport_stream_stats.outgoing.data_bytes},
+ {::grpc::load_reporter::MeasureEndBytesReceived(),
+ final_info->stats.transport_stream_stats.incoming.data_bytes},
+ {::grpc::load_reporter::MeasureEndLatencyMs(),
+ gpr_time_to_millis(final_info->stats.latency)}},
+ {{::grpc::load_reporter::TagKeyToken(),
+ {client_ip_and_lr_token_, client_ip_and_lr_token_len_}},
+ {::grpc::load_reporter::TagKeyHost(),
+ {target_host_, target_host_len_}},
+ {::grpc::load_reporter::TagKeyUserId(),
+ {chand->peer_identity(), chand->peer_identity_len()}},
+ {::grpc::load_reporter::TagKeyStatus(),
+ GetStatusTagForStatus(final_info->final_status)}});
+ }
+ grpc_slice_unref_internal(service_method_);
+}
- /* TODO(dgq): do something with the data
- channel_data *chand = elem->channel_data;
- grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_DESTRUCTION,
- (intptr_t)chand->id,
- (intptr_t)calld->id,
- final_info,
- calld->initial_md_string,
- calld->trailing_md_string,
- calld->service_method};
- */
+void ServerLoadReportingCallData::StartTransportStreamOpBatch(
+ grpc_call_element* elem, TransportStreamOpBatch* op) {
+ GPR_TIMER_SCOPE("lr_start_transport_stream_op", 0);
+ if (op->recv_initial_metadata() != nullptr) {
+ // Save some fields to use when initial metadata is ready.
+ peer_string_ = op->get_peer_string();
+ recv_initial_metadata_ = op->recv_initial_metadata();
+ original_recv_initial_metadata_ready_ = op->recv_initial_metadata_ready();
+ // Substitute the original closure for the wrapper closure.
+ op->set_recv_initial_metadata_ready(&recv_initial_metadata_ready_);
+ } else if (op->send_trailing_metadata() != nullptr) {
+ GRPC_LOG_IF_ERROR(
+ "server_load_reporting_filter",
+ grpc_metadata_batch_filter(op->send_trailing_metadata()->batch(),
+ SendTrailingMetadataFilter, elem,
+ "send_trailing_metadata filtering error"));
+ }
+ grpc_call_next_op(elem, op->op());
+}
- if (calld->have_initial_md_string) {
- grpc_slice_unref_internal(calld->initial_md_string);
+void ServerLoadReportingCallData::GetCensusSafeClientIpString(
+ char** client_ip_string, size_t* size) {
+ // Find the client URI string.
+ const char* client_uri_str =
+ reinterpret_cast<const char*>(gpr_atm_acq_load(peer_string_));
+ if (client_uri_str == nullptr) {
+ gpr_log(GPR_ERROR,
+ "Unable to extract client URI string (peer string) from gRPC "
+ "metadata.");
+ *client_ip_string = nullptr;
+ *size = 0;
+ return;
}
- if (calld->have_trailing_md_string) {
- grpc_slice_unref_internal(calld->trailing_md_string);
+ // Parse the client URI string into grpc_uri.
+ grpc_uri* client_uri = grpc_uri_parse(client_uri_str, true);
+ if (client_uri == nullptr) {
+ gpr_log(GPR_ERROR,
+ "Unable to parse the client URI string (peer string) to a client "
+ "URI.");
+ *client_ip_string = nullptr;
+ *size = 0;
+ return;
}
- if (calld->have_service_method) {
- grpc_slice_unref_internal(calld->service_method);
+ // Parse the client URI into grpc_resolved_address.
+ grpc_resolved_address resolved_address;
+ bool success = grpc_parse_uri(client_uri, &resolved_address);
+ grpc_uri_destroy(client_uri);
+ if (!success) {
+ gpr_log(GPR_ERROR,
+ "Unable to parse client URI into a grpc_resolved_address.");
+ *client_ip_string = nullptr;
+ *size = 0;
+ return;
+ }
+ // Convert the socket address in the grpc_resolved_address into a hex string
+ // according to the address family.
+ grpc_sockaddr* addr = reinterpret_cast<grpc_sockaddr*>(resolved_address.addr);
+ if (addr->sa_family == GRPC_AF_INET) {
+ grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(addr);
+ gpr_asprintf(client_ip_string, "%08x", grpc_ntohl(addr4->sin_addr.s_addr));
+ *size = 8;
+ } else if (addr->sa_family == GRPC_AF_INET6) {
+ grpc_sockaddr_in6* addr6 = reinterpret_cast<grpc_sockaddr_in6*>(addr);
+ *client_ip_string = static_cast<char*>(gpr_malloc(32));
+ for (size_t i = 0; i < 16; ++i) {
+ sprintf(*client_ip_string + i, "%02x",
+ addr6->sin6_addr.__in6_u.__u6_addr8[i]);
+ }
+ *size = 32;
+ } else {
+ GPR_UNREACHABLE_CODE();
}
}
-/* Constructor for channel_data */
-static grpc_error* init_channel_elem(grpc_channel_element* elem,
- grpc_channel_element_args* args) {
- GPR_ASSERT(!args->is_last);
-
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- chand->id = (intptr_t)args->channel_stack;
+void ServerLoadReportingCallData::StoreClientIpAndLrToken(const char* lr_token,
+ size_t lr_token_len) {
+ char* client_ip;
+ size_t client_ip_len;
+ GetCensusSafeClientIpString(&client_ip, &client_ip_len);
+ client_ip_and_lr_token_len_ =
+ kLengthPrefixSize + client_ip_len + lr_token_len;
+ client_ip_and_lr_token_ = static_cast<char*>(
+ gpr_zalloc(client_ip_and_lr_token_len_ * sizeof(char)));
+ char* cur_pos = client_ip_and_lr_token_;
+ // Store the IP length prefix.
+ if (client_ip_len == 0) {
+ strncpy(cur_pos, kEmptyAddressLengthString, kLengthPrefixSize);
+ } else if (client_ip_len == 8) {
+ strncpy(cur_pos, kEncodedIpv4AddressLengthString, kLengthPrefixSize);
+ } else if (client_ip_len == 32) {
+ strncpy(cur_pos, kEncodedIpv6AddressLengthString, kLengthPrefixSize);
+ } else {
+ GPR_UNREACHABLE_CODE();
+ }
+ cur_pos += kLengthPrefixSize;
+ // Store the IP.
+ if (client_ip_len != 0) {
+ strncpy(cur_pos, client_ip, client_ip_len);
+ }
+ gpr_free(client_ip);
+ cur_pos += client_ip_len;
+ // Store the LR token.
+ if (lr_token_len != 0) {
+ strncpy(cur_pos, lr_token, lr_token_len);
+ }
+ GPR_ASSERT(cur_pos + lr_token_len - client_ip_and_lr_token_ ==
+ client_ip_and_lr_token_len_);
+}
- /* TODO(dgq): do something with the data
- grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CHANNEL_CREATION,
- (intptr_t)chand,
- 0,
- NULL,
- NULL,
- NULL,
- NULL};
- */
+grpc_filtered_mdelem ServerLoadReportingCallData::RecvInitialMetadataFilter(
+ void* user_data, grpc_mdelem md) {
+ grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
+ ServerLoadReportingCallData* calld =
+ reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
+ if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_PATH)) {
+ calld->service_method_ = grpc_slice_ref_internal(GRPC_MDVALUE(md));
+ } else if (calld->target_host_ == nullptr &&
+ grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_AUTHORITY)) {
+ grpc_slice target_host_slice = GRPC_MDVALUE(md);
+ calld->target_host_len_ = GRPC_SLICE_LENGTH(target_host_slice);
+ calld->target_host_ =
+ reinterpret_cast<char*>(gpr_zalloc(calld->target_host_len_));
+ for (size_t i = 0; i < calld->target_host_len_; ++i) {
+ calld->target_host_[i] = static_cast<char>(
+ tolower(GRPC_SLICE_START_PTR(target_host_slice)[i]));
+ }
+ } else if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_TOKEN)) {
+ if (calld->client_ip_and_lr_token_ == nullptr) {
+ calld->StoreClientIpAndLrToken(
+ reinterpret_cast<const char*> GRPC_SLICE_START_PTR(GRPC_MDVALUE(md)),
+ GRPC_SLICE_LENGTH(GRPC_MDVALUE(md)));
+ }
+ return GRPC_FILTERED_REMOVE();
+ }
+ return GRPC_FILTERED_MDELEM(md);
+}
- return GRPC_ERROR_NONE;
+void ServerLoadReportingCallData::RecvInitialMetadataReady(void* arg,
+ grpc_error* err) {
+ grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(arg);
+ ServerLoadReportingCallData* calld =
+ reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
+ ServerLoadReportingChannelData* chand =
+ reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
+ if (err == GRPC_ERROR_NONE) {
+ GRPC_LOG_IF_ERROR(
+ "server_load_reporting_filter",
+ grpc_metadata_batch_filter(calld->recv_initial_metadata_->batch(),
+ RecvInitialMetadataFilter, elem,
+ "recv_initial_metadata filtering error"));
+ // If the LB token was not found in the recv_initial_metadata, only the
+ // client IP part will be recorded (with an empty LB token).
+ if (calld->client_ip_and_lr_token_ == nullptr) {
+ calld->StoreClientIpAndLrToken(nullptr, 0);
+ }
+ opencensus::stats::Record(
+ {{::grpc::load_reporter::MeasureStartCount(), 1}},
+ {{::grpc::load_reporter::TagKeyToken(),
+ {calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}},
+ {::grpc::load_reporter::TagKeyHost(),
+ {calld->target_host_, calld->target_host_len_}},
+ {::grpc::load_reporter::TagKeyUserId(),
+ {chand->peer_identity(), chand->peer_identity_len()}}});
+ }
+ GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready_,
+ GRPC_ERROR_REF(err));
}
-/* Destructor for channel data */
-static void destroy_channel_elem(grpc_channel_element* elem) {
- /* TODO(dgq): do something with the data
- channel_data *chand = elem->channel_data;
- grpc_load_reporting_call_data lr_call_data = {
- GRPC_LR_POINT_CHANNEL_DESTRUCTION,
- (intptr_t)chand->id,
- 0,
- NULL,
- NULL,
- NULL,
- NULL};
- */
+grpc_error* ServerLoadReportingCallData::Init(
+ grpc_call_element* elem, const grpc_call_element_args* args) {
+ service_method_ = grpc_empty_slice();
+ GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
+ elem, grpc_schedule_on_exec_ctx);
+ return GRPC_ERROR_NONE;
}
-static grpc_filtered_mdelem lr_trailing_md_filter(void* user_data,
- grpc_mdelem md) {
- grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
+grpc_filtered_mdelem ServerLoadReportingCallData::SendTrailingMetadataFilter(
+ void* user_data, grpc_mdelem md) {
+ grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
+ ServerLoadReportingCallData* calld =
+ reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
+ ServerLoadReportingChannelData* chand =
+ reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
+ // TODO(juanlishen): GRPC_MDSTR_LB_COST_BIN meaning?
if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_COST_BIN)) {
- calld->trailing_md_string = GRPC_MDVALUE(md);
+ const grpc_slice value = GRPC_MDVALUE(md);
+ const size_t cost_entry_size = GRPC_SLICE_LENGTH(value);
+ if (cost_entry_size < sizeof(double)) {
+ gpr_log(GPR_ERROR,
+ "Cost metadata value too small (%zu bytes) to hold valid data. "
+ "Ignoring.",
+ cost_entry_size);
+ return GRPC_FILTERED_REMOVE();
+ }
+ const double* cost_entry_ptr =
+ reinterpret_cast<const double*>(GRPC_SLICE_START_PTR(value));
+ double cost_value = *cost_entry_ptr++;
+ const char* cost_name = reinterpret_cast<const char*>(cost_entry_ptr);
+ const size_t cost_name_len = cost_entry_size - sizeof(double);
+ opencensus::stats::Record(
+ {{::grpc::load_reporter::MeasureOtherCallMetric(), cost_value}},
+ {{::grpc::load_reporter::TagKeyToken(),
+ {calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}},
+ {::grpc::load_reporter::TagKeyHost(),
+ {calld->target_host_, calld->target_host_len_}},
+ {::grpc::load_reporter::TagKeyUserId(),
+ {chand->peer_identity(), chand->peer_identity_len()}},
+ {::grpc::load_reporter::TagKeyMetricName(),
+ {cost_name, cost_name_len}}});
return GRPC_FILTERED_REMOVE();
}
return GRPC_FILTERED_MDELEM(md);
}
-static void lr_start_transport_stream_op_batch(
- grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
- GPR_TIMER_SCOPE("lr_start_transport_stream_op_batch", 0);
- call_data* calld = static_cast<call_data*>(elem->call_data);
-
- if (op->recv_initial_metadata) {
- /* substitute our callback for the higher callback */
- calld->recv_initial_metadata =
- op->payload->recv_initial_metadata.recv_initial_metadata;
- calld->ops_recv_initial_metadata_ready =
- op->payload->recv_initial_metadata.recv_initial_metadata_ready;
- op->payload->recv_initial_metadata.recv_initial_metadata_ready =
- &calld->on_initial_md_ready;
- } else if (op->send_trailing_metadata) {
- GRPC_LOG_IF_ERROR(
- "grpc_metadata_batch_filter",
- grpc_metadata_batch_filter(
- op->payload->send_trailing_metadata.send_trailing_metadata,
- lr_trailing_md_filter, elem,
- "LR trailing metadata filtering error"));
+const char* ServerLoadReportingCallData::GetStatusTagForStatus(
+ grpc_status_code status) {
+ switch (status) {
+ case GRPC_STATUS_OK:
+ return ::grpc::load_reporter::kCallStatusOk;
+ case GRPC_STATUS_UNKNOWN:
+ case GRPC_STATUS_DEADLINE_EXCEEDED:
+ case GRPC_STATUS_UNIMPLEMENTED:
+ case GRPC_STATUS_INTERNAL:
+ case GRPC_STATUS_UNAVAILABLE:
+ case GRPC_STATUS_DATA_LOSS:
+ return ::grpc::load_reporter::kCallStatusServerError;
+ default:
+ return ::grpc::load_reporter::kCallStatusClientError;
}
- grpc_call_next_op(elem, op);
}
-const grpc_channel_filter grpc_server_load_reporting_filter = {
- lr_start_transport_stream_op_batch,
- grpc_channel_next_op,
- sizeof(call_data),
- init_call_elem,
- grpc_call_stack_ignore_set_pollset_or_pollset_set,
- destroy_call_elem,
- sizeof(channel_data),
- init_channel_elem,
- destroy_channel_elem,
- grpc_channel_next_get_info,
- "load_reporting"};
+} // namespace grpc
diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.h b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h
index b459a8ec5f..029b19ac89 100644
--- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.h
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h
@@ -21,10 +21,105 @@
#include <grpc/support/port_platform.h>
-#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
#include "src/core/lib/channel/channel_stack.h"
+#include "src/cpp/common/channel_filter.h"
-extern const grpc_channel_filter grpc_server_load_reporting_filter;
+namespace grpc {
+
+class ServerLoadReportingChannelData : public ChannelData {
+ public:
+ grpc_error* Init(grpc_channel_element* elem,
+ grpc_channel_element_args* args) override;
+
+ // Getters.
+ const char* peer_identity() { return peer_identity_; }
+ size_t peer_identity_len() { return peer_identity_len_; }
+
+ private:
+ // The peer's authenticated identity.
+ char* peer_identity_ = nullptr;
+ size_t peer_identity_len_ = 0;
+};
+
+class ServerLoadReportingCallData : public CallData {
+ public:
+ grpc_error* Init(grpc_call_element* elem,
+ const grpc_call_element_args* args) override;
+
+ void Destroy(grpc_call_element* elem, const grpc_call_final_info* final_info,
+ grpc_closure* then_call_closure) override;
+
+ void StartTransportStreamOpBatch(grpc_call_element* elem,
+ TransportStreamOpBatch* op) override;
+
+ private:
+ // From the peer_string_ in calld, extracts the client IP string (owned by
+ // caller), e.g., "01020a0b". Upon failure, set the output pointer to null and
+ // size to zero.
+ void GetCensusSafeClientIpString(char** client_ip_string, size_t* size);
+
+ // Concatenates the client IP address and the load reporting token, then
+ // stores the result into the call data.
+ void StoreClientIpAndLrToken(const char* lr_token, size_t lr_token_len);
+
+ // This matches the classification of the status codes in
+ // googleapis/google/rpc/code.proto.
+ static const char* GetStatusTagForStatus(grpc_status_code status);
+
+ // Records the call start.
+ static void RecvInitialMetadataReady(void* arg, grpc_error* err);
+
+ // From the initial metadata, extracts the service_method_, target_host_, and
+ // client_ip_and_lr_token_.
+ static grpc_filtered_mdelem RecvInitialMetadataFilter(void* user_data,
+ grpc_mdelem md);
+
+ // Records the other call metrics.
+ static grpc_filtered_mdelem SendTrailingMetadataFilter(void* user_data,
+ grpc_mdelem md);
+
+ // The peer string (a member of the recv_initial_metadata op). Note that
+ // gpr_atm itself is a pointer type here, making "peer_string_" effectively a
+ // double pointer.
+ const gpr_atm* peer_string_;
+
+ // The received initial metadata (a member of the recv_initial_metadata op).
+ // When it is ready, we will extract some data from it via
+ // recv_initial_metadata_ready_ closure, before the original
+ // recv_initial_metadata_ready closure,
+ MetadataBatch* recv_initial_metadata_;
+
+ // The original recv_initial_metadata closure, which is wrapped by our own
+ // closure (recv_initial_metadata_ready_) to capture the incoming initial
+ // metadata.
+ grpc_closure* original_recv_initial_metadata_ready_;
+
+ // The closure that wraps the original closure. Scheduled when
+ // recv_initial_metadata_ is ready.
+ grpc_closure recv_initial_metadata_ready_;
+
+ // Corresponds to the :path header.
+ grpc_slice service_method_;
+
+ // The backend host that the client thinks it's talking to. This may be
+ // different from the actual backend in the case of, for example,
+ // load-balanced targets. We store a copy of the metadata slice in order to
+ // lowercase it. */
+ char* target_host_;
+ size_t target_host_len_;
+
+ // The client IP address (including a length prefix) and the load reporting
+ // token.
+ char* client_ip_and_lr_token_;
+ size_t client_ip_and_lr_token_len_;
+
+ static constexpr char kEncodedIpv4AddressLengthString[] = "08";
+ static constexpr char kEncodedIpv6AddressLengthString[] = "32";
+ static constexpr char kEmptyAddressLengthString[] = "00";
+ static constexpr size_t kLengthPrefixSize = 2;
+};
+
+} // namespace grpc
#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_FILTER_H \
*/
diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc
deleted file mode 100644
index 667c0c56ef..0000000000
--- a/src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- *
- * Copyright 2016 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include <limits.h>
-#include <string.h>
-
-#include <grpc/load_reporting.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/sync.h>
-
-#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
-#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
-#include "src/core/lib/channel/channel_stack_builder.h"
-#include "src/core/lib/slice/slice_internal.h"
-#include "src/core/lib/surface/call.h"
-#include "src/core/lib/surface/channel_init.h"
-
-static bool is_load_reporting_enabled(const grpc_channel_args* a) {
- return grpc_channel_arg_get_bool(
- grpc_channel_args_find(a, GRPC_ARG_ENABLE_LOAD_REPORTING), false);
-}
-
-static bool maybe_add_server_load_reporting_filter(
- grpc_channel_stack_builder* builder, void* arg) {
- const grpc_channel_args* args =
- grpc_channel_stack_builder_get_channel_arguments(builder);
- const grpc_channel_filter* filter =
- static_cast<const grpc_channel_filter*>(arg);
- grpc_channel_stack_builder_iterator* it =
- grpc_channel_stack_builder_iterator_find(builder, filter->name);
- const bool already_has_load_reporting_filter =
- !grpc_channel_stack_builder_iterator_is_end(it);
- grpc_channel_stack_builder_iterator_destroy(it);
- if (is_load_reporting_enabled(args) && !already_has_load_reporting_filter) {
- return grpc_channel_stack_builder_prepend_filter(builder, filter, nullptr,
- nullptr);
- }
- return true;
-}
-
-grpc_arg grpc_load_reporting_enable_arg() {
- return grpc_channel_arg_integer_create((char*)GRPC_ARG_ENABLE_LOAD_REPORTING,
- 1);
-}
-
-/* Plugin registration */
-
-void grpc_server_load_reporting_plugin_init(void) {
- grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
- maybe_add_server_load_reporting_filter,
- (void*)&grpc_server_load_reporting_filter);
-}
-
-void grpc_server_load_reporting_plugin_shutdown() {}
diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_plugin.h b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.h
deleted file mode 100644
index c20aaa744f..0000000000
--- a/src/core/ext/filters/load_reporting/server_load_reporting_plugin.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- * Copyright 2016 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_PLUGIN_H
-#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_PLUGIN_H
-
-#include <grpc/support/port_platform.h>
-
-#include <grpc/impl/codegen/grpc_types.h>
-
-#include "src/core/lib/channel/channel_stack.h"
-
-/** Identifiers for the invocation point of the users LR callback */
-typedef enum grpc_load_reporting_source {
- GRPC_LR_POINT_UNKNOWN = 0,
- GRPC_LR_POINT_CHANNEL_CREATION,
- GRPC_LR_POINT_CHANNEL_DESTRUCTION,
- GRPC_LR_POINT_CALL_CREATION,
- GRPC_LR_POINT_CALL_DESTRUCTION
-} grpc_load_reporting_source;
-
-/** Call information to be passed to the provided LR callback. */
-typedef struct grpc_load_reporting_call_data {
- const grpc_load_reporting_source source; /**< point of last data update. */
-
- /** Unique identifier for the channel associated with the data */
- intptr_t channel_id;
-
- /** Unique identifier for the call associated with the data. If the call
- * hasn't been created yet, it'll have a value of zero. */
- intptr_t call_id;
-
- /** Only valid when \a source is \a GRPC_LR_POINT_CALL_DESTRUCTION, that is,
- * once the call has completed */
- const grpc_call_final_info* final_info;
-
- const char* initial_md_string; /**< value string for LR's initial md key */
- const char* trailing_md_string; /**< value string for LR's trailing md key */
- const char* method_name; /**< Corresponds to :path header */
-} grpc_load_reporting_call_data;
-
-/** Return a \a grpc_arg enabling load reporting */
-grpc_arg grpc_load_reporting_enable_arg();
-
-#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_PLUGIN_H \
- */
diff --git a/src/core/plugin_registry/grpc_cronet_plugin_registry.cc b/src/core/plugin_registry/grpc_cronet_plugin_registry.cc
index 49b9c7d4fe..c0c17b0a4b 100644
--- a/src/core/plugin_registry/grpc_cronet_plugin_registry.cc
+++ b/src/core/plugin_registry/grpc_cronet_plugin_registry.cc
@@ -30,8 +30,6 @@ void grpc_client_channel_init(void);
void grpc_client_channel_shutdown(void);
void grpc_tsi_alts_init(void);
void grpc_tsi_alts_shutdown(void);
-void grpc_server_load_reporting_plugin_init(void);
-void grpc_server_load_reporting_plugin_shutdown(void);
void grpc_register_built_in_plugins(void) {
grpc_register_plugin(grpc_http_filters_init,
@@ -44,6 +42,4 @@ void grpc_register_built_in_plugins(void) {
grpc_client_channel_shutdown);
grpc_register_plugin(grpc_tsi_alts_init,
grpc_tsi_alts_shutdown);
- grpc_register_plugin(grpc_server_load_reporting_plugin_init,
- grpc_server_load_reporting_plugin_shutdown);
}
diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc
index e371310fa1..fb523a173d 100644
--- a/src/core/plugin_registry/grpc_plugin_registry.cc
+++ b/src/core/plugin_registry/grpc_plugin_registry.cc
@@ -46,8 +46,6 @@ void grpc_resolver_dns_native_init(void);
void grpc_resolver_dns_native_shutdown(void);
void grpc_resolver_sockaddr_init(void);
void grpc_resolver_sockaddr_shutdown(void);
-void grpc_server_load_reporting_plugin_init(void);
-void grpc_server_load_reporting_plugin_shutdown(void);
void grpc_max_age_filter_init(void);
void grpc_max_age_filter_shutdown(void);
void grpc_message_size_filter_init(void);
@@ -84,8 +82,6 @@ void grpc_register_built_in_plugins(void) {
grpc_resolver_dns_native_shutdown);
grpc_register_plugin(grpc_resolver_sockaddr_init,
grpc_resolver_sockaddr_shutdown);
- grpc_register_plugin(grpc_server_load_reporting_plugin_init,
- grpc_server_load_reporting_plugin_shutdown);
grpc_register_plugin(grpc_max_age_filter_init,
grpc_max_age_filter_shutdown);
grpc_register_plugin(grpc_message_size_filter_init,
diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc
index 283db5b4f4..80214aebe2 100644
--- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc
+++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc
@@ -38,8 +38,6 @@ void grpc_resolver_sockaddr_init(void);
void grpc_resolver_sockaddr_shutdown(void);
void grpc_resolver_fake_init(void);
void grpc_resolver_fake_shutdown(void);
-void grpc_server_load_reporting_plugin_init(void);
-void grpc_server_load_reporting_plugin_shutdown(void);
void grpc_lb_policy_grpclb_init(void);
void grpc_lb_policy_grpclb_shutdown(void);
void grpc_lb_policy_pick_first_init(void);
@@ -74,8 +72,6 @@ void grpc_register_built_in_plugins(void) {
grpc_resolver_sockaddr_shutdown);
grpc_register_plugin(grpc_resolver_fake_init,
grpc_resolver_fake_shutdown);
- grpc_register_plugin(grpc_server_load_reporting_plugin_init,
- grpc_server_load_reporting_plugin_shutdown);
grpc_register_plugin(grpc_lb_policy_grpclb_init,
grpc_lb_policy_grpclb_shutdown);
grpc_register_plugin(grpc_lb_policy_pick_first_init,
diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h
index bd0ec969b4..5e569c97e6 100644
--- a/src/cpp/common/channel_filter.h
+++ b/src/cpp/common/channel_filter.h
@@ -207,6 +207,18 @@ class TransportStreamOpBatch {
op_->payload->context[GRPC_CONTEXT_TRACING].value);
}
+ const gpr_atm* get_peer_string() const {
+ if (op_->send_initial_metadata &&
+ op_->payload->send_initial_metadata.peer_string != nullptr) {
+ return op_->payload->send_initial_metadata.peer_string;
+ } else if (op_->recv_initial_metadata &&
+ op_->payload->recv_initial_metadata.peer_string != nullptr) {
+ return op_->payload->recv_initial_metadata.peer_string;
+ } else {
+ return nullptr;
+ }
+ }
+
private:
grpc_transport_stream_op_batch* op_; // Not owned.
MetadataBatch send_initial_metadata_;
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index b20b8155a0..f48e0f5cb6 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -352,8 +352,6 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc',
'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc',
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc',
- 'src/core/ext/filters/load_reporting/server_load_reporting_filter.cc',
- 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc',
'src/cpp/ext/filters/census/grpc_context.cc',
'src/core/ext/filters/max_age/max_age_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',