diff options
Diffstat (limited to 'src/core/ext/filters/load_reporting')
3 files changed, 83 insertions, 45 deletions
diff --git a/src/core/ext/filters/load_reporting/registered_opencensus_objects.h b/src/core/ext/filters/load_reporting/registered_opencensus_objects.h index b62863dc2c..4eacda7c02 100644 --- a/src/core/ext/filters/load_reporting/registered_opencensus_objects.h +++ b/src/core/ext/filters/load_reporting/registered_opencensus_objects.h @@ -28,75 +28,84 @@ namespace grpc { namespace load_reporter { +// Note that the functions here are specified as inline to share the static +// objects across all the translation units including this header. See more +// details on https://en.cppreference.com/w/cpp/language/inline. + // Measures. -::opencensus::stats::MeasureInt64 MeasureStartCount() { - static const ::opencensus::stats::MeasureInt64 start_count = +inline ::opencensus::stats::MeasureInt64 MeasureStartCount() { + static const ::opencensus::stats::MeasureInt64 measure = ::opencensus::stats::MeasureInt64::Register( kMeasureStartCount, kMeasureStartCount, kMeasureStartCount); - return start_count; + return measure; } -::opencensus::stats::MeasureInt64 MeasureEndCount() { - static const ::opencensus::stats::MeasureInt64 end_count = +inline ::opencensus::stats::MeasureInt64 MeasureEndCount() { + static const ::opencensus::stats::MeasureInt64 measure = ::opencensus::stats::MeasureInt64::Register( kMeasureEndCount, kMeasureEndCount, kMeasureEndCount); - return end_count; + return measure; } -::opencensus::stats::MeasureInt64 MeasureEndBytesSent() { - static const ::opencensus::stats::MeasureInt64 end_bytes_sent = +inline ::opencensus::stats::MeasureInt64 MeasureEndBytesSent() { + static const ::opencensus::stats::MeasureInt64 measure = ::opencensus::stats::MeasureInt64::Register( kMeasureEndBytesSent, kMeasureEndBytesSent, kMeasureEndBytesSent); - return end_bytes_sent; + return measure; } -::opencensus::stats::MeasureInt64 MeasureEndBytesReceived() { - static const ::opencensus::stats::MeasureInt64 end_bytes_received = +inline ::opencensus::stats::MeasureInt64 MeasureEndBytesReceived() { + static const ::opencensus::stats::MeasureInt64 measure = ::opencensus::stats::MeasureInt64::Register(kMeasureEndBytesReceived, kMeasureEndBytesReceived, kMeasureEndBytesReceived); - return end_bytes_received; + return measure; } -::opencensus::stats::MeasureInt64 MeasureEndLatencyMs() { - static const ::opencensus::stats::MeasureInt64 end_latency_ms = +inline ::opencensus::stats::MeasureInt64 MeasureEndLatencyMs() { + static const ::opencensus::stats::MeasureInt64 measure = ::opencensus::stats::MeasureInt64::Register( kMeasureEndLatencyMs, kMeasureEndLatencyMs, kMeasureEndLatencyMs); - return end_latency_ms; + return measure; } -::opencensus::stats::MeasureDouble MeasureOtherCallMetric() { - static const ::opencensus::stats::MeasureDouble other_call_metric = +inline ::opencensus::stats::MeasureDouble MeasureOtherCallMetric() { + static const ::opencensus::stats::MeasureDouble measure = ::opencensus::stats::MeasureDouble::Register(kMeasureOtherCallMetric, kMeasureOtherCallMetric, kMeasureOtherCallMetric); - return other_call_metric; + return measure; } // Tags. -opencensus::stats::TagKey TagKeyToken() { - static const auto token = opencensus::stats::TagKey::Register(kTagKeyToken); +inline ::opencensus::stats::TagKey TagKeyToken() { + static const ::opencensus::stats::TagKey token = + opencensus::stats::TagKey::Register(kTagKeyToken); return token; } -opencensus::stats::TagKey TagKeyHost() { - static const auto token = opencensus::stats::TagKey::Register(kTagKeyHost); +inline ::opencensus::stats::TagKey TagKeyHost() { + static const ::opencensus::stats::TagKey token = + opencensus::stats::TagKey::Register(kTagKeyHost); return token; } -opencensus::stats::TagKey TagKeyUserId() { - static const auto token = opencensus::stats::TagKey::Register(kTagKeyUserId); + +inline ::opencensus::stats::TagKey TagKeyUserId() { + static const ::opencensus::stats::TagKey token = + opencensus::stats::TagKey::Register(kTagKeyUserId); return token; } -opencensus::stats::TagKey TagKeyStatus() { - static const auto token = opencensus::stats::TagKey::Register(kTagKeyStatus); +inline ::opencensus::stats::TagKey TagKeyStatus() { + static const ::opencensus::stats::TagKey token = + opencensus::stats::TagKey::Register(kTagKeyStatus); return token; } -opencensus::stats::TagKey TagKeyMetricName() { - static const auto token = +inline ::opencensus::stats::TagKey TagKeyMetricName() { + static const ::opencensus::stats::TagKey token = opencensus::stats::TagKey::Register(kTagKeyMetricName); return token; } diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc index 51e4d795d7..6529046a5e 100644 --- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc +++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc @@ -19,7 +19,6 @@ #include <grpc/support/port_platform.h> #include <grpc/grpc_security.h> -#include <grpc/load_reporting.h> #include <grpc/slice.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -31,18 +30,20 @@ #include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/context.h" -#include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr_posix.h" #include "src/core/lib/iomgr/socket_utils.h" -#include "src/core/lib/profiling/timers.h" #include "src/core/lib/security/context/security_context.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/call.h" -#include "src/core/lib/transport/static_metadata.h" namespace grpc { +constexpr char kEncodedIpv4AddressLengthString[] = "08"; +constexpr char kEncodedIpv6AddressLengthString[] = "32"; +constexpr char kEmptyAddressLengthString[] = "00"; +constexpr size_t kLengthPrefixSize = 2; + grpc_error* ServerLoadReportingChannelData::Init( grpc_channel_element* /* elem */, grpc_channel_element_args* args) { GPR_ASSERT(!args->is_last); @@ -90,7 +91,9 @@ void ServerLoadReportingCallData::Destroy( {chand->peer_identity(), chand->peer_identity_len()}}, {::grpc::load_reporter::TagKeyStatus(), GetStatusTagForStatus(final_info->final_status)}}); + gpr_free(client_ip_and_lr_token_); } + gpr_free(target_host_); grpc_slice_unref_internal(service_method_); } @@ -100,7 +103,8 @@ void ServerLoadReportingCallData::StartTransportStreamOpBatch( if (op->recv_initial_metadata() != nullptr) { // Save some fields to use when initial metadata is ready. peer_string_ = op->get_peer_string(); - recv_initial_metadata_ = op->recv_initial_metadata(); + recv_initial_metadata_ = + op->op()->payload->recv_initial_metadata.recv_initial_metadata; original_recv_initial_metadata_ready_ = op->recv_initial_metadata_ready(); // Substitute the original closure for the wrapper closure. op->set_recv_initial_metadata_ready(&recv_initial_metadata_ready_); @@ -157,10 +161,10 @@ void ServerLoadReportingCallData::GetCensusSafeClientIpString( *size = 8; } else if (addr->sa_family == GRPC_AF_INET6) { grpc_sockaddr_in6* addr6 = reinterpret_cast<grpc_sockaddr_in6*>(addr); - *client_ip_string = static_cast<char*>(gpr_malloc(32)); + *client_ip_string = static_cast<char*>(gpr_malloc(32 + 1)); for (size_t i = 0; i < 16; ++i) { - sprintf(*client_ip_string + i, "%02x", - addr6->sin6_addr.__in6_u.__u6_addr8[i]); + snprintf(*client_ip_string + i * 2, 2 + 1, "%02x", + addr6->sin6_addr.__in6_u.__u6_addr8[i]); } *size = 32; } else { @@ -241,7 +245,7 @@ void ServerLoadReportingCallData::RecvInitialMetadataReady(void* arg, if (err == GRPC_ERROR_NONE) { GRPC_LOG_IF_ERROR( "server_load_reporting_filter", - grpc_metadata_batch_filter(calld->recv_initial_metadata_->batch(), + grpc_metadata_batch_filter(calld->recv_initial_metadata_, RecvInitialMetadataFilter, elem, "recv_initial_metadata filtering error")); // If the LB token was not found in the recv_initial_metadata, only the @@ -277,7 +281,6 @@ grpc_filtered_mdelem ServerLoadReportingCallData::SendTrailingMetadataFilter( reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data); ServerLoadReportingChannelData* chand = reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data); - // TODO(juanlishen): GRPC_MDSTR_LB_COST_BIN meaning? if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_COST_BIN)) { const grpc_slice value = GRPC_MDVALUE(md); const size_t cost_entry_size = GRPC_SLICE_LENGTH(value); @@ -325,4 +328,35 @@ const char* ServerLoadReportingCallData::GetStatusTagForStatus( } } +namespace { +bool MaybeAddServerLoadReportingFilter(const grpc_channel_args& args) { + return grpc_channel_arg_get_bool( + grpc_channel_args_find(&args, GRPC_ARG_ENABLE_LOAD_REPORTING), false); +} +} // namespace + +// TODO(juanlishen): We should register the filter during grpc initialization +// time once OpenCensus is compatible with our build system. For now, we force +// registration of the server load reporting filter at static initialization +// time if we build with the filter target. +struct ServerLoadReportingFilterStaticRegistrar { + ServerLoadReportingFilterStaticRegistrar() { + static std::atomic_bool registered{false}; + if (registered) return; + RegisterChannelFilter<ServerLoadReportingChannelData, + ServerLoadReportingCallData>( + "server_load_reporting", GRPC_SERVER_CHANNEL, INT_MAX, + MaybeAddServerLoadReportingFilter); + // Access measures to ensure they are initialized. Otherwise, we can't + // create any valid view before the first RPC. + ::grpc::load_reporter::MeasureStartCount(); + ::grpc::load_reporter::MeasureEndCount(); + ::grpc::load_reporter::MeasureEndBytesSent(); + ::grpc::load_reporter::MeasureEndBytesReceived(); + ::grpc::load_reporter::MeasureEndLatencyMs(); + ::grpc::load_reporter::MeasureOtherCallMetric(); + registered = true; + } +} server_load_reporting_filter_static_registrar; + } // namespace grpc diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.h b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h index 029b19ac89..10baf1f833 100644 --- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.h +++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h @@ -86,8 +86,8 @@ class ServerLoadReportingCallData : public CallData { // The received initial metadata (a member of the recv_initial_metadata op). // When it is ready, we will extract some data from it via // recv_initial_metadata_ready_ closure, before the original - // recv_initial_metadata_ready closure, - MetadataBatch* recv_initial_metadata_; + // recv_initial_metadata_ready closure. + grpc_metadata_batch* recv_initial_metadata_; // The original recv_initial_metadata closure, which is wrapped by our own // closure (recv_initial_metadata_ready_) to capture the incoming initial @@ -112,11 +112,6 @@ class ServerLoadReportingCallData : public CallData { // token. char* client_ip_and_lr_token_; size_t client_ip_and_lr_token_len_; - - static constexpr char kEncodedIpv4AddressLengthString[] = "08"; - static constexpr char kEncodedIpv6AddressLengthString[] = "32"; - static constexpr char kEmptyAddressLengthString[] = "00"; - static constexpr size_t kLengthPrefixSize = 2; }; } // namespace grpc |