diff options
author | Juanli Shen <juanlishen@google.com> | 2018-06-21 22:44:05 -0700 |
---|---|---|
committer | Juanli Shen <juanlishen@google.com> | 2018-06-22 10:00:48 -0700 |
commit | f5f1d57d7abdd68c9037576729e3e85437aaa833 (patch) | |
tree | 9437b97cf60393bf853807ab1e347c1561affa1a /src | |
parent | 7a2a8ca4ba2cd505961ac43c656cc0a7a33c7bb0 (diff) |
Add load reporting filter
Diffstat (limited to 'src')
10 files changed, 486 insertions, 309 deletions
diff --git a/src/core/ext/filters/load_reporting/registered_opencensus_objects.h b/src/core/ext/filters/load_reporting/registered_opencensus_objects.h new file mode 100644 index 0000000000..b62863dc2c --- /dev/null +++ b/src/core/ext/filters/load_reporting/registered_opencensus_objects.h @@ -0,0 +1,108 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_REGISTERED_OPENCENSUS_OBJECTS_H +#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_REGISTERED_OPENCENSUS_OBJECTS_H + +#include <grpc/support/port_platform.h> + +#include "opencensus/stats/stats.h" + +#include "src/cpp/server/load_reporter/constants.h" + +namespace grpc { +namespace load_reporter { + +// Measures. + +::opencensus::stats::MeasureInt64 MeasureStartCount() { + static const ::opencensus::stats::MeasureInt64 start_count = + ::opencensus::stats::MeasureInt64::Register( + kMeasureStartCount, kMeasureStartCount, kMeasureStartCount); + return start_count; +} + +::opencensus::stats::MeasureInt64 MeasureEndCount() { + static const ::opencensus::stats::MeasureInt64 end_count = + ::opencensus::stats::MeasureInt64::Register( + kMeasureEndCount, kMeasureEndCount, kMeasureEndCount); + return end_count; +} + +::opencensus::stats::MeasureInt64 MeasureEndBytesSent() { + static const ::opencensus::stats::MeasureInt64 end_bytes_sent = + ::opencensus::stats::MeasureInt64::Register( + kMeasureEndBytesSent, kMeasureEndBytesSent, kMeasureEndBytesSent); + return end_bytes_sent; +} + +::opencensus::stats::MeasureInt64 MeasureEndBytesReceived() { + static const ::opencensus::stats::MeasureInt64 end_bytes_received = + ::opencensus::stats::MeasureInt64::Register(kMeasureEndBytesReceived, + kMeasureEndBytesReceived, + kMeasureEndBytesReceived); + return end_bytes_received; +} + +::opencensus::stats::MeasureInt64 MeasureEndLatencyMs() { + static const ::opencensus::stats::MeasureInt64 end_latency_ms = + ::opencensus::stats::MeasureInt64::Register( + kMeasureEndLatencyMs, kMeasureEndLatencyMs, kMeasureEndLatencyMs); + return end_latency_ms; +} + +::opencensus::stats::MeasureDouble MeasureOtherCallMetric() { + static const ::opencensus::stats::MeasureDouble other_call_metric = + ::opencensus::stats::MeasureDouble::Register(kMeasureOtherCallMetric, + kMeasureOtherCallMetric, + kMeasureOtherCallMetric); + return other_call_metric; +} + +// Tags. + +opencensus::stats::TagKey TagKeyToken() { + static const auto token = opencensus::stats::TagKey::Register(kTagKeyToken); + return token; +} + +opencensus::stats::TagKey TagKeyHost() { + static const auto token = opencensus::stats::TagKey::Register(kTagKeyHost); + return token; +} +opencensus::stats::TagKey TagKeyUserId() { + static const auto token = opencensus::stats::TagKey::Register(kTagKeyUserId); + return token; +} + +opencensus::stats::TagKey TagKeyStatus() { + static const auto token = opencensus::stats::TagKey::Register(kTagKeyStatus); + return token; +} + +opencensus::stats::TagKey TagKeyMetricName() { + static const auto token = + opencensus::stats::TagKey::Register(kTagKeyMetricName); + return token; +} + +} // namespace load_reporter +} // namespace grpc + +#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_REGISTERED_OPENCENSUS_OBJECTS_H \ + */ diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc index a8f70333b2..51e4d795d7 100644 --- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc +++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc @@ -18,203 +18,311 @@ #include <grpc/support/port_platform.h> -#include <string.h> - +#include <grpc/grpc_security.h> #include <grpc/load_reporting.h> +#include <grpc/slice.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> -#include <grpc/support/sync.h> +#include "src/core/ext/filters/client_channel/parse_address.h" +#include "src/core/ext/filters/client_channel/uri_parser.h" +#include "src/core/ext/filters/load_reporting/registered_opencensus_objects.h" #include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h" -#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/context.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/sockaddr_posix.h" +#include "src/core/lib/iomgr/socket_utils.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/security/context/security_context.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/surface/call.h" #include "src/core/lib/transport/static_metadata.h" -namespace { -struct call_data { - intptr_t id; /**< an id unique to the call */ - bool have_trailing_md_string; - grpc_slice trailing_md_string; - bool have_initial_md_string; - grpc_slice initial_md_string; - bool have_service_method; - grpc_slice service_method; - - /* stores the recv_initial_metadata op's ready closure, which we wrap with our - * own (on_initial_md_ready) in order to capture the incoming initial metadata - * */ - grpc_closure* ops_recv_initial_metadata_ready; - - /* to get notified of the availability of the incoming initial metadata. */ - grpc_closure on_initial_md_ready; - grpc_metadata_batch* recv_initial_metadata; -}; +namespace grpc { -struct channel_data { - intptr_t id; /**< an id unique to the channel */ -}; -} // namespace - -static void on_initial_md_ready(void* user_data, grpc_error* err) { - grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - - if (err == GRPC_ERROR_NONE) { - if (calld->recv_initial_metadata->idx.named.path != nullptr) { - calld->service_method = grpc_slice_ref_internal( - GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.path->md)); - calld->have_service_method = true; - } else { - err = grpc_error_add_child( - err, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing :path header")); - } - if (calld->recv_initial_metadata->idx.named.lb_token != nullptr) { - calld->initial_md_string = grpc_slice_ref_internal( - GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.lb_token->md)); - calld->have_initial_md_string = true; - grpc_metadata_batch_remove( - calld->recv_initial_metadata, - calld->recv_initial_metadata->idx.named.lb_token); +grpc_error* ServerLoadReportingChannelData::Init( + grpc_channel_element* /* elem */, grpc_channel_element_args* args) { + GPR_ASSERT(!args->is_last); + // Find and record the peer_identity. + const grpc_auth_context* auth_context = + grpc_find_auth_context_in_args(args->channel_args); + if (auth_context != nullptr && + grpc_auth_context_peer_is_authenticated(auth_context)) { + grpc_auth_property_iterator auth_it = + grpc_auth_context_peer_identity(auth_context); + const grpc_auth_property* auth_property = + grpc_auth_property_iterator_next(&auth_it); + if (auth_property != nullptr) { + peer_identity_ = auth_property->value; + peer_identity_len_ = auth_property->value_length; } - } else { - GRPC_ERROR_REF(err); } - GRPC_CLOSURE_RUN(calld->ops_recv_initial_metadata_ready, err); -} - -/* Constructor for call_data */ -static grpc_error* init_call_elem(grpc_call_element* elem, - const grpc_call_element_args* args) { - call_data* calld = static_cast<call_data*>(elem->call_data); - calld->id = (intptr_t)args->call_stack; - GRPC_CLOSURE_INIT(&calld->on_initial_md_ready, on_initial_md_ready, elem, - grpc_schedule_on_exec_ctx); - - /* TODO(dgq): do something with the data - channel_data *chand = elem->channel_data; - grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_CREATION, - (intptr_t)chand->id, - (intptr_t)calld->id, - NULL, - NULL, - NULL, - NULL}; - */ - return GRPC_ERROR_NONE; } -/* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element* elem, - const grpc_call_final_info* final_info, - grpc_closure* ignored) { - call_data* calld = static_cast<call_data*>(elem->call_data); +void ServerLoadReportingCallData::Destroy( + grpc_call_element* elem, const grpc_call_final_info* final_info, + grpc_closure* then_call_closure) { + ServerLoadReportingChannelData* chand = + reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data); + // Only record an end if we've recorded its corresponding start, which is + // indicated by a non-null client_ip_and_lr_token_. Note that it's possible + // that we attempt to record the call end before we have recorded the call + // start, because the data needed for recording the start comes from the + // initial metadata, which may not be ready before the call finishes. + if (client_ip_and_lr_token_ != nullptr) { + opencensus::stats::Record( + {{::grpc::load_reporter::MeasureEndCount(), 1}, + {::grpc::load_reporter::MeasureEndBytesSent(), + final_info->stats.transport_stream_stats.outgoing.data_bytes}, + {::grpc::load_reporter::MeasureEndBytesReceived(), + final_info->stats.transport_stream_stats.incoming.data_bytes}, + {::grpc::load_reporter::MeasureEndLatencyMs(), + gpr_time_to_millis(final_info->stats.latency)}}, + {{::grpc::load_reporter::TagKeyToken(), + {client_ip_and_lr_token_, client_ip_and_lr_token_len_}}, + {::grpc::load_reporter::TagKeyHost(), + {target_host_, target_host_len_}}, + {::grpc::load_reporter::TagKeyUserId(), + {chand->peer_identity(), chand->peer_identity_len()}}, + {::grpc::load_reporter::TagKeyStatus(), + GetStatusTagForStatus(final_info->final_status)}}); + } + grpc_slice_unref_internal(service_method_); +} - /* TODO(dgq): do something with the data - channel_data *chand = elem->channel_data; - grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_DESTRUCTION, - (intptr_t)chand->id, - (intptr_t)calld->id, - final_info, - calld->initial_md_string, - calld->trailing_md_string, - calld->service_method}; - */ +void ServerLoadReportingCallData::StartTransportStreamOpBatch( + grpc_call_element* elem, TransportStreamOpBatch* op) { + GPR_TIMER_SCOPE("lr_start_transport_stream_op", 0); + if (op->recv_initial_metadata() != nullptr) { + // Save some fields to use when initial metadata is ready. + peer_string_ = op->get_peer_string(); + recv_initial_metadata_ = op->recv_initial_metadata(); + original_recv_initial_metadata_ready_ = op->recv_initial_metadata_ready(); + // Substitute the original closure for the wrapper closure. + op->set_recv_initial_metadata_ready(&recv_initial_metadata_ready_); + } else if (op->send_trailing_metadata() != nullptr) { + GRPC_LOG_IF_ERROR( + "server_load_reporting_filter", + grpc_metadata_batch_filter(op->send_trailing_metadata()->batch(), + SendTrailingMetadataFilter, elem, + "send_trailing_metadata filtering error")); + } + grpc_call_next_op(elem, op->op()); +} - if (calld->have_initial_md_string) { - grpc_slice_unref_internal(calld->initial_md_string); +void ServerLoadReportingCallData::GetCensusSafeClientIpString( + char** client_ip_string, size_t* size) { + // Find the client URI string. + const char* client_uri_str = + reinterpret_cast<const char*>(gpr_atm_acq_load(peer_string_)); + if (client_uri_str == nullptr) { + gpr_log(GPR_ERROR, + "Unable to extract client URI string (peer string) from gRPC " + "metadata."); + *client_ip_string = nullptr; + *size = 0; + return; } - if (calld->have_trailing_md_string) { - grpc_slice_unref_internal(calld->trailing_md_string); + // Parse the client URI string into grpc_uri. + grpc_uri* client_uri = grpc_uri_parse(client_uri_str, true); + if (client_uri == nullptr) { + gpr_log(GPR_ERROR, + "Unable to parse the client URI string (peer string) to a client " + "URI."); + *client_ip_string = nullptr; + *size = 0; + return; } - if (calld->have_service_method) { - grpc_slice_unref_internal(calld->service_method); + // Parse the client URI into grpc_resolved_address. + grpc_resolved_address resolved_address; + bool success = grpc_parse_uri(client_uri, &resolved_address); + grpc_uri_destroy(client_uri); + if (!success) { + gpr_log(GPR_ERROR, + "Unable to parse client URI into a grpc_resolved_address."); + *client_ip_string = nullptr; + *size = 0; + return; + } + // Convert the socket address in the grpc_resolved_address into a hex string + // according to the address family. + grpc_sockaddr* addr = reinterpret_cast<grpc_sockaddr*>(resolved_address.addr); + if (addr->sa_family == GRPC_AF_INET) { + grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(addr); + gpr_asprintf(client_ip_string, "%08x", grpc_ntohl(addr4->sin_addr.s_addr)); + *size = 8; + } else if (addr->sa_family == GRPC_AF_INET6) { + grpc_sockaddr_in6* addr6 = reinterpret_cast<grpc_sockaddr_in6*>(addr); + *client_ip_string = static_cast<char*>(gpr_malloc(32)); + for (size_t i = 0; i < 16; ++i) { + sprintf(*client_ip_string + i, "%02x", + addr6->sin6_addr.__in6_u.__u6_addr8[i]); + } + *size = 32; + } else { + GPR_UNREACHABLE_CODE(); } } -/* Constructor for channel_data */ -static grpc_error* init_channel_elem(grpc_channel_element* elem, - grpc_channel_element_args* args) { - GPR_ASSERT(!args->is_last); - - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - chand->id = (intptr_t)args->channel_stack; +void ServerLoadReportingCallData::StoreClientIpAndLrToken(const char* lr_token, + size_t lr_token_len) { + char* client_ip; + size_t client_ip_len; + GetCensusSafeClientIpString(&client_ip, &client_ip_len); + client_ip_and_lr_token_len_ = + kLengthPrefixSize + client_ip_len + lr_token_len; + client_ip_and_lr_token_ = static_cast<char*>( + gpr_zalloc(client_ip_and_lr_token_len_ * sizeof(char))); + char* cur_pos = client_ip_and_lr_token_; + // Store the IP length prefix. + if (client_ip_len == 0) { + strncpy(cur_pos, kEmptyAddressLengthString, kLengthPrefixSize); + } else if (client_ip_len == 8) { + strncpy(cur_pos, kEncodedIpv4AddressLengthString, kLengthPrefixSize); + } else if (client_ip_len == 32) { + strncpy(cur_pos, kEncodedIpv6AddressLengthString, kLengthPrefixSize); + } else { + GPR_UNREACHABLE_CODE(); + } + cur_pos += kLengthPrefixSize; + // Store the IP. + if (client_ip_len != 0) { + strncpy(cur_pos, client_ip, client_ip_len); + } + gpr_free(client_ip); + cur_pos += client_ip_len; + // Store the LR token. + if (lr_token_len != 0) { + strncpy(cur_pos, lr_token, lr_token_len); + } + GPR_ASSERT(cur_pos + lr_token_len - client_ip_and_lr_token_ == + client_ip_and_lr_token_len_); +} - /* TODO(dgq): do something with the data - grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CHANNEL_CREATION, - (intptr_t)chand, - 0, - NULL, - NULL, - NULL, - NULL}; - */ +grpc_filtered_mdelem ServerLoadReportingCallData::RecvInitialMetadataFilter( + void* user_data, grpc_mdelem md) { + grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data); + ServerLoadReportingCallData* calld = + reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data); + if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_PATH)) { + calld->service_method_ = grpc_slice_ref_internal(GRPC_MDVALUE(md)); + } else if (calld->target_host_ == nullptr && + grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_AUTHORITY)) { + grpc_slice target_host_slice = GRPC_MDVALUE(md); + calld->target_host_len_ = GRPC_SLICE_LENGTH(target_host_slice); + calld->target_host_ = + reinterpret_cast<char*>(gpr_zalloc(calld->target_host_len_)); + for (size_t i = 0; i < calld->target_host_len_; ++i) { + calld->target_host_[i] = static_cast<char>( + tolower(GRPC_SLICE_START_PTR(target_host_slice)[i])); + } + } else if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_TOKEN)) { + if (calld->client_ip_and_lr_token_ == nullptr) { + calld->StoreClientIpAndLrToken( + reinterpret_cast<const char*> GRPC_SLICE_START_PTR(GRPC_MDVALUE(md)), + GRPC_SLICE_LENGTH(GRPC_MDVALUE(md))); + } + return GRPC_FILTERED_REMOVE(); + } + return GRPC_FILTERED_MDELEM(md); +} - return GRPC_ERROR_NONE; +void ServerLoadReportingCallData::RecvInitialMetadataReady(void* arg, + grpc_error* err) { + grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(arg); + ServerLoadReportingCallData* calld = + reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data); + ServerLoadReportingChannelData* chand = + reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data); + if (err == GRPC_ERROR_NONE) { + GRPC_LOG_IF_ERROR( + "server_load_reporting_filter", + grpc_metadata_batch_filter(calld->recv_initial_metadata_->batch(), + RecvInitialMetadataFilter, elem, + "recv_initial_metadata filtering error")); + // If the LB token was not found in the recv_initial_metadata, only the + // client IP part will be recorded (with an empty LB token). + if (calld->client_ip_and_lr_token_ == nullptr) { + calld->StoreClientIpAndLrToken(nullptr, 0); + } + opencensus::stats::Record( + {{::grpc::load_reporter::MeasureStartCount(), 1}}, + {{::grpc::load_reporter::TagKeyToken(), + {calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}}, + {::grpc::load_reporter::TagKeyHost(), + {calld->target_host_, calld->target_host_len_}}, + {::grpc::load_reporter::TagKeyUserId(), + {chand->peer_identity(), chand->peer_identity_len()}}}); + } + GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready_, + GRPC_ERROR_REF(err)); } -/* Destructor for channel data */ -static void destroy_channel_elem(grpc_channel_element* elem) { - /* TODO(dgq): do something with the data - channel_data *chand = elem->channel_data; - grpc_load_reporting_call_data lr_call_data = { - GRPC_LR_POINT_CHANNEL_DESTRUCTION, - (intptr_t)chand->id, - 0, - NULL, - NULL, - NULL, - NULL}; - */ +grpc_error* ServerLoadReportingCallData::Init( + grpc_call_element* elem, const grpc_call_element_args* args) { + service_method_ = grpc_empty_slice(); + GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady, + elem, grpc_schedule_on_exec_ctx); + return GRPC_ERROR_NONE; } -static grpc_filtered_mdelem lr_trailing_md_filter(void* user_data, - grpc_mdelem md) { - grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); - call_data* calld = static_cast<call_data*>(elem->call_data); +grpc_filtered_mdelem ServerLoadReportingCallData::SendTrailingMetadataFilter( + void* user_data, grpc_mdelem md) { + grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data); + ServerLoadReportingCallData* calld = + reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data); + ServerLoadReportingChannelData* chand = + reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data); + // TODO(juanlishen): GRPC_MDSTR_LB_COST_BIN meaning? if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_COST_BIN)) { - calld->trailing_md_string = GRPC_MDVALUE(md); + const grpc_slice value = GRPC_MDVALUE(md); + const size_t cost_entry_size = GRPC_SLICE_LENGTH(value); + if (cost_entry_size < sizeof(double)) { + gpr_log(GPR_ERROR, + "Cost metadata value too small (%zu bytes) to hold valid data. " + "Ignoring.", + cost_entry_size); + return GRPC_FILTERED_REMOVE(); + } + const double* cost_entry_ptr = + reinterpret_cast<const double*>(GRPC_SLICE_START_PTR(value)); + double cost_value = *cost_entry_ptr++; + const char* cost_name = reinterpret_cast<const char*>(cost_entry_ptr); + const size_t cost_name_len = cost_entry_size - sizeof(double); + opencensus::stats::Record( + {{::grpc::load_reporter::MeasureOtherCallMetric(), cost_value}}, + {{::grpc::load_reporter::TagKeyToken(), + {calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}}, + {::grpc::load_reporter::TagKeyHost(), + {calld->target_host_, calld->target_host_len_}}, + {::grpc::load_reporter::TagKeyUserId(), + {chand->peer_identity(), chand->peer_identity_len()}}, + {::grpc::load_reporter::TagKeyMetricName(), + {cost_name, cost_name_len}}}); return GRPC_FILTERED_REMOVE(); } return GRPC_FILTERED_MDELEM(md); } -static void lr_start_transport_stream_op_batch( - grpc_call_element* elem, grpc_transport_stream_op_batch* op) { - GPR_TIMER_SCOPE("lr_start_transport_stream_op_batch", 0); - call_data* calld = static_cast<call_data*>(elem->call_data); - - if (op->recv_initial_metadata) { - /* substitute our callback for the higher callback */ - calld->recv_initial_metadata = - op->payload->recv_initial_metadata.recv_initial_metadata; - calld->ops_recv_initial_metadata_ready = - op->payload->recv_initial_metadata.recv_initial_metadata_ready; - op->payload->recv_initial_metadata.recv_initial_metadata_ready = - &calld->on_initial_md_ready; - } else if (op->send_trailing_metadata) { - GRPC_LOG_IF_ERROR( - "grpc_metadata_batch_filter", - grpc_metadata_batch_filter( - op->payload->send_trailing_metadata.send_trailing_metadata, - lr_trailing_md_filter, elem, - "LR trailing metadata filtering error")); +const char* ServerLoadReportingCallData::GetStatusTagForStatus( + grpc_status_code status) { + switch (status) { + case GRPC_STATUS_OK: + return ::grpc::load_reporter::kCallStatusOk; + case GRPC_STATUS_UNKNOWN: + case GRPC_STATUS_DEADLINE_EXCEEDED: + case GRPC_STATUS_UNIMPLEMENTED: + case GRPC_STATUS_INTERNAL: + case GRPC_STATUS_UNAVAILABLE: + case GRPC_STATUS_DATA_LOSS: + return ::grpc::load_reporter::kCallStatusServerError; + default: + return ::grpc::load_reporter::kCallStatusClientError; } - grpc_call_next_op(elem, op); } -const grpc_channel_filter grpc_server_load_reporting_filter = { - lr_start_transport_stream_op_batch, - grpc_channel_next_op, - sizeof(call_data), - init_call_elem, - grpc_call_stack_ignore_set_pollset_or_pollset_set, - destroy_call_elem, - sizeof(channel_data), - init_channel_elem, - destroy_channel_elem, - grpc_channel_next_get_info, - "load_reporting"}; +} // namespace grpc diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.h b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h index b459a8ec5f..029b19ac89 100644 --- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.h +++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h @@ -21,10 +21,105 @@ #include <grpc/support/port_platform.h> -#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h" #include "src/core/lib/channel/channel_stack.h" +#include "src/cpp/common/channel_filter.h" -extern const grpc_channel_filter grpc_server_load_reporting_filter; +namespace grpc { + +class ServerLoadReportingChannelData : public ChannelData { + public: + grpc_error* Init(grpc_channel_element* elem, + grpc_channel_element_args* args) override; + + // Getters. + const char* peer_identity() { return peer_identity_; } + size_t peer_identity_len() { return peer_identity_len_; } + + private: + // The peer's authenticated identity. + char* peer_identity_ = nullptr; + size_t peer_identity_len_ = 0; +}; + +class ServerLoadReportingCallData : public CallData { + public: + grpc_error* Init(grpc_call_element* elem, + const grpc_call_element_args* args) override; + + void Destroy(grpc_call_element* elem, const grpc_call_final_info* final_info, + grpc_closure* then_call_closure) override; + + void StartTransportStreamOpBatch(grpc_call_element* elem, + TransportStreamOpBatch* op) override; + + private: + // From the peer_string_ in calld, extracts the client IP string (owned by + // caller), e.g., "01020a0b". Upon failure, set the output pointer to null and + // size to zero. + void GetCensusSafeClientIpString(char** client_ip_string, size_t* size); + + // Concatenates the client IP address and the load reporting token, then + // stores the result into the call data. + void StoreClientIpAndLrToken(const char* lr_token, size_t lr_token_len); + + // This matches the classification of the status codes in + // googleapis/google/rpc/code.proto. + static const char* GetStatusTagForStatus(grpc_status_code status); + + // Records the call start. + static void RecvInitialMetadataReady(void* arg, grpc_error* err); + + // From the initial metadata, extracts the service_method_, target_host_, and + // client_ip_and_lr_token_. + static grpc_filtered_mdelem RecvInitialMetadataFilter(void* user_data, + grpc_mdelem md); + + // Records the other call metrics. + static grpc_filtered_mdelem SendTrailingMetadataFilter(void* user_data, + grpc_mdelem md); + + // The peer string (a member of the recv_initial_metadata op). Note that + // gpr_atm itself is a pointer type here, making "peer_string_" effectively a + // double pointer. + const gpr_atm* peer_string_; + + // The received initial metadata (a member of the recv_initial_metadata op). + // When it is ready, we will extract some data from it via + // recv_initial_metadata_ready_ closure, before the original + // recv_initial_metadata_ready closure, + MetadataBatch* recv_initial_metadata_; + + // The original recv_initial_metadata closure, which is wrapped by our own + // closure (recv_initial_metadata_ready_) to capture the incoming initial + // metadata. + grpc_closure* original_recv_initial_metadata_ready_; + + // The closure that wraps the original closure. Scheduled when + // recv_initial_metadata_ is ready. + grpc_closure recv_initial_metadata_ready_; + + // Corresponds to the :path header. + grpc_slice service_method_; + + // The backend host that the client thinks it's talking to. This may be + // different from the actual backend in the case of, for example, + // load-balanced targets. We store a copy of the metadata slice in order to + // lowercase it. */ + char* target_host_; + size_t target_host_len_; + + // The client IP address (including a length prefix) and the load reporting + // token. + char* client_ip_and_lr_token_; + size_t client_ip_and_lr_token_len_; + + static constexpr char kEncodedIpv4AddressLengthString[] = "08"; + static constexpr char kEncodedIpv6AddressLengthString[] = "32"; + static constexpr char kEmptyAddressLengthString[] = "00"; + static constexpr size_t kLengthPrefixSize = 2; +}; + +} // namespace grpc #endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_FILTER_H \ */ diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc deleted file mode 100644 index 667c0c56ef..0000000000 --- a/src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc +++ /dev/null @@ -1,71 +0,0 @@ -/* - * - * Copyright 2016 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/port_platform.h> - -#include <limits.h> -#include <string.h> - -#include <grpc/load_reporting.h> -#include <grpc/support/alloc.h> -#include <grpc/support/sync.h> - -#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h" -#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h" -#include "src/core/lib/channel/channel_stack_builder.h" -#include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/surface/call.h" -#include "src/core/lib/surface/channel_init.h" - -static bool is_load_reporting_enabled(const grpc_channel_args* a) { - return grpc_channel_arg_get_bool( - grpc_channel_args_find(a, GRPC_ARG_ENABLE_LOAD_REPORTING), false); -} - -static bool maybe_add_server_load_reporting_filter( - grpc_channel_stack_builder* builder, void* arg) { - const grpc_channel_args* args = - grpc_channel_stack_builder_get_channel_arguments(builder); - const grpc_channel_filter* filter = - static_cast<const grpc_channel_filter*>(arg); - grpc_channel_stack_builder_iterator* it = - grpc_channel_stack_builder_iterator_find(builder, filter->name); - const bool already_has_load_reporting_filter = - !grpc_channel_stack_builder_iterator_is_end(it); - grpc_channel_stack_builder_iterator_destroy(it); - if (is_load_reporting_enabled(args) && !already_has_load_reporting_filter) { - return grpc_channel_stack_builder_prepend_filter(builder, filter, nullptr, - nullptr); - } - return true; -} - -grpc_arg grpc_load_reporting_enable_arg() { - return grpc_channel_arg_integer_create((char*)GRPC_ARG_ENABLE_LOAD_REPORTING, - 1); -} - -/* Plugin registration */ - -void grpc_server_load_reporting_plugin_init(void) { - grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, - maybe_add_server_load_reporting_filter, - (void*)&grpc_server_load_reporting_filter); -} - -void grpc_server_load_reporting_plugin_shutdown() {} diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_plugin.h b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.h deleted file mode 100644 index c20aaa744f..0000000000 --- a/src/core/ext/filters/load_reporting/server_load_reporting_plugin.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Copyright 2016 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_PLUGIN_H -#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_PLUGIN_H - -#include <grpc/support/port_platform.h> - -#include <grpc/impl/codegen/grpc_types.h> - -#include "src/core/lib/channel/channel_stack.h" - -/** Identifiers for the invocation point of the users LR callback */ -typedef enum grpc_load_reporting_source { - GRPC_LR_POINT_UNKNOWN = 0, - GRPC_LR_POINT_CHANNEL_CREATION, - GRPC_LR_POINT_CHANNEL_DESTRUCTION, - GRPC_LR_POINT_CALL_CREATION, - GRPC_LR_POINT_CALL_DESTRUCTION -} grpc_load_reporting_source; - -/** Call information to be passed to the provided LR callback. */ -typedef struct grpc_load_reporting_call_data { - const grpc_load_reporting_source source; /**< point of last data update. */ - - /** Unique identifier for the channel associated with the data */ - intptr_t channel_id; - - /** Unique identifier for the call associated with the data. If the call - * hasn't been created yet, it'll have a value of zero. */ - intptr_t call_id; - - /** Only valid when \a source is \a GRPC_LR_POINT_CALL_DESTRUCTION, that is, - * once the call has completed */ - const grpc_call_final_info* final_info; - - const char* initial_md_string; /**< value string for LR's initial md key */ - const char* trailing_md_string; /**< value string for LR's trailing md key */ - const char* method_name; /**< Corresponds to :path header */ -} grpc_load_reporting_call_data; - -/** Return a \a grpc_arg enabling load reporting */ -grpc_arg grpc_load_reporting_enable_arg(); - -#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_PLUGIN_H \ - */ diff --git a/src/core/plugin_registry/grpc_cronet_plugin_registry.cc b/src/core/plugin_registry/grpc_cronet_plugin_registry.cc index 49b9c7d4fe..c0c17b0a4b 100644 --- a/src/core/plugin_registry/grpc_cronet_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_cronet_plugin_registry.cc @@ -30,8 +30,6 @@ void grpc_client_channel_init(void); void grpc_client_channel_shutdown(void); void grpc_tsi_alts_init(void); void grpc_tsi_alts_shutdown(void); -void grpc_server_load_reporting_plugin_init(void); -void grpc_server_load_reporting_plugin_shutdown(void); void grpc_register_built_in_plugins(void) { grpc_register_plugin(grpc_http_filters_init, @@ -44,6 +42,4 @@ void grpc_register_built_in_plugins(void) { grpc_client_channel_shutdown); grpc_register_plugin(grpc_tsi_alts_init, grpc_tsi_alts_shutdown); - grpc_register_plugin(grpc_server_load_reporting_plugin_init, - grpc_server_load_reporting_plugin_shutdown); } diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc index e371310fa1..fb523a173d 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_plugin_registry.cc @@ -46,8 +46,6 @@ void grpc_resolver_dns_native_init(void); void grpc_resolver_dns_native_shutdown(void); void grpc_resolver_sockaddr_init(void); void grpc_resolver_sockaddr_shutdown(void); -void grpc_server_load_reporting_plugin_init(void); -void grpc_server_load_reporting_plugin_shutdown(void); void grpc_max_age_filter_init(void); void grpc_max_age_filter_shutdown(void); void grpc_message_size_filter_init(void); @@ -84,8 +82,6 @@ void grpc_register_built_in_plugins(void) { grpc_resolver_dns_native_shutdown); grpc_register_plugin(grpc_resolver_sockaddr_init, grpc_resolver_sockaddr_shutdown); - grpc_register_plugin(grpc_server_load_reporting_plugin_init, - grpc_server_load_reporting_plugin_shutdown); grpc_register_plugin(grpc_max_age_filter_init, grpc_max_age_filter_shutdown); grpc_register_plugin(grpc_message_size_filter_init, diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc index 283db5b4f4..80214aebe2 100644 --- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc @@ -38,8 +38,6 @@ void grpc_resolver_sockaddr_init(void); void grpc_resolver_sockaddr_shutdown(void); void grpc_resolver_fake_init(void); void grpc_resolver_fake_shutdown(void); -void grpc_server_load_reporting_plugin_init(void); -void grpc_server_load_reporting_plugin_shutdown(void); void grpc_lb_policy_grpclb_init(void); void grpc_lb_policy_grpclb_shutdown(void); void grpc_lb_policy_pick_first_init(void); @@ -74,8 +72,6 @@ void grpc_register_built_in_plugins(void) { grpc_resolver_sockaddr_shutdown); grpc_register_plugin(grpc_resolver_fake_init, grpc_resolver_fake_shutdown); - grpc_register_plugin(grpc_server_load_reporting_plugin_init, - grpc_server_load_reporting_plugin_shutdown); grpc_register_plugin(grpc_lb_policy_grpclb_init, grpc_lb_policy_grpclb_shutdown); grpc_register_plugin(grpc_lb_policy_pick_first_init, diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h index bd0ec969b4..5e569c97e6 100644 --- a/src/cpp/common/channel_filter.h +++ b/src/cpp/common/channel_filter.h @@ -207,6 +207,18 @@ class TransportStreamOpBatch { op_->payload->context[GRPC_CONTEXT_TRACING].value); } + const gpr_atm* get_peer_string() const { + if (op_->send_initial_metadata && + op_->payload->send_initial_metadata.peer_string != nullptr) { + return op_->payload->send_initial_metadata.peer_string; + } else if (op_->recv_initial_metadata && + op_->payload->recv_initial_metadata.peer_string != nullptr) { + return op_->payload->recv_initial_metadata.peer_string; + } else { + return nullptr; + } + } + private: grpc_transport_stream_op_batch* op_; // Not owned. MetadataBatch send_initial_metadata_; diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index b20b8155a0..f48e0f5cb6 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -352,8 +352,6 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc', 'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc', 'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc', - 'src/core/ext/filters/load_reporting/server_load_reporting_filter.cc', - 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc', 'src/cpp/ext/filters/census/grpc_context.cc', 'src/core/ext/filters/max_age/max_age_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', |