aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/load_reporter/load_reporter.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server/load_reporter/load_reporter.h')
-rw-r--r--src/cpp/server/load_reporter/load_reporter.h225
1 files changed, 225 insertions, 0 deletions
diff --git a/src/cpp/server/load_reporter/load_reporter.h b/src/cpp/server/load_reporter/load_reporter.h
new file mode 100644
index 0000000000..49a2e4b53c
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_reporter.h
@@ -0,0 +1,225 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
+#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
+
+#include <grpc/support/port_platform.h>
+
+#include <atomic>
+#include <chrono>
+#include <deque>
+#include <vector>
+
+#include <grpc/support/log.h>
+#include <grpcpp/impl/codegen/config.h>
+
+#include "src/cpp/server/load_reporter/load_data_store.h"
+#include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
+
+#include "opencensus/stats/stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+// Interface for obtaining the Census stats. Abstract so that it can be mocked.
+class CensusViewProvider {
+ public:
+  // Keyed by the view name; the mapped value is that view's data.
+  using ViewDataMap =
+      std::unordered_map<grpc::string, ::opencensus::stats::ViewData>;
+  // Keyed by the view name; the mapped value is that view's descriptor.
+  using ViewDescriptorMap =
+      std::unordered_map<grpc::string, ::opencensus::stats::ViewDescriptor>;
+
+  CensusViewProvider();
+  virtual ~CensusViewProvider() = default;
+
+  // Returns the view data accumulated since the previous fetch, as a map from
+  // the view name to the view data.
+  virtual ViewDataMap FetchViewData() = 0;
+
+  // Gets, as a double, the row with the given tag values from the data of the
+  // view named view_name. Only valid when that row must exist because a row
+  // with the same tag values was seen in a related view's data. Views are
+  // related when their measures are always recorded at the same time.
+  static double GetRelatedViewDataRowDouble(
+      const ViewDataMap& view_data_map, const char* view_name,
+      size_t view_name_len, const std::vector<grpc::string>& tag_values);
+
+ protected:
+  // Read-only access to the view descriptors, for subclasses.
+  const ViewDescriptorMap& view_descriptor_map() const {
+    return view_descriptor_map_;
+  }
+
+ private:
+  ViewDescriptorMap view_descriptor_map_;
+  // Tag keys.
+  ::opencensus::stats::TagKey tag_key_token_;
+  ::opencensus::stats::TagKey tag_key_host_;
+  ::opencensus::stats::TagKey tag_key_user_id_;
+  ::opencensus::stats::TagKey tag_key_status_;
+  ::opencensus::stats::TagKey tag_key_metric_name_;
+};
+
+// Default CensusViewProvider: fetches the real stats from Census.
+class CensusViewProviderDefaultImpl : public CensusViewProvider {
+ public:
+  CensusViewProviderDefaultImpl();
+
+  ViewDataMap FetchViewData() override;
+
+ private:
+  std::unordered_map<grpc::string, ::opencensus::stats::View> view_map_;
+};
+
+// Interface for obtaining the CPU stats. Abstract so that it can be mocked.
+class CpuStatsProvider {
+ public:
+  // A sample of CPU usage: the used amount and the total amount.
+  using CpuStatsSample = std::pair<uint64_t, uint64_t>;
+
+  virtual ~CpuStatsProvider() = default;
+
+  // Returns the cumulative used CPU and the total CPU resource.
+  virtual CpuStatsSample GetCpuStats() = 0;
+};
+
+// Default CpuStatsProvider: reads the CPU jiffies from the system to calculate
+// the CPU utilization.
+class CpuStatsProviderDefaultImpl : public CpuStatsProvider {
+ public:
+  CpuStatsSample GetCpuStats() override;
+};
+
+// Keeps all the load data and tracks the load reporting streams.
+class LoadReporter {
+ public:
+  // TODO(juanlishen): Allow config for providers from users.
+  LoadReporter(uint32_t feedback_sample_window_seconds,
+               std::unique_ptr<CensusViewProvider> census_view_provider,
+               std::unique_ptr<CpuStatsProvider> cpu_stats_provider)
+      : feedback_sample_window_seconds_(feedback_sample_window_seconds),
+        census_view_provider_(std::move(census_view_provider)),
+        cpu_stats_provider_(std::move(cpu_stats_provider)) {
+    // Seed the window with an initial record so the next real one has a base.
+    AppendNewFeedbackRecord(0, 0);
+  }
+
+  // Fetches the latest data from Census, merges it into the data store, and
+  // adds a new sample to the LB feedback sliding window.
+  // NOT thread-safe: invocations must be serialized. The data store and the
+  // feedback records are accessed under locks, but the Census view provider
+  // and CPU stats provider are not (only this method and tests touch them).
+  void FetchAndSample();
+
+  // Builds the loads to report for the given host and balancer, covering the
+  // stats accumulated between the last report (i.e., the last consumption)
+  // and the last fetch from Census (i.e., the last production).
+  // Thread-safe.
+  ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> GenerateLoads(
+      const grpc::string& hostname, const grpc::string& lb_id);
+
+  // Computes the LB feedback from the stats data recorded in the sliding
+  // window, discarding outdated records.
+  // Thread-safe.
+  ::grpc::lb::v1::LoadBalancingFeedback GenerateLoadBalancingFeedback();
+
+  // Forwards to LoadDataStore::ReportStreamCreated.
+  // Thread-safe.
+  void ReportStreamCreated(const grpc::string& hostname,
+                           const grpc::string& lb_id,
+                           const grpc::string& load_key);
+
+  // Forwards to LoadDataStore::ReportStreamClosed.
+  // Thread-safe.
+  void ReportStreamClosed(const grpc::string& hostname,
+                          const grpc::string& lb_id);
+
+  // Returns a unique LB ID of length kLbIdLength, or an empty string upon
+  // failure. Thread-safe.
+  grpc::string GenerateLbId();
+
+  // Test-only accessors. The returned pointers are non-owning.
+  CensusViewProvider* census_view_provider() {
+    return census_view_provider_.get();
+  }
+  CpuStatsProvider* cpu_stats_provider() { return cpu_stats_provider_.get(); }
+
+ private:
+  // One record of the LB feedback sliding window.
+  struct LoadBalancingFeedbackRecord {
+    std::chrono::system_clock::time_point end_time;
+    uint64_t rpcs;
+    uint64_t errors;
+    uint64_t cpu_usage;
+    uint64_t cpu_limit;
+
+    LoadBalancingFeedbackRecord(
+        const std::chrono::system_clock::time_point& end_time, uint64_t rpcs,
+        uint64_t errors, uint64_t cpu_usage, uint64_t cpu_limit)
+        : end_time(end_time),
+          rpcs(rpcs),
+          errors(errors),
+          cpu_usage(cpu_usage),
+          cpu_limit(cpu_limit) {}
+  };
+
+  // Looks up the call-start view data in view_data_map and merges it into the
+  // load data store.
+  void ProcessViewDataCallStart(
+      const CensusViewProvider::ViewDataMap& view_data_map);
+  // Looks up the call-end view data in view_data_map and merges it into the
+  // load data store.
+  void ProcessViewDataCallEnd(
+      const CensusViewProvider::ViewDataMap& view_data_map);
+  // Looks up the view data of the customized call metrics in view_data_map
+  // and merges it into the load data store.
+  void ProcessViewDataOtherCallMetrics(
+      const CensusViewProvider::ViewDataMap& view_data_map);
+
+  // True iff the record ended less than one sample window before `now`.
+  bool IsRecordInWindow(const LoadBalancingFeedbackRecord& record,
+                        std::chrono::system_clock::time_point now) const {
+    return record.end_time > now - feedback_sample_window_seconds_;
+  }
+
+  void AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors);
+
+  // Attaches to the load an OrphanedLoadIdentifier extracted from the
+  // per-balancer store.
+  void AttachOrphanLoadId(::grpc::lb::v1::Load* load,
+                          const PerBalancerStore& per_balancer_store);
+
+  std::atomic<int64_t> next_lb_id_{0};
+  const std::chrono::seconds feedback_sample_window_seconds_;
+  std::mutex feedback_mu_;
+  std::deque<LoadBalancingFeedbackRecord> feedback_records_;
+  // TODO(juanlishen): Lock in finer grain. Locking the whole store may be
+  // too expensive.
+  std::mutex store_mu_;
+  LoadDataStore load_data_store_;
+  std::unique_ptr<CensusViewProvider> census_view_provider_;
+  std::unique_ptr<CpuStatsProvider> cpu_stats_provider_;
+};
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H