aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/load_reporter/load_data_store.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server/load_reporter/load_data_store.cc')
-rw-r--r--src/cpp/server/load_reporter/load_data_store.cc65
1 files changed, 65 insertions, 0 deletions
diff --git a/src/cpp/server/load_reporter/load_data_store.cc b/src/cpp/server/load_reporter/load_data_store.cc
index 70f12c1102..594473f5e7 100644
--- a/src/cpp/server/load_reporter/load_data_store.cc
+++ b/src/cpp/server/load_reporter/load_data_store.cc
@@ -16,11 +16,15 @@
*
*/
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <stdio.h>
#include <cstdlib>
#include <set>
#include <unordered_map>
#include <vector>
+#include "src/core/lib/iomgr/socket_utils.h"
#include "src/cpp/server/load_reporter/load_data_store.h"
namespace grpc {
@@ -73,6 +77,67 @@ const typename C::value_type* RandomElement(const C& container) {
} // namespace
+LoadRecordKey::LoadRecordKey(const grpc::string& client_ip_and_token,
+ grpc::string user_id)
+ : user_id_(std::move(user_id)) {
+ GPR_ASSERT(client_ip_and_token.size() >= 2);
+ int ip_hex_size;
+ GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d",
+ &ip_hex_size) == 1);
+ GPR_ASSERT(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength ||
+ ip_hex_size == kIpv6AddressLength);
+ size_t cur_pos = 2;
+ client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size);
+ cur_pos += ip_hex_size;
+ if (client_ip_and_token.size() - cur_pos < kLbIdLength) {
+ lb_id_ = kInvalidLbId;
+ lb_tag_ = "";
+ } else {
+ lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength);
+ lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength);
+ }
+}
+
+grpc::string LoadRecordKey::GetClientIpBytes() const {
+ if (client_ip_hex_.empty()) {
+ return "";
+ } else if (client_ip_hex_.size() == kIpv4AddressLength) {
+ uint32_t ip_bytes;
+ if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) {
+ gpr_log(GPR_ERROR,
+ "Can't parse client IP (%s) from a hex string to an integer.",
+ client_ip_hex_.c_str());
+ return "";
+ }
+ ip_bytes = grpc_htonl(ip_bytes);
+ return grpc::string(reinterpret_cast<const char*>(&ip_bytes),
+ sizeof(ip_bytes));
+ } else if (client_ip_hex_.size() == kIpv6AddressLength) {
+ uint32_t ip_bytes[4];
+ for (size_t i = 0; i < 4; ++i) {
+ if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x",
+ ip_bytes + i) != 1) {
+ gpr_log(
+ GPR_ERROR,
+ "Can't parse client IP part (%s) from a hex string to an integer.",
+ client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str());
+ return "";
+ }
+ ip_bytes[i] = grpc_htonl(ip_bytes[i]);
+ }
+ return grpc::string(reinterpret_cast<const char*>(ip_bytes),
+ sizeof(ip_bytes));
+ } else {
+ GPR_UNREACHABLE_CODE(return "");
+ }
+}
+
+LoadRecordValue::LoadRecordValue(grpc::string metric_name, uint64_t num_calls,
+ double total_metric_value) {
+ call_metrics_.emplace(std::move(metric_name),
+ CallMetricValue(num_calls, total_metric_value));
+}
+
void PerBalancerStore::MergeRow(const LoadRecordKey& key,
const LoadRecordValue& value) {
// During suspension, the load data received will be dropped.