aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Juanli Shen <juanlishen@google.com>2018-06-21 13:02:40 -0700
committerGravatar Juanli Shen <juanlishen@google.com>2018-06-21 13:21:02 -0700
commit291bbc70146c98375e9241ccb72c9f38acebac55 (patch)
tree0610b35d3b61b2f1186a2bbcac91e68fe402e983 /src
parent06cc5b583ac2e19111c45fa0246252efe997f0cb (diff)
Add load reporter
Diffstat (limited to 'src')
-rw-r--r--src/cpp/server/load_reporter/constants.h71
-rw-r--r--src/cpp/server/load_reporter/get_cpu_stats.h36
-rw-r--r--src/cpp/server/load_reporter/get_cpu_stats_linux.cc45
-rw-r--r--src/cpp/server/load_reporter/get_cpu_stats_macos.cc45
-rw-r--r--src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc40
-rw-r--r--src/cpp/server/load_reporter/get_cpu_stats_windows.cc55
-rw-r--r--src/cpp/server/load_reporter/load_data_store.cc65
-rw-r--r--src/cpp/server/load_reporter/load_data_store.h34
-rw-r--r--src/cpp/server/load_reporter/load_reporter.cc498
-rw-r--r--src/cpp/server/load_reporter/load_reporter.h225
-rw-r--r--src/proto/grpc/lb/v1/BUILD14
-rw-r--r--src/proto/grpc/lb/v1/load_reporter.proto180
12 files changed, 1293 insertions, 15 deletions
diff --git a/src/cpp/server/load_reporter/constants.h b/src/cpp/server/load_reporter/constants.h
new file mode 100644
index 0000000000..07c5965fff
--- /dev/null
+++ b/src/cpp/server/load_reporter/constants.h
@@ -0,0 +1,71 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
+#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
+
+#include <grpc/impl/codegen/port_platform.h>
+
+namespace grpc {
+namespace load_reporter {
+
+constexpr size_t kLbIdLength = 8;
+constexpr size_t kIpv4AddressLength = 8;
+constexpr size_t kIpv6AddressLength = 32;
+
+constexpr char kInvalidLbId[] = "<INVALID_LBID_238dsb234890rb>";
+
+// Call statuses.
+
+constexpr char kCallStatusOk[] = "OK";
+constexpr char kCallStatusServerError[] = "5XX";
+constexpr char kCallStatusClientError[] = "4XX";
+
+// Tag keys.
+
+constexpr char kTagKeyToken[] = "token";
+constexpr char kTagKeyHost[] = "host";
+constexpr char kTagKeyUserId[] = "user_id";
+constexpr char kTagKeyStatus[] = "status";
+constexpr char kTagKeyMetricName[] = "metric_name";
+
+// Measure names.
+
+constexpr char kMeasureStartCount[] = "grpc.io/lb/start_count";
+constexpr char kMeasureEndCount[] = "grpc.io/lb/end_count";
+constexpr char kMeasureEndBytesSent[] = "grpc.io/lb/bytes_sent";
+constexpr char kMeasureEndBytesReceived[] = "grpc.io/lb/bytes_received";
+constexpr char kMeasureEndLatencyMs[] = "grpc.io/lb/latency_ms";
+constexpr char kMeasureOtherCallMetric[] = "grpc.io/lb/other_call_metric";
+
+// View names.
+
+constexpr char kViewStartCount[] = "grpc.io/lb_view/start_count";
+constexpr char kViewEndCount[] = "grpc.io/lb_view/end_count";
+constexpr char kViewEndBytesSent[] = "grpc.io/lb_view/bytes_sent";
+constexpr char kViewEndBytesReceived[] = "grpc.io/lb_view/bytes_received";
+constexpr char kViewEndLatencyMs[] = "grpc.io/lb_view/latency_ms";
+constexpr char kViewOtherCallMetricCount[] =
+ "grpc.io/lb_view/other_call_metric_count";
+constexpr char kViewOtherCallMetricValue[] =
+ "grpc.io/lb_view/other_call_metric_value";
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
diff --git a/src/cpp/server/load_reporter/get_cpu_stats.h b/src/cpp/server/load_reporter/get_cpu_stats.h
new file mode 100644
index 0000000000..f514b0752f
--- /dev/null
+++ b/src/cpp/server/load_reporter/get_cpu_stats.h
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
+#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <utility>
+
+namespace grpc {
+namespace load_reporter {
+
+// Reads the CPU stats (in a pair of busy and total numbers) from the system.
+// The units of the stats should be the same.
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl();
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
diff --git a/src/cpp/server/load_reporter/get_cpu_stats_linux.cc b/src/cpp/server/load_reporter/get_cpu_stats_linux.cc
new file mode 100644
index 0000000000..9c1fd0cd0b
--- /dev/null
+++ b/src/cpp/server/load_reporter/get_cpu_stats_linux.cc
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_LINUX
+
+#include <cstdio>
+
+#include "src/cpp/server/load_reporter/get_cpu_stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
+ uint64_t busy = 0, total = 0;
+ FILE* fp;
+ fp = fopen("/proc/stat", "r");
+ uint64_t user, nice, system, idle;
+ fscanf(fp, "cpu %lu %lu %lu %lu", &user, &nice, &system, &idle);
+ fclose(fp);
+ busy = user + nice + system;
+ total = busy + idle;
+ return std::make_pair(busy, total);
+}
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GPR_LINUX
diff --git a/src/cpp/server/load_reporter/get_cpu_stats_macos.cc b/src/cpp/server/load_reporter/get_cpu_stats_macos.cc
new file mode 100644
index 0000000000..dbdde304c2
--- /dev/null
+++ b/src/cpp/server/load_reporter/get_cpu_stats_macos.cc
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_APPLE
+
+#include <mach/mach.h>
+
+#include "src/cpp/server/load_reporter/get_cpu_stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
+ uint64_t busy = 0, total = 0;
+ host_cpu_load_info_data_t cpuinfo;
+ mach_msg_type_number_t count = HOST_CPU_LOAD_INFO_COUNT;
+ if (host_statistics(mach_host_self(), HOST_CPU_LOAD_INFO,
+ (host_info_t)&cpuinfo, &count) == KERN_SUCCESS) {
+ for (int i = 0; i < CPU_STATE_MAX; i++) total += cpuinfo.cpu_ticks[i];
+ busy = total - cpuinfo.cpu_ticks[CPU_STATE_IDLE];
+ }
+ return std::make_pair(busy, total);
+}
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GPR_APPLE
diff --git a/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc b/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc
new file mode 100644
index 0000000000..80fb8b6da1
--- /dev/null
+++ b/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#if !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE)
+
+#include <grpc/support/log.h>
+
+#include "src/cpp/server/load_reporter/get_cpu_stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
+ uint64_t busy = 0, total = 0;
+ gpr_log(GPR_ERROR,
+ "Platforms other than Linux, Windows, and MacOS are not supported.");
+ return std::make_pair(busy, total);
+}
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE)
diff --git a/src/cpp/server/load_reporter/get_cpu_stats_windows.cc b/src/cpp/server/load_reporter/get_cpu_stats_windows.cc
new file mode 100644
index 0000000000..0a98e848a2
--- /dev/null
+++ b/src/cpp/server/load_reporter/get_cpu_stats_windows.cc
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_WINDOWS
+
+#include <windows.h>
+#include <cstdint>
+
+#include "src/cpp/server/load_reporter/get_cpu_stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+namespace {
+
+uint64_t FiletimeToInt(const FILETIME& ft) {
+ ULARGE_INTEGER i;
+ i.LowPart = ft.dwLowDateTime;
+ i.HighPart = ft.dwHighDateTime;
+ return i.QuadPart;
+}
+
+} // namespace
+
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
+ uint64_t busy = 0, total = 0;
+ FILETIME idle, kernel, user;
+ if (GetSystemTimes(&idle, &kernel, &user) != 0) {
+ total = FiletimeToInt(kernel) + FiletimeToInt(user);
+ busy = total - FiletimeToInt(idle);
+ }
+ return std::make_pair(busy, total);
+}
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GPR_WINDOWS
diff --git a/src/cpp/server/load_reporter/load_data_store.cc b/src/cpp/server/load_reporter/load_data_store.cc
index 70f12c1102..594473f5e7 100644
--- a/src/cpp/server/load_reporter/load_data_store.cc
+++ b/src/cpp/server/load_reporter/load_data_store.cc
@@ -16,11 +16,15 @@
*
*/
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <stdio.h>
#include <cstdlib>
#include <set>
#include <unordered_map>
#include <vector>
+#include "src/core/lib/iomgr/socket_utils.h"
#include "src/cpp/server/load_reporter/load_data_store.h"
namespace grpc {
@@ -73,6 +77,67 @@ const typename C::value_type* RandomElement(const C& container) {
} // namespace
+LoadRecordKey::LoadRecordKey(const grpc::string& client_ip_and_token,
+ grpc::string user_id)
+ : user_id_(std::move(user_id)) {
+ GPR_ASSERT(client_ip_and_token.size() >= 2);
+ int ip_hex_size;
+ GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d",
+ &ip_hex_size) == 1);
+ GPR_ASSERT(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength ||
+ ip_hex_size == kIpv6AddressLength);
+ size_t cur_pos = 2;
+ client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size);
+ cur_pos += ip_hex_size;
+ if (client_ip_and_token.size() - cur_pos < kLbIdLength) {
+ lb_id_ = kInvalidLbId;
+ lb_tag_ = "";
+ } else {
+ lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength);
+ lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength);
+ }
+}
+
+grpc::string LoadRecordKey::GetClientIpBytes() const {
+ if (client_ip_hex_.empty()) {
+ return "";
+ } else if (client_ip_hex_.size() == kIpv4AddressLength) {
+ uint32_t ip_bytes;
+ if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) {
+ gpr_log(GPR_ERROR,
+ "Can't parse client IP (%s) from a hex string to an integer.",
+ client_ip_hex_.c_str());
+ return "";
+ }
+ ip_bytes = grpc_htonl(ip_bytes);
+ return grpc::string(reinterpret_cast<const char*>(&ip_bytes),
+ sizeof(ip_bytes));
+ } else if (client_ip_hex_.size() == kIpv6AddressLength) {
+ uint32_t ip_bytes[4];
+ for (size_t i = 0; i < 4; ++i) {
+ if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x",
+ ip_bytes + i) != 1) {
+ gpr_log(
+ GPR_ERROR,
+ "Can't parse client IP part (%s) from a hex string to an integer.",
+ client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str());
+ return "";
+ }
+ ip_bytes[i] = grpc_htonl(ip_bytes[i]);
+ }
+ return grpc::string(reinterpret_cast<const char*>(ip_bytes),
+ sizeof(ip_bytes));
+ } else {
+ GPR_UNREACHABLE_CODE(return "");
+ }
+}
+
+LoadRecordValue::LoadRecordValue(grpc::string metric_name, uint64_t num_calls,
+ double total_metric_value) {
+ call_metrics_.emplace(std::move(metric_name),
+ CallMetricValue(num_calls, total_metric_value));
+}
+
void PerBalancerStore::MergeRow(const LoadRecordKey& key,
const LoadRecordValue& value) {
// During suspension, the load data received will be dropped.
diff --git a/src/cpp/server/load_reporter/load_data_store.h b/src/cpp/server/load_reporter/load_data_store.h
index feb8b2fd59..2da78ea064 100644
--- a/src/cpp/server/load_reporter/load_data_store.h
+++ b/src/cpp/server/load_reporter/load_data_store.h
@@ -28,12 +28,11 @@
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/config.h>
+#include "src/cpp/server/load_reporter/constants.h"
+
namespace grpc {
namespace load_reporter {
-constexpr char kInvalidLbId[] = "<INVALID_LBID_238dsb234890rb>";
-constexpr uint8_t kLbIdLen = 8;
-
// The load data storage is organized in hierarchy. The LoadDataStore is the
// top-level data store. In LoadDataStore, for each host we keep a
// PerHostStore, in which for each balancer we keep a PerBalancerStore. Each
@@ -68,13 +67,16 @@ class CallMetricValue {
// The key of a load record.
class LoadRecordKey {
public:
- explicit LoadRecordKey(grpc::string lb_id, grpc::string lb_tag,
- grpc::string user_id, grpc::string client_ip_hex)
+ LoadRecordKey(grpc::string lb_id, grpc::string lb_tag, grpc::string user_id,
+ grpc::string client_ip_hex)
: lb_id_(std::move(lb_id)),
lb_tag_(std::move(lb_tag)),
user_id_(std::move(user_id)),
client_ip_hex_(std::move(client_ip_hex)) {}
+ // Parses the input client_ip_and_token to set client IP, LB ID, and LB tag.
+ LoadRecordKey(const grpc::string& client_ip_and_token, grpc::string user_id);
+
grpc::string ToString() const {
return "[lb_id_=" + lb_id_ + ", lb_tag_=" + lb_tag_ +
", user_id_=" + user_id_ + ", client_ip_hex_=" + client_ip_hex_ +
@@ -86,6 +88,9 @@ class LoadRecordKey {
user_id_ == other.user_id_ && client_ip_hex_ == other.client_ip_hex_;
}
+ // Gets the client IP bytes in network order (i.e., big-endian).
+ grpc::string GetClientIpBytes() const;
+
// Getters.
const grpc::string& lb_id() const { return lb_id_; }
const grpc::string& lb_tag() const { return lb_tag_; }
@@ -119,8 +124,8 @@ class LoadRecordKey {
class LoadRecordValue {
public:
explicit LoadRecordValue(uint64_t start_count = 0, uint64_t ok_count = 0,
- uint64_t error_count = 0, double bytes_sent = 0,
- double bytes_recv = 0, double latency_ms = 0)
+ uint64_t error_count = 0, uint64_t bytes_sent = 0,
+ uint64_t bytes_recv = 0, uint64_t latency_ms = 0)
: start_count_(start_count),
ok_count_(ok_count),
error_count_(error_count),
@@ -128,6 +133,9 @@ class LoadRecordValue {
bytes_recv_(bytes_recv),
latency_ms_(latency_ms) {}
+ LoadRecordValue(grpc::string metric_name, uint64_t num_calls,
+ double total_metric_value);
+
void MergeFrom(const LoadRecordValue& other) {
start_count_ += other.start_count_;
ok_count_ += other.ok_count_;
@@ -164,9 +172,9 @@ class LoadRecordValue {
uint64_t start_count() const { return start_count_; }
uint64_t ok_count() const { return ok_count_; }
uint64_t error_count() const { return error_count_; }
- double bytes_sent() const { return bytes_sent_; }
- double bytes_recv() const { return bytes_recv_; }
- double latency_ms() const { return latency_ms_; }
+ uint64_t bytes_sent() const { return bytes_sent_; }
+ uint64_t bytes_recv() const { return bytes_recv_; }
+ uint64_t latency_ms() const { return latency_ms_; }
const std::unordered_map<grpc::string, CallMetricValue>& call_metrics()
const {
return call_metrics_;
@@ -176,9 +184,9 @@ class LoadRecordValue {
uint64_t start_count_ = 0;
uint64_t ok_count_ = 0;
uint64_t error_count_ = 0;
- double bytes_sent_ = 0;
- double bytes_recv_ = 0;
- double latency_ms_ = 0;
+ uint64_t bytes_sent_ = 0;
+ uint64_t bytes_recv_ = 0;
+ uint64_t latency_ms_ = 0;
std::unordered_map<grpc::string, CallMetricValue> call_metrics_;
};
diff --git a/src/cpp/server/load_reporter/load_reporter.cc b/src/cpp/server/load_reporter/load_reporter.cc
new file mode 100644
index 0000000000..3f0063d883
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_reporter.cc
@@ -0,0 +1,498 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <stdint.h>
+#include <stdio.h>
+#include <chrono>
+#include <ctime>
+
+#include "src/cpp/server/load_reporter/constants.h"
+#include "src/cpp/server/load_reporter/get_cpu_stats.h"
+#include "src/cpp/server/load_reporter/load_reporter.h"
+
+#include "opencensus/stats/internal/set_aggregation_window.h"
+
+namespace grpc {
+namespace load_reporter {
+
+CpuStatsProvider::CpuStatsSample CpuStatsProviderDefaultImpl::GetCpuStats() {
+ return GetCpuStatsImpl();
+}
+
+CensusViewProvider::CensusViewProvider()
+ : tag_key_token_(::opencensus::stats::TagKey::Register(kTagKeyToken)),
+ tag_key_host_(::opencensus::stats::TagKey::Register(kTagKeyHost)),
+ tag_key_user_id_(::opencensus::stats::TagKey::Register(kTagKeyUserId)),
+ tag_key_status_(::opencensus::stats::TagKey::Register(kTagKeyStatus)),
+ tag_key_metric_name_(
+ ::opencensus::stats::TagKey::Register(kTagKeyMetricName)) {
+ // One view related to starting a call.
+ auto vd_start_count =
+ ::opencensus::stats::ViewDescriptor()
+ .set_name(kViewStartCount)
+ .set_measure(kMeasureStartCount)
+ .set_aggregation(::opencensus::stats::Aggregation::Sum())
+ .add_column(tag_key_token_)
+ .add_column(tag_key_host_)
+ .add_column(tag_key_user_id_)
+ .set_description(
+ "Delta count of calls started broken down by <token, host, "
+ "user_id>.");
+ ::opencensus::stats::SetAggregationWindow(
+ ::opencensus::stats::AggregationWindow::Delta(), &vd_start_count);
+ view_descriptor_map_.emplace(kViewStartCount, vd_start_count);
+ // Four views related to ending a call.
+ // If this view is set as Count of kMeasureEndBytesSent (in hope of saving one
+ // measure), it's infeasible to prepare fake data for testing. That's because
+ // the OpenCensus API to make up view data will add the input data as separate
+ // measurements instead of setting the data values directly.
+ auto vd_end_count =
+ ::opencensus::stats::ViewDescriptor()
+ .set_name((kViewEndCount))
+ .set_measure((kMeasureEndCount))
+ .set_aggregation(::opencensus::stats::Aggregation::Sum())
+ .add_column(tag_key_token_)
+ .add_column(tag_key_host_)
+ .add_column(tag_key_user_id_)
+ .add_column(tag_key_status_)
+ .set_description(
+ "Delta count of calls ended broken down by <token, host, "
+ "user_id, status>.");
+ ::opencensus::stats::SetAggregationWindow(
+ ::opencensus::stats::AggregationWindow::Delta(), &vd_end_count);
+ view_descriptor_map_.emplace(kViewEndCount, vd_end_count);
+ auto vd_end_bytes_sent =
+ ::opencensus::stats::ViewDescriptor()
+ .set_name((kViewEndBytesSent))
+ .set_measure((kMeasureEndBytesSent))
+ .set_aggregation(::opencensus::stats::Aggregation::Sum())
+ .add_column(tag_key_token_)
+ .add_column(tag_key_host_)
+ .add_column(tag_key_user_id_)
+ .add_column(tag_key_status_)
+ .set_description(
+ "Delta sum of bytes sent broken down by <token, host, user_id, "
+ "status>.");
+ ::opencensus::stats::SetAggregationWindow(
+ ::opencensus::stats::AggregationWindow::Delta(), &vd_end_bytes_sent);
+ view_descriptor_map_.emplace(kViewEndBytesSent, vd_end_bytes_sent);
+ auto vd_end_bytes_received =
+ ::opencensus::stats::ViewDescriptor()
+ .set_name((kViewEndBytesReceived))
+ .set_measure((kMeasureEndBytesReceived))
+ .set_aggregation(::opencensus::stats::Aggregation::Sum())
+ .add_column(tag_key_token_)
+ .add_column(tag_key_host_)
+ .add_column(tag_key_user_id_)
+ .add_column(tag_key_status_)
+ .set_description(
+ "Delta sum of bytes received broken down by <token, host, "
+ "user_id, status>.");
+ ::opencensus::stats::SetAggregationWindow(
+ ::opencensus::stats::AggregationWindow::Delta(), &vd_end_bytes_received);
+ view_descriptor_map_.emplace(kViewEndBytesReceived, vd_end_bytes_received);
+ auto vd_end_latency_ms =
+ ::opencensus::stats::ViewDescriptor()
+ .set_name((kViewEndLatencyMs))
+ .set_measure((kMeasureEndLatencyMs))
+ .set_aggregation(::opencensus::stats::Aggregation::Sum())
+ .add_column(tag_key_token_)
+ .add_column(tag_key_host_)
+ .add_column(tag_key_user_id_)
+ .add_column(tag_key_status_)
+ .set_description(
+ "Delta sum of latency in ms broken down by <token, host, "
+ "user_id, status>.");
+ ::opencensus::stats::SetAggregationWindow(
+ ::opencensus::stats::AggregationWindow::Delta(), &vd_end_latency_ms);
+ view_descriptor_map_.emplace(kViewEndLatencyMs, vd_end_latency_ms);
+ // Two views related to other call metrics.
+ auto vd_metric_call_count =
+ ::opencensus::stats::ViewDescriptor()
+ .set_name((kViewOtherCallMetricCount))
+ .set_measure((kMeasureOtherCallMetric))
+ .set_aggregation(::opencensus::stats::Aggregation::Count())
+ .add_column(tag_key_token_)
+ .add_column(tag_key_host_)
+ .add_column(tag_key_user_id_)
+ .add_column(tag_key_metric_name_)
+ .set_description(
+ "Delta count of calls broken down by <token, host, user_id, "
+ "metric_name>.");
+ ::opencensus::stats::SetAggregationWindow(
+ ::opencensus::stats::AggregationWindow::Delta(), &vd_metric_call_count);
+ view_descriptor_map_.emplace(kViewOtherCallMetricCount, vd_metric_call_count);
+ auto vd_metric_value =
+ ::opencensus::stats::ViewDescriptor()
+ .set_name((kViewOtherCallMetricValue))
+ .set_measure((kMeasureOtherCallMetric))
+ .set_aggregation(::opencensus::stats::Aggregation::Sum())
+ .add_column(tag_key_token_)
+ .add_column(tag_key_host_)
+ .add_column(tag_key_user_id_)
+ .add_column(tag_key_metric_name_)
+ .set_description(
+ "Delta sum of call metric value broken down "
+ "by <token, host, user_id, metric_name>.");
+ ::opencensus::stats::SetAggregationWindow(
+ ::opencensus::stats::AggregationWindow::Delta(), &vd_metric_value);
+ view_descriptor_map_.emplace(kViewOtherCallMetricValue, vd_metric_value);
+}
+
+double CensusViewProvider::GetRelatedViewDataRowDouble(
+ const ViewDataMap& view_data_map, const char* view_name,
+ size_t view_name_len, const std::vector<grpc::string>& tag_values) {
+ auto it_vd = view_data_map.find(grpc::string(view_name, view_name_len));
+ GPR_ASSERT(it_vd != view_data_map.end());
+ auto it_row = it_vd->second.double_data().find(tag_values);
+ GPR_ASSERT(it_row != it_vd->second.double_data().end());
+ return it_row->second;
+}
+
+CensusViewProviderDefaultImpl::CensusViewProviderDefaultImpl() {
+ for (const auto& p : view_descriptor_map()) {
+ const grpc::string& view_name = p.first;
+ const ::opencensus::stats::ViewDescriptor& vd = p.second;
+ // We need to use pair's piecewise ctor here, otherwise the deleted copy
+ // ctor of View will be called.
+ view_map_.emplace(std::piecewise_construct,
+ std::forward_as_tuple(view_name),
+ std::forward_as_tuple(vd));
+ }
+}
+
+CensusViewProvider::ViewDataMap CensusViewProviderDefaultImpl::FetchViewData() {
+ gpr_log(GPR_DEBUG, "[CVP %p] Starts fetching Census view data.", this);
+ ViewDataMap view_data_map;
+ for (auto& p : view_map_) {
+ const grpc::string& view_name = p.first;
+ ::opencensus::stats::View& view = p.second;
+ if (view.IsValid()) {
+ view_data_map.emplace(view_name, view.GetData());
+ gpr_log(GPR_DEBUG, "[CVP %p] Fetched view data (view: %s).", this,
+ view_name.c_str());
+ } else {
+ gpr_log(
+ GPR_DEBUG,
+ "[CVP %p] Can't fetch view data because view is invalid (view: %s).",
+ this, view_name.c_str());
+ }
+ }
+ return view_data_map;
+}
+
+grpc::string LoadReporter::GenerateLbId() {
+ while (true) {
+ if (next_lb_id_ > UINT32_MAX) {
+ gpr_log(GPR_ERROR, "[LR %p] The LB ID exceeds the max valid value!",
+ this);
+ return "";
+ }
+ int64_t lb_id = next_lb_id_++;
+ // Overflow should never happen.
+ GPR_ASSERT(lb_id >= 0);
+ // Convert to padded hex string for a 32-bit LB ID. E.g, "0000ca5b".
+ char buf[kLbIdLength + 1];
+ snprintf(buf, sizeof(buf), "%08lx", lb_id);
+ grpc::string lb_id_str(buf, kLbIdLength);
+ // The client may send requests with LB ID that has never been allocated
+ // by this load reporter. Those IDs are tracked and will be skipped when
+ // we generate a new ID.
+ if (!load_data_store_.IsTrackedUnknownBalancerId(lb_id_str)) {
+ return lb_id_str;
+ }
+ }
+}
+
+::grpc::lb::v1::LoadBalancingFeedback
+LoadReporter::GenerateLoadBalancingFeedback() {
+ std::unique_lock<std::mutex> lock(feedback_mu_);
+ auto now = std::chrono::system_clock::now();
+ // Discard records outside the window until there is only one record
+ // outside the window, which is used as the base for difference.
+ while (feedback_records_.size() > 1 &&
+ !IsRecordInWindow(feedback_records_[1], now)) {
+ feedback_records_.pop_front();
+ }
+ if (feedback_records_.size() < 2) {
+ return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
+ }
+ // Find the longest range with valid ends.
+ LoadBalancingFeedbackRecord* oldest = &feedback_records_[0];
+ LoadBalancingFeedbackRecord* newest =
+ &feedback_records_[feedback_records_.size() - 1];
+ while (newest > oldest &&
+ (newest->cpu_limit == 0 || oldest->cpu_limit == 0)) {
+ // A zero limit means that the system info reading was failed, so these
+ // records can't be used to calculate CPU utilization.
+ if (newest->cpu_limit == 0) --newest;
+ if (oldest->cpu_limit == 0) ++oldest;
+ }
+ if (newest - oldest < 1 || oldest->end_time == newest->end_time ||
+ newest->cpu_limit == oldest->cpu_limit) {
+ return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
+ }
+ uint64_t rpcs = 0;
+ uint64_t errors = 0;
+ for (LoadBalancingFeedbackRecord* p = newest; p != oldest; --p) {
+ // Because these two numbers are counters, the oldest record shouldn't be
+ // included.
+ rpcs += p->rpcs;
+ errors += p->errors;
+ }
+ double cpu_usage = newest->cpu_usage - oldest->cpu_usage;
+ double cpu_limit = newest->cpu_limit - oldest->cpu_limit;
+ std::chrono::duration<double> duration_seconds =
+ newest->end_time - oldest->end_time;
+ lock.unlock();
+ ::grpc::lb::v1::LoadBalancingFeedback feedback;
+ feedback.set_server_utilization(static_cast<float>(cpu_usage / cpu_limit));
+ feedback.set_calls_per_second(
+ static_cast<float>(rpcs / duration_seconds.count()));
+ feedback.set_errors_per_second(
+ static_cast<float>(errors / duration_seconds.count()));
+ return feedback;
+}
+
+::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load>
+LoadReporter::GenerateLoads(const grpc::string& hostname,
+ const grpc::string& lb_id) {
+ std::lock_guard<std::mutex> lock(store_mu_);
+ auto assigned_stores = load_data_store_.GetAssignedStores(hostname, lb_id);
+ GPR_ASSERT(assigned_stores != nullptr);
+ GPR_ASSERT(!assigned_stores->empty());
+ ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> loads;
+ for (PerBalancerStore* per_balancer_store : *assigned_stores) {
+ GPR_ASSERT(!per_balancer_store->IsSuspended());
+ if (!per_balancer_store->load_record_map().empty()) {
+ for (const auto& p : per_balancer_store->load_record_map()) {
+ const auto& key = p.first;
+ const auto& value = p.second;
+ auto load = loads.Add();
+ load->set_load_balance_tag(key.lb_tag());
+ load->set_user_id(key.user_id());
+ load->set_client_ip_address(key.GetClientIpBytes());
+ load->set_num_calls_started(static_cast<int64_t>(value.start_count()));
+ load->set_num_calls_finished_without_error(
+ static_cast<int64_t>(value.ok_count()));
+ load->set_num_calls_finished_with_error(
+ static_cast<int64_t>(value.error_count()));
+ load->set_total_bytes_sent(static_cast<int64_t>(value.bytes_sent()));
+ load->set_total_bytes_received(
+ static_cast<int64_t>(value.bytes_recv()));
+ load->mutable_total_latency()->set_seconds(
+ static_cast<int64_t>(value.latency_ms() / 1000));
+ load->mutable_total_latency()->set_nanos(
+ (static_cast<int32_t>(value.latency_ms()) % 1000) * 1000000);
+ for (const auto& p : value.call_metrics()) {
+ const grpc::string& metric_name = p.first;
+ const CallMetricValue& metric_value = p.second;
+ auto call_metric_data = load->add_metric_data();
+ call_metric_data->set_metric_name(metric_name);
+ call_metric_data->set_num_calls_finished_with_metric(
+ metric_value.num_calls());
+ call_metric_data->set_total_metric_value(
+ metric_value.total_metric_value());
+ }
+ if (per_balancer_store->lb_id() != lb_id) {
+ // This per-balancer store is an orphan assigned to this receiving
+ // balancer.
+ AttachOrphanLoadId(load, *per_balancer_store);
+ }
+ }
+ per_balancer_store->ClearLoadRecordMap();
+ }
+ if (per_balancer_store->IsNumCallsInProgressChangedSinceLastReport()) {
+ auto load = loads.Add();
+ load->set_num_calls_in_progress(
+ per_balancer_store->GetNumCallsInProgressForReport());
+ if (per_balancer_store->lb_id() != lb_id) {
+ // This per-balancer store is an orphan assigned to this receiving
+ // balancer.
+ AttachOrphanLoadId(load, *per_balancer_store);
+ }
+ }
+ }
+ return loads;
+}
+
+void LoadReporter::AttachOrphanLoadId(
+ ::grpc::lb::v1::Load* load, const PerBalancerStore& per_balancer_store) {
+ if (per_balancer_store.lb_id() == kInvalidLbId) {
+ load->set_load_key_unknown(true);
+ } else {
+ load->set_load_key_unknown(false);
+ load->mutable_orphaned_load_identifier()->set_load_key(
+ per_balancer_store.load_key());
+ load->mutable_orphaned_load_identifier()->set_load_balancer_id(
+ per_balancer_store.lb_id());
+ }
+}
+
+void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) {
+ CpuStatsProvider::CpuStatsSample cpu_stats;
+ if (cpu_stats_provider_ != nullptr) {
+ cpu_stats = cpu_stats_provider_->GetCpuStats();
+ } else {
+ // This will make the load balancing feedback generation a no-op.
+ cpu_stats = {0, 0};
+ }
+ std::unique_lock<std::mutex> lock(feedback_mu_);
+ feedback_records_.emplace_back(std::chrono::system_clock::now(), rpcs, errors,
+ cpu_stats.first, cpu_stats.second);
+}
+
+void LoadReporter::ReportStreamCreated(const grpc::string& hostname,
+ const grpc::string& lb_id,
+ const grpc::string& load_key) {
+ std::lock_guard<std::mutex> lock(store_mu_);
+ load_data_store_.ReportStreamCreated(hostname, lb_id, load_key);
+ gpr_log(GPR_INFO,
+ "[LR %p] Report stream created (host: %s, LB ID: %s, load key: %s).",
+ this, hostname.c_str(), lb_id.c_str(), load_key.c_str());
+}
+
+void LoadReporter::ReportStreamClosed(const grpc::string& hostname,
+ const grpc::string& lb_id) {
+ std::lock_guard<std::mutex> lock(store_mu_);
+ load_data_store_.ReportStreamClosed(hostname, lb_id);
+ gpr_log(GPR_INFO, "[LR %p] Report stream closed (host: %s, LB ID: %s).", this,
+ hostname.c_str(), lb_id.c_str());
+}
+
+void LoadReporter::ProcessViewDataCallStart(
+ const CensusViewProvider::ViewDataMap& view_data_map) {
+ auto it = view_data_map.find(kViewStartCount);
+ if (it != view_data_map.end()) {
+ // Note that the data type for any Sum view is double, whatever the data
+ // type of the original measure.
+ for (const auto& p : it->second.double_data()) {
+ const std::vector<grpc::string>& tag_values = p.first;
+ const uint64_t start_count = static_cast<uint64_t>(p.second);
+ const grpc::string& client_ip_and_token = tag_values[0];
+ const grpc::string& host = tag_values[1];
+ const grpc::string& user_id = tag_values[2];
+ LoadRecordKey key(client_ip_and_token, user_id);
+ LoadRecordValue value = LoadRecordValue(start_count);
+ {
+ std::unique_lock<std::mutex> lock(store_mu_);
+ load_data_store_.MergeRow(host, key, value);
+ }
+ }
+ }
+}
+
+void LoadReporter::ProcessViewDataCallEnd(
+ const CensusViewProvider::ViewDataMap& view_data_map) {
+ uint64_t total_end_count = 0;
+ uint64_t total_error_count = 0;
+ auto it = view_data_map.find(kViewEndCount);
+ if (it != view_data_map.end()) {
+ // Note that the data type for any Sum view is double, whatever the data
+ // type of the original measure.
+ for (const auto& p : it->second.double_data()) {
+ const std::vector<grpc::string>& tag_values = p.first;
+ const uint64_t end_count = static_cast<uint64_t>(p.second);
+ const grpc::string& client_ip_and_token = tag_values[0];
+ const grpc::string& host = tag_values[1];
+ const grpc::string& user_id = tag_values[2];
+ const grpc::string& status = tag_values[3];
+ // This is due to a bug reported internally of Java server load reporting
+ // implementation.
+ // TODO(juanlishen): Check whether this situation happens in OSS C++.
+ if (client_ip_and_token.size() == 0) {
+ gpr_log(GPR_DEBUG,
+ "Skipping processing Opencensus record with empty "
+ "client_ip_and_token tag.");
+ continue;
+ }
+ LoadRecordKey key(client_ip_and_token, user_id);
+ const uint64_t bytes_sent =
+ CensusViewProvider::GetRelatedViewDataRowDouble(
+ view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
+ tag_values);
+ const uint64_t bytes_received =
+ CensusViewProvider::GetRelatedViewDataRowDouble(
+ view_data_map, kViewEndBytesReceived,
+ sizeof(kViewEndBytesReceived) - 1, tag_values);
+ const uint64_t latency_ms =
+ CensusViewProvider::GetRelatedViewDataRowDouble(
+ view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
+ tag_values);
+ uint64_t ok_count = 0;
+ uint64_t error_count = 0;
+ total_end_count += end_count;
+ if (std::strcmp(status.c_str(), kCallStatusOk) == 0) {
+ ok_count = end_count;
+ } else {
+ error_count = end_count;
+ total_error_count += end_count;
+ }
+ LoadRecordValue value = LoadRecordValue(
+ 0, ok_count, error_count, bytes_sent, bytes_received, latency_ms);
+ {
+ std::unique_lock<std::mutex> lock(store_mu_);
+ load_data_store_.MergeRow(host, key, value);
+ }
+ }
+ }
+ AppendNewFeedbackRecord(total_end_count, total_error_count);
+}
+
+void LoadReporter::ProcessViewDataOtherCallMetrics(
+ const CensusViewProvider::ViewDataMap& view_data_map) {
+ auto it = view_data_map.find(kViewOtherCallMetricCount);
+ if (it != view_data_map.end()) {
+ for (const auto& p : it->second.int_data()) {
+ const std::vector<grpc::string>& tag_values = p.first;
+ const int64_t num_calls = p.second;
+ const grpc::string& client_ip_and_token = tag_values[0];
+ const grpc::string& host = tag_values[1];
+ const grpc::string& user_id = tag_values[2];
+ const grpc::string& metric_name = tag_values[3];
+ LoadRecordKey key(client_ip_and_token, user_id);
+ const double total_metric_value =
+ CensusViewProvider::GetRelatedViewDataRowDouble(
+ view_data_map, kViewOtherCallMetricValue,
+ sizeof(kViewOtherCallMetricValue) - 1, tag_values);
+ LoadRecordValue value = LoadRecordValue(
+ metric_name, static_cast<uint64_t>(num_calls), total_metric_value);
+ {
+ std::unique_lock<std::mutex> lock(store_mu_);
+ load_data_store_.MergeRow(host, key, value);
+ }
+ }
+ }
+}
+
+void LoadReporter::FetchAndSample() {
+ gpr_log(GPR_DEBUG,
+ "[LR %p] Starts fetching Census view data and sampling LB feedback "
+ "record.",
+ this);
+ CensusViewProvider::ViewDataMap view_data_map =
+ census_view_provider_->FetchViewData();
+ ProcessViewDataCallStart(view_data_map);
+ ProcessViewDataCallEnd(view_data_map);
+ ProcessViewDataOtherCallMetrics(view_data_map);
+}
+
+} // namespace load_reporter
+} // namespace grpc
diff --git a/src/cpp/server/load_reporter/load_reporter.h b/src/cpp/server/load_reporter/load_reporter.h
new file mode 100644
index 0000000000..49a2e4b53c
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_reporter.h
@@ -0,0 +1,225 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
+#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
+
+#include <grpc/support/port_platform.h>
+
+#include <atomic>
+#include <chrono>
+#include <deque>
+#include <vector>
+
+#include <grpc/support/log.h>
+#include <grpcpp/impl/codegen/config.h>
+
+#include "src/cpp/server/load_reporter/load_data_store.h"
+#include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
+
+#include "opencensus/stats/stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+// The interface to get the Census stats. Abstracted for mocking.
+class CensusViewProvider {
+ public:
+ // Maps from the view name to the view data.
+ using ViewDataMap =
+ std::unordered_map<grpc::string, ::opencensus::stats::ViewData>;
+ // Maps from the view name to the view descriptor.
+ using ViewDescriptorMap =
+ std::unordered_map<grpc::string, ::opencensus::stats::ViewDescriptor>;
+
+ CensusViewProvider();
+ virtual ~CensusViewProvider() = default;
+
+ // Fetches the view data accumulated since last fetching, and returns it as a
+ // map from the view name to the view data.
+ virtual ViewDataMap FetchViewData() = 0;
+
+ // A helper function that gets a row with the input tag values from the view
+ // data. Only used when we know that row must exist because we have seen a row
+ // with the same tag values in a related view data. Several ViewData's are
+ // considered related if their views are based on the measures that are always
+ // recorded at the same time.
+ double static GetRelatedViewDataRowDouble(
+ const ViewDataMap& view_data_map, const char* view_name,
+ size_t view_name_len, const std::vector<grpc::string>& tag_values);
+
+ protected:
+ const ViewDescriptorMap& view_descriptor_map() const {
+ return view_descriptor_map_;
+ }
+
+ private:
+ ViewDescriptorMap view_descriptor_map_;
+ // Tag keys.
+ ::opencensus::stats::TagKey tag_key_token_;
+ ::opencensus::stats::TagKey tag_key_host_;
+ ::opencensus::stats::TagKey tag_key_user_id_;
+ ::opencensus::stats::TagKey tag_key_status_;
+ ::opencensus::stats::TagKey tag_key_metric_name_;
+};
+
+// The default implementation fetches the real stats from Census.
+class CensusViewProviderDefaultImpl : public CensusViewProvider {
+ public:
+ CensusViewProviderDefaultImpl();
+
+ ViewDataMap FetchViewData() override;
+
+ private:
+ std::unordered_map<grpc::string, ::opencensus::stats::View> view_map_;
+};
+
+// The interface to get the CPU stats. Abstracted for mocking.
+class CpuStatsProvider {
+ public:
+ // The used and total amounts of CPU usage.
+ using CpuStatsSample = std::pair<uint64_t, uint64_t>;
+
+ virtual ~CpuStatsProvider() = default;
+
+ // Gets the cumulative used CPU and total CPU resource.
+ virtual CpuStatsSample GetCpuStats() = 0;
+};
+
+// The default implementation reads CPU jiffies from the system to calculate CPU
+// utilization.
+class CpuStatsProviderDefaultImpl : public CpuStatsProvider {
+ public:
+ CpuStatsSample GetCpuStats() override;
+};
+
+// Maintains all the load data and load reporting streams.
+class LoadReporter {
+ public:
+ // TODO(juanlishen): Allow config for providers from users.
+ LoadReporter(uint32_t feedback_sample_window_seconds,
+ std::unique_ptr<CensusViewProvider> census_view_provider,
+ std::unique_ptr<CpuStatsProvider> cpu_stats_provider)
+ : feedback_sample_window_seconds_(feedback_sample_window_seconds),
+ census_view_provider_(std::move(census_view_provider)),
+ cpu_stats_provider_(std::move(cpu_stats_provider)) {
+ // Append the initial record so that the next real record can have a base.
+ AppendNewFeedbackRecord(0, 0);
+ }
+
+ // Fetches the latest data from Census and merge it into the data store.
+ // Also adds a new sample to the LB feedback sliding window.
+ // Thread-unsafe. (1). The access to the load data store and feedback records
+ // has locking. (2). The access to the Census view provider and CPU stats
+ // provider lacks locking, but we only access these two members in this method
+ // (in testing, we also access them when setting up expectation). So the
+ // invocations of this method must be serialized.
+ void FetchAndSample();
+
+ // Generates a report for that host and balancer. The report contains
+ // all the stats data accumulated between the last report (i.e., the last
+ // consumption) and the last fetch from Census (i.e., the last production).
+ // Thread-safe.
+ ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> GenerateLoads(
+ const grpc::string& hostname, const grpc::string& lb_id);
+
+ // The feedback is calculated from the stats data recorded in the sliding
+ // window. Outdated records are discarded.
+ // Thread-safe.
+ ::grpc::lb::v1::LoadBalancingFeedback GenerateLoadBalancingFeedback();
+
+ // Wrapper around LoadDataStore::ReportStreamCreated.
+ // Thread-safe.
+ void ReportStreamCreated(const grpc::string& hostname,
+ const grpc::string& lb_id,
+ const grpc::string& load_key);
+
+ // Wrapper around LoadDataStore::ReportStreamClosed.
+ // Thread-safe.
+ void ReportStreamClosed(const grpc::string& hostname,
+ const grpc::string& lb_id);
+
+ // Generates a unique LB ID of length kLbIdLength. Returns an empty string
+ // upon failure. Thread-safe.
+ grpc::string GenerateLbId();
+
+ // Accessors only for testing.
+ CensusViewProvider* census_view_provider() {
+ return census_view_provider_.get();
+ }
+ CpuStatsProvider* cpu_stats_provider() { return cpu_stats_provider_.get(); }
+
+ private:
+ struct LoadBalancingFeedbackRecord {
+ std::chrono::system_clock::time_point end_time;
+ uint64_t rpcs;
+ uint64_t errors;
+ uint64_t cpu_usage;
+ uint64_t cpu_limit;
+
+ LoadBalancingFeedbackRecord(
+ const std::chrono::system_clock::time_point& end_time, uint64_t rpcs,
+ uint64_t errors, uint64_t cpu_usage, uint64_t cpu_limit)
+ : end_time(end_time),
+ rpcs(rpcs),
+ errors(errors),
+ cpu_usage(cpu_usage),
+ cpu_limit(cpu_limit) {}
+ };
+
+ // Finds the view data about starting call from the view_data_map and merges
+ // the data to the load data store.
+ void ProcessViewDataCallStart(
+ const CensusViewProvider::ViewDataMap& view_data_map);
+ // Finds the view data about ending call from the view_data_map and merges the
+ // data to the load data store.
+ void ProcessViewDataCallEnd(
+ const CensusViewProvider::ViewDataMap& view_data_map);
+ // Finds the view data about the customized call metrics from the
+ // view_data_map and merges the data to the load data store.
+ void ProcessViewDataOtherCallMetrics(
+ const CensusViewProvider::ViewDataMap& view_data_map);
+
+ bool IsRecordInWindow(const LoadBalancingFeedbackRecord& record,
+ std::chrono::system_clock::time_point now) {
+ return record.end_time > now - feedback_sample_window_seconds_;
+ }
+
+ void AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors);
+
+ // Extracts an OrphanedLoadIdentifier from the per-balancer store and attaches
+ // it to the load.
+ void AttachOrphanLoadId(::grpc::lb::v1::Load* load,
+ const PerBalancerStore& per_balancer_store);
+
+ std::atomic<int64_t> next_lb_id_{0};
+ const std::chrono::seconds feedback_sample_window_seconds_;
+ std::mutex feedback_mu_;
+ std::deque<LoadBalancingFeedbackRecord> feedback_records_;
+ // TODO(juanlishen): Lock in finer grain. Locking the whole store may be
+ // too expensive.
+ std::mutex store_mu_;
+ LoadDataStore load_data_store_;
+ std::unique_ptr<CensusViewProvider> census_view_provider_;
+ std::unique_ptr<CpuStatsProvider> cpu_stats_provider_;
+};
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
diff --git a/src/proto/grpc/lb/v1/BUILD b/src/proto/grpc/lb/v1/BUILD
index 15bf3c3233..c042459218 100644
--- a/src/proto/grpc/lb/v1/BUILD
+++ b/src/proto/grpc/lb/v1/BUILD
@@ -14,11 +14,21 @@
licenses(["notice"]) # Apache v2
-load("//bazel:grpc_build_system.bzl", "grpc_proto_library", "grpc_package")
+load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library")
-grpc_package(name = "lb", visibility = "public")
+grpc_package(
+ name = "lb",
+ visibility = "public",
+)
grpc_proto_library(
name = "load_balancer_proto",
srcs = ["load_balancer.proto"],
)
+
+grpc_proto_library(
+ name = "load_reporter_proto",
+ srcs = ["load_reporter.proto"],
+ has_services = True,
+ well_known_protos = True,
+)
diff --git a/src/proto/grpc/lb/v1/load_reporter.proto b/src/proto/grpc/lb/v1/load_reporter.proto
new file mode 100644
index 0000000000..c2e4f23f6e
--- /dev/null
+++ b/src/proto/grpc/lb/v1/load_reporter.proto
@@ -0,0 +1,180 @@
+// Copyright 2018 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package grpc.lb.v1;
+
+import "google/protobuf/duration.proto";
+
+// The LoadReporter service.
+service LoadReporter {
+ // Report load from server to lb.
+ rpc ReportLoad(stream LoadReportRequest)
+ returns (stream LoadReportResponse) {
+ };
+}
+
+message LoadReportRequest {
+ // This message should be sent on the first request to the gRPC server.
+ InitialLoadReportRequest initial_request = 1;
+}
+
+message InitialLoadReportRequest {
+ // The hostname this load reporter client is requesting load for.
+ string load_balanced_hostname = 1;
+
+ // Additional information to disambiguate orphaned load: load that should have
+ // gone to this load reporter client, but was not able to be sent since the
+ // load reporter client has disconnected. load_key is sent in orphaned load
+ // reports; see Load.load_key.
+ bytes load_key = 2;
+
+ // This interval defines how often the server should send load reports to
+ // the load balancer.
+ google.protobuf.Duration load_report_interval = 3;
+}
+
+message LoadReportResponse {
+ // This message should be sent on the first response to the load balancer.
+ InitialLoadReportResponse initial_response = 1;
+
+ // Reports server-wide statistics for load balancing.
+ // This should be reported with every response.
+ LoadBalancingFeedback load_balancing_feedback = 2;
+
+ // A load report for each <tag, user_id> tuple. This could be considered to be
+ // a multimap indexed by <tag, user_id>. It is not strictly necessary to
+ // aggregate all entries into one entry per <tag, user_id> tuple, although it
+ // is preferred to do so.
+ repeated Load load = 3;
+}
+
+message InitialLoadReportResponse {
+ // Initial response returns the Load balancer ID. This must be plain text
+ // (printable ASCII).
+ string load_balancer_id = 1;
+
+ enum ImplementationIdentifier {
+ IMPL_UNSPECIFIED = 0;
+ CPP = 1; // Standard Google C++ implementation.
+ JAVA = 2; // Standard Google Java implementation.
+ GO = 3; // Standard Google Go implementation.
+ }
+ // Optional identifier of this implementation of the load reporting server.
+ ImplementationIdentifier implementation_id = 2;
+
+ // Optional server_version should be a value that is modified (and
+ // monotonically increased) when changes are made to the server
+ // implementation.
+ int64 server_version = 3;
+}
+
+message LoadBalancingFeedback {
+ // Reports the current utilization of the server (typical range [0.0 - 1.0]).
+ float server_utilization = 1;
+
+ // The total rate of calls handled by this server (including errors).
+ float calls_per_second = 2;
+
+ // The total rate of error responses sent by this server.
+ float errors_per_second = 3;
+}
+
+message Load {
+ // The (plain text) tag used by the calls covered by this load report. The
+ // tag is that part of the load balancer token after removing the load
+ // balancer id. Empty is equivalent to non-existent tag.
+ string load_balance_tag = 1;
+
+ // The user identity authenticated by the calls covered by this load
+ // report. Empty is equivalent to no known user_id.
+ string user_id = 3;
+
+ // IP address of the client that sent these requests, serialized in
+ // network-byte-order. It may either be an IPv4 or IPv6 address.
+ bytes client_ip_address = 15;
+
+ // The number of calls started (since the last report) with the given tag and
+ // user_id.
+ int64 num_calls_started = 4;
+
+ // Indicates whether this load report is an in-progress load report in which
+ // num_calls_in_progress is the only valid entry. If in_progress_report is not
+ // set, num_calls_in_progress will be ignored. If in_progress_report is set,
+ // fields other than num_calls_in_progress and orphaned_load will be ignored.
+ oneof in_progress_report {
+ // The number of calls in progress (instantaneously) per load balancer id.
+ int64 num_calls_in_progress = 5;
+ }
+
+ // The following values are counts or totals of call statistics that finished
+ // with the given tag and user_id.
+ int64 num_calls_finished_without_error = 6; // Calls with status OK.
+ int64 num_calls_finished_with_error = 7; // Calls with status non-OK.
+ // Calls that finished with a status that maps to HTTP 5XX (see
+ // googleapis/google/rpc/code.proto). Note that this is a subset of
+ // num_calls_finished_with_error.
+ int64 num_calls_finished_with_server_error = 16;
+
+ // Totals are from calls that with _and_ without error.
+ int64 total_bytes_sent = 8;
+ int64 total_bytes_received = 9;
+ google.protobuf.Duration total_latency = 10;
+
+ // Optional metrics reported for the call(s). Requires that metric_name is
+ // unique.
+ repeated CallMetricData metric_data = 11;
+
+ // The following two fields are used for reporting orphaned load: load that
+ // could not be reported to the originating balancer either since the balancer
+ // is no longer connected or because the frontend sent an invalid token. These
+ // fields must not be set with normal (unorphaned) load reports.
+ oneof orphaned_load {
+ // Load_key is the load_key from the initial_request from the originating
+ // balancer.
+ bytes load_key = 12 [deprecated=true];
+
+ // If true then this load report is for calls that had an invalid token; the
+ // user is probably abusing the gRPC protocol.
+ // TODO(yankaiz): Rename load_key_unknown.
+ bool load_key_unknown = 13;
+
+ // load_key and balancer_id are included in order to identify orphaned load
+ // from different origins.
+ OrphanedLoadIdentifier orphaned_load_identifier = 14;
+ }
+
+ reserved 2;
+}
+
+message CallMetricData {
+ // Name of the metric; may be empty.
+ string metric_name = 1;
+
+ // Number of calls that finished and included this metric.
+ int64 num_calls_finished_with_metric = 2;
+
+ // Sum of metric values across all calls that finished with this metric.
+ double total_metric_value = 3;
+}
+
+message OrphanedLoadIdentifier {
+ // The load_key from the initial_request from the originating balancer.
+ bytes load_key = 1;
+
+ // The unique ID generated by LoadReporter to identify balancers. Here it
+ // distinguishes orphaned load with a same load_key.
+ string load_balancer_id = 2;
+}