diff options
Diffstat (limited to 'src/cpp/server/load_reporter/load_data_store.cc')
-rw-r--r-- | src/cpp/server/load_reporter/load_data_store.cc | 65 |
1 files changed, 65 insertions, 0 deletions
diff --git a/src/cpp/server/load_reporter/load_data_store.cc b/src/cpp/server/load_reporter/load_data_store.cc index 70f12c1102..594473f5e7 100644 --- a/src/cpp/server/load_reporter/load_data_store.cc +++ b/src/cpp/server/load_reporter/load_data_store.cc @@ -16,11 +16,15 @@ * */ +#include <grpc/impl/codegen/port_platform.h> + +#include <stdio.h> #include <cstdlib> #include <set> #include <unordered_map> #include <vector> +#include "src/core/lib/iomgr/socket_utils.h" #include "src/cpp/server/load_reporter/load_data_store.h" namespace grpc { @@ -73,6 +77,67 @@ const typename C::value_type* RandomElement(const C& container) { } // namespace +LoadRecordKey::LoadRecordKey(const grpc::string& client_ip_and_token, + grpc::string user_id) + : user_id_(std::move(user_id)) { + GPR_ASSERT(client_ip_and_token.size() >= 2); + int ip_hex_size; + GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d", + &ip_hex_size) == 1); + GPR_ASSERT(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength || + ip_hex_size == kIpv6AddressLength); + size_t cur_pos = 2; + client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size); + cur_pos += ip_hex_size; + if (client_ip_and_token.size() - cur_pos < kLbIdLength) { + lb_id_ = kInvalidLbId; + lb_tag_ = ""; + } else { + lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength); + lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength); + } +} + +grpc::string LoadRecordKey::GetClientIpBytes() const { + if (client_ip_hex_.empty()) { + return ""; + } else if (client_ip_hex_.size() == kIpv4AddressLength) { + uint32_t ip_bytes; + if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) { + gpr_log(GPR_ERROR, + "Can't parse client IP (%s) from a hex string to an integer.", + client_ip_hex_.c_str()); + return ""; + } + ip_bytes = grpc_htonl(ip_bytes); + return grpc::string(reinterpret_cast<const char*>(&ip_bytes), + sizeof(ip_bytes)); + } else if (client_ip_hex_.size() == kIpv6AddressLength) { + uint32_t ip_bytes[4]; + for (size_t i = 0; i < 4; ++i) { + if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x", + ip_bytes + i) != 1) { + gpr_log( + GPR_ERROR, + "Can't parse client IP part (%s) from a hex string to an integer.", + client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str()); + return ""; + } + ip_bytes[i] = grpc_htonl(ip_bytes[i]); + } + return grpc::string(reinterpret_cast<const char*>(ip_bytes), + sizeof(ip_bytes)); + } else { + GPR_UNREACHABLE_CODE(return ""); + } +} + +LoadRecordValue::LoadRecordValue(grpc::string metric_name, uint64_t num_calls, + double total_metric_value) { + call_metrics_.emplace(std::move(metric_name), + CallMetricValue(num_calls, total_metric_value)); +} + void PerBalancerStore::MergeRow(const LoadRecordKey& key, const LoadRecordValue& value) { // During suspension, the load data received will be dropped. |