aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server
diff options
context:
space:
mode:
authorGravatar Juanli Shen <juanlishen@google.com>2018-05-01 10:30:54 -0700
committerGravatar Juanli Shen <juanlishen@google.com>2018-05-01 10:46:44 -0700
commita0aab7ebcc4e3af28f9dde745bf1800094b085a7 (patch)
tree2acf56a2800a77f2eba81eb092b1ecad841e7a2b /src/cpp/server
parentac0188e25b46a191e278610e264edb1df1287c94 (diff)
Add load data store
Diffstat (limited to 'src/cpp/server')
-rw-r--r--src/cpp/server/load_reporter/load_data_store.cc273
-rw-r--r--src/cpp/server/load_reporter/load_data_store.h339
2 files changed, 612 insertions, 0 deletions
diff --git a/src/cpp/server/load_reporter/load_data_store.cc b/src/cpp/server/load_reporter/load_data_store.cc
new file mode 100644
index 0000000000..70f12c1102
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_data_store.cc
@@ -0,0 +1,273 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <cstdlib>
+#include <set>
+#include <unordered_map>
+#include <vector>
+
+#include "src/cpp/server/load_reporter/load_data_store.h"
+
+namespace grpc {
+namespace load_reporter {
+
+// Some helper functions.
+namespace {
+
+// Given a map from type K to a set of value type V, finds the set associated
+// with the given key and erases the value from the set. If the set becomes
+// empty, also erases the key-set pair. Returns true if the value is erased
+// successfully.
+template <typename K, typename V>
+bool UnorderedMapOfSetEraseKeyValue(std::unordered_map<K, std::set<V>>& map,
+ const K& key, const V& value) {
+ auto it = map.find(key);
+ if (it != map.end()) {
+ size_t erased = it->second.erase(value);
+ if (it->second.size() == 0) {
+ map.erase(it);
+ }
+ return erased;
+ }
+ return false;
+};
+
+// Given a map from type K to a set of value type V, removes the given key and
+// the associated set, and returns the set. Returns an empty set if the key is
+// not found.
+template <typename K, typename V>
+std::set<V> UnorderedMapOfSetExtract(std::unordered_map<K, std::set<V>>& map,
+ const K& key) {
+ auto it = map.find(key);
+ if (it != map.end()) {
+ auto set = std::move(it->second);
+ map.erase(it);
+ return set;
+ }
+ return {};
+};
+
+// From a non-empty container, returns a pointer to a random element.
+template <typename C>
+const typename C::value_type* RandomElement(const C& container) {
+ GPR_ASSERT(!container.empty());
+ auto it = container.begin();
+ std::advance(it, std::rand() % container.size());
+ return &(*it);
+}
+
+} // namespace
+
+void PerBalancerStore::MergeRow(const LoadRecordKey& key,
+ const LoadRecordValue& value) {
+ // During suspension, the load data received will be dropped.
+ if (!suspended_) {
+ load_record_map_[key].MergeFrom(value);
+ gpr_log(GPR_DEBUG,
+ "[PerBalancerStore %p] Load data merged (Key: %s, Value: %s).",
+ this, key.ToString().c_str(), value.ToString().c_str());
+ } else {
+ gpr_log(GPR_DEBUG,
+ "[PerBalancerStore %p] Load data dropped (Key: %s, Value: %s).",
+ this, key.ToString().c_str(), value.ToString().c_str());
+ }
+ // We always keep track of num_calls_in_progress_, so that when this
+ // store is resumed, we still have a correct value of
+ // num_calls_in_progress_.
+ GPR_ASSERT(static_cast<int64_t>(num_calls_in_progress_) +
+ value.GetNumCallsInProgressDelta() >=
+ 0);
+ num_calls_in_progress_ += value.GetNumCallsInProgressDelta();
+}
+
+void PerBalancerStore::Suspend() {
+ suspended_ = true;
+ load_record_map_.clear();
+ gpr_log(GPR_DEBUG, "[PerBalancerStore %p] Suspended.", this);
+}
+
+void PerBalancerStore::Resume() {
+ suspended_ = false;
+ gpr_log(GPR_DEBUG, "[PerBalancerStore %p] Resumed.", this);
+}
+
+uint64_t PerBalancerStore::GetNumCallsInProgressForReport() {
+ GPR_ASSERT(!suspended_);
+ last_reported_num_calls_in_progress_ = num_calls_in_progress_;
+ return num_calls_in_progress_;
+}
+
+void PerHostStore::ReportStreamCreated(const grpc::string& lb_id,
+ const grpc::string& load_key) {
+ GPR_ASSERT(lb_id != kInvalidLbId);
+ SetUpForNewLbId(lb_id, load_key);
+ // Prior to this one, there was no load balancer receiving report, so we may
+ // have unassigned orphaned stores to assign to this new balancer.
+ // TODO(juanlishen): If the load key of this new stream is the same with
+ // some previously adopted orphan store, we may want to take the orphan to
+ // this stream. Need to discuss with LB team.
+ if (assigned_stores_.size() == 1) {
+ for (const auto& p : per_balancer_stores_) {
+ const grpc::string& other_lb_id = p.first;
+ const std::unique_ptr<PerBalancerStore>& orphaned_store = p.second;
+ if (other_lb_id != lb_id) {
+ orphaned_store->Resume();
+ AssignOrphanedStore(orphaned_store.get(), lb_id);
+ }
+ }
+ }
+ // The first connected balancer will adopt the kInvalidLbId.
+ if (per_balancer_stores_.size() == 1) {
+ SetUpForNewLbId(kInvalidLbId, "");
+ ReportStreamClosed(kInvalidLbId);
+ }
+}
+
+void PerHostStore::ReportStreamClosed(const grpc::string& lb_id) {
+ auto it_store_for_gone_lb = per_balancer_stores_.find(lb_id);
+ GPR_ASSERT(it_store_for_gone_lb != per_balancer_stores_.end());
+ // Remove this closed stream from our records.
+ GPR_ASSERT(UnorderedMapOfSetEraseKeyValue(
+ load_key_to_receiving_lb_ids_, it_store_for_gone_lb->second->load_key(),
+ lb_id));
+ std::set<PerBalancerStore*> orphaned_stores =
+ UnorderedMapOfSetExtract(assigned_stores_, lb_id);
+ // The stores that were assigned to this balancer are orphaned now. They
+ // should be re-assigned to other balancers which are still receiving reports.
+ for (PerBalancerStore* orphaned_store : orphaned_stores) {
+ const grpc::string* new_receiver = nullptr;
+ auto it = load_key_to_receiving_lb_ids_.find(orphaned_store->load_key());
+ if (it != load_key_to_receiving_lb_ids_.end()) {
+ // First, try to pick from the active balancers with the same load key.
+ new_receiver = RandomElement(it->second);
+ } else if (!assigned_stores_.empty()) {
+ // If failed, pick from all the remaining active balancers.
+ new_receiver = &(RandomElement(assigned_stores_)->first);
+ }
+ if (new_receiver != nullptr) {
+ AssignOrphanedStore(orphaned_store, *new_receiver);
+ } else {
+ // Load data for an LB ID that can't be assigned to any stream should
+ // be dropped.
+ orphaned_store->Suspend();
+ }
+ }
+}
+
+PerBalancerStore* PerHostStore::FindPerBalancerStore(
+ const grpc::string& lb_id) const {
+ return per_balancer_stores_.find(lb_id) != per_balancer_stores_.end()
+ ? per_balancer_stores_.find(lb_id)->second.get()
+ : nullptr;
+}
+
+const std::set<PerBalancerStore*>* PerHostStore::GetAssignedStores(
+ const grpc::string& lb_id) const {
+ auto it = assigned_stores_.find(lb_id);
+ if (it == assigned_stores_.end()) return nullptr;
+ return &(it->second);
+}
+
+void PerHostStore::AssignOrphanedStore(PerBalancerStore* orphaned_store,
+ const grpc::string& new_receiver) {
+ auto it = assigned_stores_.find(new_receiver);
+ GPR_ASSERT(it != assigned_stores_.end());
+ it->second.insert(orphaned_store);
+ gpr_log(GPR_INFO,
+ "[PerHostStore %p] Re-assigned orphaned store (%p) with original LB"
+ " ID of %s to new receiver %s",
+ this, orphaned_store, orphaned_store->lb_id().c_str(),
+ new_receiver.c_str());
+}
+
+void PerHostStore::SetUpForNewLbId(const grpc::string& lb_id,
+ const grpc::string& load_key) {
+ // The top-level caller (i.e., LoadReportService) should guarantee the
+ // lb_id is unique for each reporting stream.
+ GPR_ASSERT(per_balancer_stores_.find(lb_id) == per_balancer_stores_.end());
+ GPR_ASSERT(assigned_stores_.find(lb_id) == assigned_stores_.end());
+ load_key_to_receiving_lb_ids_[load_key].insert(lb_id);
+ std::unique_ptr<PerBalancerStore> per_balancer_store(
+ new PerBalancerStore(lb_id, load_key));
+ assigned_stores_[lb_id] = {per_balancer_store.get()};
+ per_balancer_stores_[lb_id] = std::move(per_balancer_store);
+}
+
+PerBalancerStore* LoadDataStore::FindPerBalancerStore(
+ const string& hostname, const string& lb_id) const {
+ auto it = per_host_stores_.find(hostname);
+ if (it != per_host_stores_.end()) {
+ const PerHostStore& per_host_store = it->second;
+ return per_host_store.FindPerBalancerStore(lb_id);
+ } else {
+ return nullptr;
+ }
+}
+
+void LoadDataStore::MergeRow(const grpc::string& hostname,
+ const LoadRecordKey& key,
+ const LoadRecordValue& value) {
+ PerBalancerStore* per_balancer_store =
+ FindPerBalancerStore(hostname, key.lb_id());
+ if (per_balancer_store != nullptr) {
+ per_balancer_store->MergeRow(key, value);
+ return;
+ }
+ // Unknown LB ID. Track it until its number of in-progress calls drops to
+ // zero.
+ int64_t in_progress_delta = value.GetNumCallsInProgressDelta();
+ if (in_progress_delta != 0) {
+ auto it_tracker = unknown_balancer_id_trackers_.find(key.lb_id());
+ if (it_tracker == unknown_balancer_id_trackers_.end()) {
+ gpr_log(
+ GPR_DEBUG,
+ "[LoadDataStore %p] Start tracking unknown balancer (lb_id_: %s).",
+ this, key.lb_id().c_str());
+ unknown_balancer_id_trackers_.insert(
+ {key.lb_id(), static_cast<uint64_t>(in_progress_delta)});
+ } else if ((it_tracker->second += in_progress_delta) == 0) {
+ unknown_balancer_id_trackers_.erase(it_tracker);
+ gpr_log(GPR_DEBUG,
+ "[LoadDataStore %p] Stop tracking unknown balancer (lb_id_: %s).",
+ this, key.lb_id().c_str());
+ }
+ }
+}
+
+const std::set<PerBalancerStore*>* LoadDataStore::GetAssignedStores(
+ const grpc::string& hostname, const grpc::string& lb_id) {
+ auto it = per_host_stores_.find(hostname);
+ if (it == per_host_stores_.end()) return nullptr;
+ return it->second.GetAssignedStores(lb_id);
+}
+
+void LoadDataStore::ReportStreamCreated(const grpc::string& hostname,
+ const grpc::string& lb_id,
+ const grpc::string& load_key) {
+ per_host_stores_[hostname].ReportStreamCreated(lb_id, load_key);
+}
+
+void LoadDataStore::ReportStreamClosed(const grpc::string& hostname,
+ const grpc::string& lb_id) {
+ auto it_per_host_store = per_host_stores_.find(hostname);
+ GPR_ASSERT(it_per_host_store != per_host_stores_.end());
+ it_per_host_store->second.ReportStreamClosed(lb_id);
+}
+
+} // namespace load_reporter
+} // namespace grpc
diff --git a/src/cpp/server/load_reporter/load_data_store.h b/src/cpp/server/load_reporter/load_data_store.h
new file mode 100644
index 0000000000..feb8b2fd59
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_data_store.h
@@ -0,0 +1,339 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H
+#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H
+
+#include <grpc/support/port_platform.h>
+
+#include <memory>
+#include <set>
+#include <unordered_map>
+
+#include <grpc/support/log.h>
+#include <grpcpp/impl/codegen/config.h>
+
+namespace grpc {
+namespace load_reporter {
+
+constexpr char kInvalidLbId[] = "<INVALID_LBID_238dsb234890rb>";
+constexpr uint8_t kLbIdLen = 8;
+
+// The load data storage is organized in hierarchy. The LoadDataStore is the
+// top-level data store. In LoadDataStore, for each host we keep a
+// PerHostStore, in which for each balancer we keep a PerBalancerStore. Each
+// PerBalancerStore maintains a map of load records, mapping from LoadRecordKey
+// to LoadRecordValue. The LoadRecordValue contains a map of customized call
+// metrics, mapping from a call metric name to the CallMetricValue.
+
+// The value of a customized call metric.
+class CallMetricValue {
+ public:
+ explicit CallMetricValue(uint64_t num_calls = 0,
+ double total_metric_value = 0)
+ : num_calls_(num_calls), total_metric_value_(total_metric_value) {}
+
+ void MergeFrom(CallMetricValue other) {
+ num_calls_ += other.num_calls_;
+ total_metric_value_ += other.total_metric_value_;
+ }
+
+ // Getters.
+ uint64_t num_calls() const { return num_calls_; }
+ double total_metric_value() const { return total_metric_value_; }
+
+ private:
+ // The number of calls that finished with this metric.
+ uint64_t num_calls_ = 0;
+ // The sum of metric values across all the calls that finished with this
+ // metric.
+ double total_metric_value_ = 0;
+};
+
+// The key of a load record.
+class LoadRecordKey {
+ public:
+ explicit LoadRecordKey(grpc::string lb_id, grpc::string lb_tag,
+ grpc::string user_id, grpc::string client_ip_hex)
+ : lb_id_(std::move(lb_id)),
+ lb_tag_(std::move(lb_tag)),
+ user_id_(std::move(user_id)),
+ client_ip_hex_(std::move(client_ip_hex)) {}
+
+ grpc::string ToString() const {
+ return "[lb_id_=" + lb_id_ + ", lb_tag_=" + lb_tag_ +
+ ", user_id_=" + user_id_ + ", client_ip_hex_=" + client_ip_hex_ +
+ "]";
+ }
+
+ bool operator==(const LoadRecordKey& other) const {
+ return lb_id_ == other.lb_id_ && lb_tag_ == other.lb_tag_ &&
+ user_id_ == other.user_id_ && client_ip_hex_ == other.client_ip_hex_;
+ }
+
+ // Getters.
+ const grpc::string& lb_id() const { return lb_id_; }
+ const grpc::string& lb_tag() const { return lb_tag_; }
+ const grpc::string& user_id() const { return user_id_; }
+ const grpc::string& client_ip_hex() const { return client_ip_hex_; }
+
+ struct Hasher {
+ void hash_combine(size_t* seed, const grpc::string& k) const {
+ *seed ^= std::hash<grpc::string>()(k) + 0x9e3779b9 + (*seed << 6) +
+ (*seed >> 2);
+ }
+
+ size_t operator()(const LoadRecordKey& k) const {
+ size_t h = 0;
+ hash_combine(&h, k.lb_id_);
+ hash_combine(&h, k.lb_tag_);
+ hash_combine(&h, k.user_id_);
+ hash_combine(&h, k.client_ip_hex_);
+ return h;
+ }
+ };
+
+ private:
+ grpc::string lb_id_;
+ grpc::string lb_tag_;
+ grpc::string user_id_;
+ grpc::string client_ip_hex_;
+};
+
+// The value of a load record.
+class LoadRecordValue {
+ public:
+ explicit LoadRecordValue(uint64_t start_count = 0, uint64_t ok_count = 0,
+ uint64_t error_count = 0, double bytes_sent = 0,
+ double bytes_recv = 0, double latency_ms = 0)
+ : start_count_(start_count),
+ ok_count_(ok_count),
+ error_count_(error_count),
+ bytes_sent_(bytes_sent),
+ bytes_recv_(bytes_recv),
+ latency_ms_(latency_ms) {}
+
+ void MergeFrom(const LoadRecordValue& other) {
+ start_count_ += other.start_count_;
+ ok_count_ += other.ok_count_;
+ error_count_ += other.error_count_;
+ bytes_sent_ += other.bytes_sent_;
+ bytes_recv_ += other.bytes_recv_;
+ latency_ms_ += other.latency_ms_;
+ for (const auto& p : other.call_metrics_) {
+ const grpc::string& key = p.first;
+ const CallMetricValue& value = p.second;
+ call_metrics_[key].MergeFrom(value);
+ }
+ }
+
+ int64_t GetNumCallsInProgressDelta() const {
+ return static_cast<int64_t>(start_count_ - ok_count_ - error_count_);
+ }
+
+ grpc::string ToString() const {
+ return "[start_count_=" + grpc::to_string(start_count_) +
+ ", ok_count_=" + grpc::to_string(ok_count_) +
+ ", error_count_=" + grpc::to_string(error_count_) +
+ ", bytes_sent_=" + grpc::to_string(bytes_sent_) +
+ ", bytes_recv_=" + grpc::to_string(bytes_recv_) +
+ ", latency_ms_=" + grpc::to_string(latency_ms_) + "]";
+ }
+
+ bool InsertCallMetric(const grpc::string& metric_name,
+ const CallMetricValue& metric_value) {
+ return call_metrics_.insert({metric_name, metric_value}).second;
+ }
+
+ // Getters.
+ uint64_t start_count() const { return start_count_; }
+ uint64_t ok_count() const { return ok_count_; }
+ uint64_t error_count() const { return error_count_; }
+ double bytes_sent() const { return bytes_sent_; }
+ double bytes_recv() const { return bytes_recv_; }
+ double latency_ms() const { return latency_ms_; }
+ const std::unordered_map<grpc::string, CallMetricValue>& call_metrics()
+ const {
+ return call_metrics_;
+ }
+
+ private:
+ uint64_t start_count_ = 0;
+ uint64_t ok_count_ = 0;
+ uint64_t error_count_ = 0;
+ double bytes_sent_ = 0;
+ double bytes_recv_ = 0;
+ double latency_ms_ = 0;
+ std::unordered_map<grpc::string, CallMetricValue> call_metrics_;
+};
+
+// Stores the data associated with a particular LB ID.
+class PerBalancerStore {
+ public:
+ using LoadRecordMap =
+ std::unordered_map<LoadRecordKey, LoadRecordValue, LoadRecordKey::Hasher>;
+
+ PerBalancerStore(grpc::string lb_id, grpc::string load_key)
+ : lb_id_(std::move(lb_id)), load_key_(std::move(load_key)) {}
+
+ // Merge a load record with the given key and value if the store is not
+ // suspended.
+ void MergeRow(const LoadRecordKey& key, const LoadRecordValue& value);
+
+ // Suspend this store, so that no detailed load data will be recorded.
+ void Suspend();
+ // Resume this store from suspension.
+ void Resume();
+ // Is this store suspended or not?
+ bool IsSuspended() const { return suspended_; }
+
+ bool IsNumCallsInProgressChangedSinceLastReport() const {
+ return num_calls_in_progress_ != last_reported_num_calls_in_progress_;
+ }
+
+ uint64_t GetNumCallsInProgressForReport();
+
+ grpc::string ToString() {
+ return "[PerBalancerStore lb_id_=" + lb_id_ + " load_key_=" + load_key_ +
+ "]";
+ }
+
+ void ClearLoadRecordMap() { load_record_map_.clear(); }
+
+ // Getters.
+ const grpc::string& lb_id() const { return lb_id_; }
+ const grpc::string& load_key() const { return load_key_; }
+ const LoadRecordMap& load_record_map() const { return load_record_map_; }
+
+ private:
+ grpc::string lb_id_;
+ // TODO(juanlishen): Use bytestring protobuf type?
+ grpc::string load_key_;
+ LoadRecordMap load_record_map_;
+ uint64_t num_calls_in_progress_ = 0;
+ uint64_t last_reported_num_calls_in_progress_ = 0;
+ bool suspended_ = false;
+};
+
+// Stores the data associated with a particular host.
+class PerHostStore {
+ public:
+ // When a report stream is created, a PerBalancerStore is created for the
+ // LB ID (guaranteed unique) associated with that stream. If it is the only
+ // active store, adopt all the orphaned stores. If it is the first created
+ // store, adopt the store of kInvalidLbId.
+ void ReportStreamCreated(const grpc::string& lb_id,
+ const grpc::string& load_key);
+
+ // When a report stream is closed, the PerBalancerStores assigned to the
+ // associate LB ID need to be re-assigned to other active balancers,
+ // ideally with the same load key. If there is no active balancer, we have
+ // to suspend those stores and drop the incoming load data until they are
+ // resumed.
+ void ReportStreamClosed(const grpc::string& lb_id);
+
+ // Returns null if not found. Caller doesn't own the returned store.
+ PerBalancerStore* FindPerBalancerStore(const grpc::string& lb_id) const;
+
+ // Returns null if lb_id is not found. The returned pointer points to the
+ // underlying data structure, which is not owned by the caller.
+ const std::set<PerBalancerStore*>* GetAssignedStores(
+ const grpc::string& lb_id) const;
+
+ private:
+ // Creates a PerBalancerStore for the given LB ID, assigns the store to
+ // itself, and records the LB ID to the load key.
+ void SetUpForNewLbId(const grpc::string& lb_id, const grpc::string& load_key);
+
+ void AssignOrphanedStore(PerBalancerStore* orphaned_store,
+ const grpc::string& new_receiver);
+
+ std::unordered_map<grpc::string, std::set<grpc::string>>
+ load_key_to_receiving_lb_ids_;
+
+ // Key: LB ID. The key set includes all the LB IDs that have been
+ // allocated for reporting streams so far.
+ // Value: the unique pointer to the PerBalancerStore of the LB ID.
+ std::unordered_map<grpc::string, std::unique_ptr<PerBalancerStore>>
+ per_balancer_stores_;
+
+ // Key: LB ID. The key set includes the LB IDs of the balancers that are
+ // currently receiving report.
+ // Value: the set of raw pointers to the PerBalancerStores assigned to the LB
+ // ID. Note that the sets in assigned_stores_ form a division of the value set
+ // of per_balancer_stores_.
+ std::unordered_map<grpc::string, std::set<PerBalancerStore*>>
+ assigned_stores_;
+};
+
+// Thread-unsafe two-level bookkeeper of all the load data.
+// Note: We never remove any store objects from this class, as per the
+// current spec. That's because premature removal of the store objects
+// may lead to loss of critical information, e.g., mapping from lb_id to
+// load_key, and the number of in-progress calls. Such loss will cause
+// information inconsistency when the balancer is re-connected. Keeping
+// all the stores should be fine for PerHostStore, since we assume there
+// should only be a few hostnames. But it's a potential problem for
+// PerBalancerStore.
+class LoadDataStore {
+ public:
+ // Returns null if not found. Caller doesn't own the returned store.
+ PerBalancerStore* FindPerBalancerStore(const grpc::string& hostname,
+ const grpc::string& lb_id) const;
+
+ // Returns null if hostname or lb_id is not found. The returned pointer points
+ // to the underlying data structure, which is not owned by the caller.
+ const std::set<PerBalancerStore*>* GetAssignedStores(const string& hostname,
+ const string& lb_id);
+
+ // If a PerBalancerStore can be found by the hostname and LB ID in
+ // LoadRecordKey, the load data will be merged to that store. Otherwise,
+ // only track the number of the in-progress calls for this unknown LB ID.
+ void MergeRow(const grpc::string& hostname, const LoadRecordKey& key,
+ const LoadRecordValue& value);
+
+ // Is the given lb_id a tracked unknown LB ID (i.e., the LB ID was associated
+ // with some received load data but unknown to this load data store)?
+ bool IsTrackedUnknownBalancerId(const grpc::string& lb_id) const {
+ return unknown_balancer_id_trackers_.find(lb_id) !=
+ unknown_balancer_id_trackers_.end();
+ }
+
+ // Wrapper around PerHostStore::ReportStreamCreated.
+ void ReportStreamCreated(const grpc::string& hostname,
+ const grpc::string& lb_id,
+ const grpc::string& load_key);
+
+ // Wrapper around PerHostStore::ReportStreamClosed.
+ void ReportStreamClosed(const grpc::string& hostname,
+ const grpc::string& lb_id);
+
+ private:
+ // Buffered data that was fetched from Census but hasn't been sent to
+ // balancer. We need to keep this data ourselves because Census will
+ // delete the data once it's returned.
+ std::unordered_map<grpc::string, PerHostStore> per_host_stores_;
+
+ // Tracks the number of in-progress calls for each unknown LB ID.
+ std::unordered_map<grpc::string, uint64_t> unknown_balancer_id_trackers_;
+};
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H