diff options
Diffstat (limited to 'src/core/ext/filters/load_reporting/server_load_reporting_filter.cc')
-rw-r--r-- | src/core/ext/filters/load_reporting/server_load_reporting_filter.cc | 430 |
1 files changed, 269 insertions, 161 deletions
diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc index a8f70333b2..51e4d795d7 100644 --- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc +++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc @@ -18,203 +18,311 @@ #include <grpc/support/port_platform.h> -#include <string.h> - +#include <grpc/grpc_security.h> #include <grpc/load_reporting.h> +#include <grpc/slice.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> -#include <grpc/support/sync.h> +#include "src/core/ext/filters/client_channel/parse_address.h" +#include "src/core/ext/filters/client_channel/uri_parser.h" +#include "src/core/ext/filters/load_reporting/registered_opencensus_objects.h" #include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h" -#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/context.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/sockaddr_posix.h" +#include "src/core/lib/iomgr/socket_utils.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/security/context/security_context.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/surface/call.h" #include "src/core/lib/transport/static_metadata.h" -namespace { -struct call_data { - intptr_t id; /**< an id unique to the call */ - bool have_trailing_md_string; - grpc_slice trailing_md_string; - bool have_initial_md_string; - grpc_slice initial_md_string; - bool have_service_method; - grpc_slice service_method; - - /* stores the recv_initial_metadata op's ready closure, which we wrap with our - * own (on_initial_md_ready) in order to capture the incoming initial metadata - * */ - grpc_closure* ops_recv_initial_metadata_ready; - - /* to get notified of the availability of the incoming initial metadata. */ - grpc_closure on_initial_md_ready; - grpc_metadata_batch* recv_initial_metadata; -}; +namespace grpc { -struct channel_data { - intptr_t id; /**< an id unique to the channel */ -}; -} // namespace - -static void on_initial_md_ready(void* user_data, grpc_error* err) { - grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - - if (err == GRPC_ERROR_NONE) { - if (calld->recv_initial_metadata->idx.named.path != nullptr) { - calld->service_method = grpc_slice_ref_internal( - GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.path->md)); - calld->have_service_method = true; - } else { - err = grpc_error_add_child( - err, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing :path header")); - } - if (calld->recv_initial_metadata->idx.named.lb_token != nullptr) { - calld->initial_md_string = grpc_slice_ref_internal( - GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.lb_token->md)); - calld->have_initial_md_string = true; - grpc_metadata_batch_remove( - calld->recv_initial_metadata, - calld->recv_initial_metadata->idx.named.lb_token); +grpc_error* ServerLoadReportingChannelData::Init( + grpc_channel_element* /* elem */, grpc_channel_element_args* args) { + GPR_ASSERT(!args->is_last); + // Find and record the peer_identity. + const grpc_auth_context* auth_context = + grpc_find_auth_context_in_args(args->channel_args); + if (auth_context != nullptr && + grpc_auth_context_peer_is_authenticated(auth_context)) { + grpc_auth_property_iterator auth_it = + grpc_auth_context_peer_identity(auth_context); + const grpc_auth_property* auth_property = + grpc_auth_property_iterator_next(&auth_it); + if (auth_property != nullptr) { + peer_identity_ = auth_property->value; + peer_identity_len_ = auth_property->value_length; } - } else { - GRPC_ERROR_REF(err); } - GRPC_CLOSURE_RUN(calld->ops_recv_initial_metadata_ready, err); -} - -/* Constructor for call_data */ -static grpc_error* init_call_elem(grpc_call_element* elem, - const grpc_call_element_args* args) { - call_data* calld = static_cast<call_data*>(elem->call_data); - calld->id = (intptr_t)args->call_stack; - GRPC_CLOSURE_INIT(&calld->on_initial_md_ready, on_initial_md_ready, elem, - grpc_schedule_on_exec_ctx); - - /* TODO(dgq): do something with the data - channel_data *chand = elem->channel_data; - grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_CREATION, - (intptr_t)chand->id, - (intptr_t)calld->id, - NULL, - NULL, - NULL, - NULL}; - */ - return GRPC_ERROR_NONE; } -/* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element* elem, - const grpc_call_final_info* final_info, - grpc_closure* ignored) { - call_data* calld = static_cast<call_data*>(elem->call_data); +void ServerLoadReportingCallData::Destroy( + grpc_call_element* elem, const grpc_call_final_info* final_info, + grpc_closure* then_call_closure) { + ServerLoadReportingChannelData* chand = + reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data); + // Only record an end if we've recorded its corresponding start, which is + // indicated by a non-null client_ip_and_lr_token_. Note that it's possible + // that we attempt to record the call end before we have recorded the call + // start, because the data needed for recording the start comes from the + // initial metadata, which may not be ready before the call finishes. + if (client_ip_and_lr_token_ != nullptr) { + opencensus::stats::Record( + {{::grpc::load_reporter::MeasureEndCount(), 1}, + {::grpc::load_reporter::MeasureEndBytesSent(), + final_info->stats.transport_stream_stats.outgoing.data_bytes}, + {::grpc::load_reporter::MeasureEndBytesReceived(), + final_info->stats.transport_stream_stats.incoming.data_bytes}, + {::grpc::load_reporter::MeasureEndLatencyMs(), + gpr_time_to_millis(final_info->stats.latency)}}, + {{::grpc::load_reporter::TagKeyToken(), + {client_ip_and_lr_token_, client_ip_and_lr_token_len_}}, + {::grpc::load_reporter::TagKeyHost(), + {target_host_, target_host_len_}}, + {::grpc::load_reporter::TagKeyUserId(), + {chand->peer_identity(), chand->peer_identity_len()}}, + {::grpc::load_reporter::TagKeyStatus(), + GetStatusTagForStatus(final_info->final_status)}}); + } + grpc_slice_unref_internal(service_method_); +} - /* TODO(dgq): do something with the data - channel_data *chand = elem->channel_data; - grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_DESTRUCTION, - (intptr_t)chand->id, - (intptr_t)calld->id, - final_info, - calld->initial_md_string, - calld->trailing_md_string, - calld->service_method}; - */ +void ServerLoadReportingCallData::StartTransportStreamOpBatch( + grpc_call_element* elem, TransportStreamOpBatch* op) { + GPR_TIMER_SCOPE("lr_start_transport_stream_op", 0); + if (op->recv_initial_metadata() != nullptr) { + // Save some fields to use when initial metadata is ready. + peer_string_ = op->get_peer_string(); + recv_initial_metadata_ = op->recv_initial_metadata(); + original_recv_initial_metadata_ready_ = op->recv_initial_metadata_ready(); + // Substitute the original closure for the wrapper closure. + op->set_recv_initial_metadata_ready(&recv_initial_metadata_ready_); + } else if (op->send_trailing_metadata() != nullptr) { + GRPC_LOG_IF_ERROR( + "server_load_reporting_filter", + grpc_metadata_batch_filter(op->send_trailing_metadata()->batch(), + SendTrailingMetadataFilter, elem, + "send_trailing_metadata filtering error")); + } + grpc_call_next_op(elem, op->op()); +} - if (calld->have_initial_md_string) { - grpc_slice_unref_internal(calld->initial_md_string); +void ServerLoadReportingCallData::GetCensusSafeClientIpString( + char** client_ip_string, size_t* size) { + // Find the client URI string. + const char* client_uri_str = + reinterpret_cast<const char*>(gpr_atm_acq_load(peer_string_)); + if (client_uri_str == nullptr) { + gpr_log(GPR_ERROR, + "Unable to extract client URI string (peer string) from gRPC " + "metadata."); + *client_ip_string = nullptr; + *size = 0; + return; } - if (calld->have_trailing_md_string) { - grpc_slice_unref_internal(calld->trailing_md_string); + // Parse the client URI string into grpc_uri. + grpc_uri* client_uri = grpc_uri_parse(client_uri_str, true); + if (client_uri == nullptr) { + gpr_log(GPR_ERROR, + "Unable to parse the client URI string (peer string) to a client " + "URI."); + *client_ip_string = nullptr; + *size = 0; + return; } - if (calld->have_service_method) { - grpc_slice_unref_internal(calld->service_method); + // Parse the client URI into grpc_resolved_address. + grpc_resolved_address resolved_address; + bool success = grpc_parse_uri(client_uri, &resolved_address); + grpc_uri_destroy(client_uri); + if (!success) { + gpr_log(GPR_ERROR, + "Unable to parse client URI into a grpc_resolved_address."); + *client_ip_string = nullptr; + *size = 0; + return; + } + // Convert the socket address in the grpc_resolved_address into a hex string + // according to the address family. + grpc_sockaddr* addr = reinterpret_cast<grpc_sockaddr*>(resolved_address.addr); + if (addr->sa_family == GRPC_AF_INET) { + grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(addr); + gpr_asprintf(client_ip_string, "%08x", grpc_ntohl(addr4->sin_addr.s_addr)); + *size = 8; + } else if (addr->sa_family == GRPC_AF_INET6) { + grpc_sockaddr_in6* addr6 = reinterpret_cast<grpc_sockaddr_in6*>(addr); + *client_ip_string = static_cast<char*>(gpr_malloc(32)); + for (size_t i = 0; i < 16; ++i) { + sprintf(*client_ip_string + i, "%02x", + addr6->sin6_addr.__in6_u.__u6_addr8[i]); + } + *size = 32; + } else { + GPR_UNREACHABLE_CODE(); } } -/* Constructor for channel_data */ -static grpc_error* init_channel_elem(grpc_channel_element* elem, - grpc_channel_element_args* args) { - GPR_ASSERT(!args->is_last); - - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - chand->id = (intptr_t)args->channel_stack; +void ServerLoadReportingCallData::StoreClientIpAndLrToken(const char* lr_token, + size_t lr_token_len) { + char* client_ip; + size_t client_ip_len; + GetCensusSafeClientIpString(&client_ip, &client_ip_len); + client_ip_and_lr_token_len_ = + kLengthPrefixSize + client_ip_len + lr_token_len; + client_ip_and_lr_token_ = static_cast<char*>( + gpr_zalloc(client_ip_and_lr_token_len_ * sizeof(char))); + char* cur_pos = client_ip_and_lr_token_; + // Store the IP length prefix. + if (client_ip_len == 0) { + strncpy(cur_pos, kEmptyAddressLengthString, kLengthPrefixSize); + } else if (client_ip_len == 8) { + strncpy(cur_pos, kEncodedIpv4AddressLengthString, kLengthPrefixSize); + } else if (client_ip_len == 32) { + strncpy(cur_pos, kEncodedIpv6AddressLengthString, kLengthPrefixSize); + } else { + GPR_UNREACHABLE_CODE(); + } + cur_pos += kLengthPrefixSize; + // Store the IP. + if (client_ip_len != 0) { + strncpy(cur_pos, client_ip, client_ip_len); + } + gpr_free(client_ip); + cur_pos += client_ip_len; + // Store the LR token. + if (lr_token_len != 0) { + strncpy(cur_pos, lr_token, lr_token_len); + } + GPR_ASSERT(cur_pos + lr_token_len - client_ip_and_lr_token_ == + client_ip_and_lr_token_len_); +} - /* TODO(dgq): do something with the data - grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CHANNEL_CREATION, - (intptr_t)chand, - 0, - NULL, - NULL, - NULL, - NULL}; - */ +grpc_filtered_mdelem ServerLoadReportingCallData::RecvInitialMetadataFilter( + void* user_data, grpc_mdelem md) { + grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data); + ServerLoadReportingCallData* calld = + reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data); + if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_PATH)) { + calld->service_method_ = grpc_slice_ref_internal(GRPC_MDVALUE(md)); + } else if (calld->target_host_ == nullptr && + grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_AUTHORITY)) { + grpc_slice target_host_slice = GRPC_MDVALUE(md); + calld->target_host_len_ = GRPC_SLICE_LENGTH(target_host_slice); + calld->target_host_ = + reinterpret_cast<char*>(gpr_zalloc(calld->target_host_len_)); + for (size_t i = 0; i < calld->target_host_len_; ++i) { + calld->target_host_[i] = static_cast<char>( + tolower(GRPC_SLICE_START_PTR(target_host_slice)[i])); + } + } else if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_TOKEN)) { + if (calld->client_ip_and_lr_token_ == nullptr) { + calld->StoreClientIpAndLrToken( + reinterpret_cast<const char*> GRPC_SLICE_START_PTR(GRPC_MDVALUE(md)), + GRPC_SLICE_LENGTH(GRPC_MDVALUE(md))); + } + return GRPC_FILTERED_REMOVE(); + } + return GRPC_FILTERED_MDELEM(md); +} - return GRPC_ERROR_NONE; +void ServerLoadReportingCallData::RecvInitialMetadataReady(void* arg, + grpc_error* err) { + grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(arg); + ServerLoadReportingCallData* calld = + reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data); + ServerLoadReportingChannelData* chand = + reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data); + if (err == GRPC_ERROR_NONE) { + GRPC_LOG_IF_ERROR( + "server_load_reporting_filter", + grpc_metadata_batch_filter(calld->recv_initial_metadata_->batch(), + RecvInitialMetadataFilter, elem, + "recv_initial_metadata filtering error")); + // If the LB token was not found in the recv_initial_metadata, only the + // client IP part will be recorded (with an empty LB token). + if (calld->client_ip_and_lr_token_ == nullptr) { + calld->StoreClientIpAndLrToken(nullptr, 0); + } + opencensus::stats::Record( + {{::grpc::load_reporter::MeasureStartCount(), 1}}, + {{::grpc::load_reporter::TagKeyToken(), + {calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}}, + {::grpc::load_reporter::TagKeyHost(), + {calld->target_host_, calld->target_host_len_}}, + {::grpc::load_reporter::TagKeyUserId(), + {chand->peer_identity(), chand->peer_identity_len()}}}); + } + GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready_, + GRPC_ERROR_REF(err)); } -/* Destructor for channel data */ -static void destroy_channel_elem(grpc_channel_element* elem) { - /* TODO(dgq): do something with the data - channel_data *chand = elem->channel_data; - grpc_load_reporting_call_data lr_call_data = { - GRPC_LR_POINT_CHANNEL_DESTRUCTION, - (intptr_t)chand->id, - 0, - NULL, - NULL, - NULL, - NULL}; - */ +grpc_error* ServerLoadReportingCallData::Init( + grpc_call_element* elem, const grpc_call_element_args* args) { + service_method_ = grpc_empty_slice(); + GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady, + elem, grpc_schedule_on_exec_ctx); + return GRPC_ERROR_NONE; } -static grpc_filtered_mdelem lr_trailing_md_filter(void* user_data, - grpc_mdelem md) { - grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); - call_data* calld = static_cast<call_data*>(elem->call_data); +grpc_filtered_mdelem ServerLoadReportingCallData::SendTrailingMetadataFilter( + void* user_data, grpc_mdelem md) { + grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data); + ServerLoadReportingCallData* calld = + reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data); + ServerLoadReportingChannelData* chand = + reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data); + // TODO(juanlishen): GRPC_MDSTR_LB_COST_BIN meaning? if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_COST_BIN)) { - calld->trailing_md_string = GRPC_MDVALUE(md); + const grpc_slice value = GRPC_MDVALUE(md); + const size_t cost_entry_size = GRPC_SLICE_LENGTH(value); + if (cost_entry_size < sizeof(double)) { + gpr_log(GPR_ERROR, + "Cost metadata value too small (%zu bytes) to hold valid data. " + "Ignoring.", + cost_entry_size); + return GRPC_FILTERED_REMOVE(); + } + const double* cost_entry_ptr = + reinterpret_cast<const double*>(GRPC_SLICE_START_PTR(value)); + double cost_value = *cost_entry_ptr++; + const char* cost_name = reinterpret_cast<const char*>(cost_entry_ptr); + const size_t cost_name_len = cost_entry_size - sizeof(double); + opencensus::stats::Record( + {{::grpc::load_reporter::MeasureOtherCallMetric(), cost_value}}, + {{::grpc::load_reporter::TagKeyToken(), + {calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}}, + {::grpc::load_reporter::TagKeyHost(), + {calld->target_host_, calld->target_host_len_}}, + {::grpc::load_reporter::TagKeyUserId(), + {chand->peer_identity(), chand->peer_identity_len()}}, + {::grpc::load_reporter::TagKeyMetricName(), + {cost_name, cost_name_len}}}); return GRPC_FILTERED_REMOVE(); } return GRPC_FILTERED_MDELEM(md); } -static void lr_start_transport_stream_op_batch( - grpc_call_element* elem, grpc_transport_stream_op_batch* op) { - GPR_TIMER_SCOPE("lr_start_transport_stream_op_batch", 0); - call_data* calld = static_cast<call_data*>(elem->call_data); - - if (op->recv_initial_metadata) { - /* substitute our callback for the higher callback */ - calld->recv_initial_metadata = - op->payload->recv_initial_metadata.recv_initial_metadata; - calld->ops_recv_initial_metadata_ready = - op->payload->recv_initial_metadata.recv_initial_metadata_ready; - op->payload->recv_initial_metadata.recv_initial_metadata_ready = - &calld->on_initial_md_ready; - } else if (op->send_trailing_metadata) { - GRPC_LOG_IF_ERROR( - "grpc_metadata_batch_filter", - grpc_metadata_batch_filter( - op->payload->send_trailing_metadata.send_trailing_metadata, - lr_trailing_md_filter, elem, - "LR trailing metadata filtering error")); +const char* ServerLoadReportingCallData::GetStatusTagForStatus( + grpc_status_code status) { + switch (status) { + case GRPC_STATUS_OK: + return ::grpc::load_reporter::kCallStatusOk; + case GRPC_STATUS_UNKNOWN: + case GRPC_STATUS_DEADLINE_EXCEEDED: + case GRPC_STATUS_UNIMPLEMENTED: + case GRPC_STATUS_INTERNAL: + case GRPC_STATUS_UNAVAILABLE: + case GRPC_STATUS_DATA_LOSS: + return ::grpc::load_reporter::kCallStatusServerError; + default: + return ::grpc::load_reporter::kCallStatusClientError; } - grpc_call_next_op(elem, op); } -const grpc_channel_filter grpc_server_load_reporting_filter = { - lr_start_transport_stream_op_batch, - grpc_channel_next_op, - sizeof(call_data), - init_call_elem, - grpc_call_stack_ignore_set_pollset_or_pollset_set, - destroy_call_elem, - sizeof(channel_data), - init_channel_elem, - destroy_channel_elem, - grpc_channel_next_get_info, - "load_reporting"}; +} // namespace grpc |