aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Juanli Shen <juanlishen@google.com>2018-07-13 19:52:59 -0700
committerGravatar Juanli Shen <juanlishen@google.com>2018-07-13 19:52:59 -0700
commitbe40b0d3a8cf2e37c80b2c248111051fa8bdf7bc (patch)
tree79384a0045d5bbc550666f047d088f9582755086 /src
parentf9f5c67aff91e4ad26371b0a2482a5011ab45226 (diff)
Add server load reporting service
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/filters/load_reporting/registered_opencensus_objects.h65
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_filter.cc54
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_filter.h9
-rw-r--r--src/cpp/server/load_reporter/constants.h10
-rw-r--r--src/cpp/server/load_reporter/load_data_store.h3
-rw-r--r--src/cpp/server/load_reporter/load_reporter.cc79
-rw-r--r--src/cpp/server/load_reporter/load_reporter.h5
-rw-r--r--src/cpp/server/load_reporter/load_reporter_async_service_impl.cc370
-rw-r--r--src/cpp/server/load_reporter/load_reporter_async_service_impl.h194
-rw-r--r--src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc41
-rw-r--r--src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc60
-rw-r--r--src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h62
-rw-r--r--src/cpp/server/load_reporter/util.cc45
-rw-r--r--src/proto/grpc/lb/v1/load_reporter.proto4
14 files changed, 920 insertions, 81 deletions
diff --git a/src/core/ext/filters/load_reporting/registered_opencensus_objects.h b/src/core/ext/filters/load_reporting/registered_opencensus_objects.h
index b62863dc2c..4eacda7c02 100644
--- a/src/core/ext/filters/load_reporting/registered_opencensus_objects.h
+++ b/src/core/ext/filters/load_reporting/registered_opencensus_objects.h
@@ -28,75 +28,84 @@
namespace grpc {
namespace load_reporter {
+// Note that the functions here are specified as inline to share the static
+// objects across all the translation units including this header. See more
+// details on https://en.cppreference.com/w/cpp/language/inline.
+
// Measures.
-::opencensus::stats::MeasureInt64 MeasureStartCount() {
- static const ::opencensus::stats::MeasureInt64 start_count =
+inline ::opencensus::stats::MeasureInt64 MeasureStartCount() {
+ static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(
kMeasureStartCount, kMeasureStartCount, kMeasureStartCount);
- return start_count;
+ return measure;
}
-::opencensus::stats::MeasureInt64 MeasureEndCount() {
- static const ::opencensus::stats::MeasureInt64 end_count =
+inline ::opencensus::stats::MeasureInt64 MeasureEndCount() {
+ static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(
kMeasureEndCount, kMeasureEndCount, kMeasureEndCount);
- return end_count;
+ return measure;
}
-::opencensus::stats::MeasureInt64 MeasureEndBytesSent() {
- static const ::opencensus::stats::MeasureInt64 end_bytes_sent =
+inline ::opencensus::stats::MeasureInt64 MeasureEndBytesSent() {
+ static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(
kMeasureEndBytesSent, kMeasureEndBytesSent, kMeasureEndBytesSent);
- return end_bytes_sent;
+ return measure;
}
-::opencensus::stats::MeasureInt64 MeasureEndBytesReceived() {
- static const ::opencensus::stats::MeasureInt64 end_bytes_received =
+inline ::opencensus::stats::MeasureInt64 MeasureEndBytesReceived() {
+ static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(kMeasureEndBytesReceived,
kMeasureEndBytesReceived,
kMeasureEndBytesReceived);
- return end_bytes_received;
+ return measure;
}
-::opencensus::stats::MeasureInt64 MeasureEndLatencyMs() {
- static const ::opencensus::stats::MeasureInt64 end_latency_ms =
+inline ::opencensus::stats::MeasureInt64 MeasureEndLatencyMs() {
+ static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(
kMeasureEndLatencyMs, kMeasureEndLatencyMs, kMeasureEndLatencyMs);
- return end_latency_ms;
+ return measure;
}
-::opencensus::stats::MeasureDouble MeasureOtherCallMetric() {
- static const ::opencensus::stats::MeasureDouble other_call_metric =
+inline ::opencensus::stats::MeasureDouble MeasureOtherCallMetric() {
+ static const ::opencensus::stats::MeasureDouble measure =
::opencensus::stats::MeasureDouble::Register(kMeasureOtherCallMetric,
kMeasureOtherCallMetric,
kMeasureOtherCallMetric);
- return other_call_metric;
+ return measure;
}
// Tags.
-opencensus::stats::TagKey TagKeyToken() {
- static const auto token = opencensus::stats::TagKey::Register(kTagKeyToken);
+inline ::opencensus::stats::TagKey TagKeyToken() {
+ static const ::opencensus::stats::TagKey token =
+ opencensus::stats::TagKey::Register(kTagKeyToken);
return token;
}
-opencensus::stats::TagKey TagKeyHost() {
- static const auto token = opencensus::stats::TagKey::Register(kTagKeyHost);
+inline ::opencensus::stats::TagKey TagKeyHost() {
+ static const ::opencensus::stats::TagKey token =
+ opencensus::stats::TagKey::Register(kTagKeyHost);
return token;
}
-opencensus::stats::TagKey TagKeyUserId() {
- static const auto token = opencensus::stats::TagKey::Register(kTagKeyUserId);
+
+inline ::opencensus::stats::TagKey TagKeyUserId() {
+ static const ::opencensus::stats::TagKey token =
+ opencensus::stats::TagKey::Register(kTagKeyUserId);
return token;
}
-opencensus::stats::TagKey TagKeyStatus() {
- static const auto token = opencensus::stats::TagKey::Register(kTagKeyStatus);
+inline ::opencensus::stats::TagKey TagKeyStatus() {
+ static const ::opencensus::stats::TagKey token =
+ opencensus::stats::TagKey::Register(kTagKeyStatus);
return token;
}
-opencensus::stats::TagKey TagKeyMetricName() {
- static const auto token =
+inline ::opencensus::stats::TagKey TagKeyMetricName() {
+ static const ::opencensus::stats::TagKey token =
opencensus::stats::TagKey::Register(kTagKeyMetricName);
return token;
}
diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
index 51e4d795d7..6529046a5e 100644
--- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
@@ -19,7 +19,6 @@
#include <grpc/support/port_platform.h>
#include <grpc/grpc_security.h>
-#include <grpc/load_reporting.h>
#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -31,18 +30,20 @@
#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/context.h"
-#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr_posix.h"
#include "src/core/lib/iomgr/socket_utils.h"
-#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/call.h"
-#include "src/core/lib/transport/static_metadata.h"
namespace grpc {
+constexpr char kEncodedIpv4AddressLengthString[] = "08";
+constexpr char kEncodedIpv6AddressLengthString[] = "32";
+constexpr char kEmptyAddressLengthString[] = "00";
+constexpr size_t kLengthPrefixSize = 2;
+
grpc_error* ServerLoadReportingChannelData::Init(
grpc_channel_element* /* elem */, grpc_channel_element_args* args) {
GPR_ASSERT(!args->is_last);
@@ -90,7 +91,9 @@ void ServerLoadReportingCallData::Destroy(
{chand->peer_identity(), chand->peer_identity_len()}},
{::grpc::load_reporter::TagKeyStatus(),
GetStatusTagForStatus(final_info->final_status)}});
+ gpr_free(client_ip_and_lr_token_);
}
+ gpr_free(target_host_);
grpc_slice_unref_internal(service_method_);
}
@@ -100,7 +103,8 @@ void ServerLoadReportingCallData::StartTransportStreamOpBatch(
if (op->recv_initial_metadata() != nullptr) {
// Save some fields to use when initial metadata is ready.
peer_string_ = op->get_peer_string();
- recv_initial_metadata_ = op->recv_initial_metadata();
+ recv_initial_metadata_ =
+ op->op()->payload->recv_initial_metadata.recv_initial_metadata;
original_recv_initial_metadata_ready_ = op->recv_initial_metadata_ready();
// Substitute the original closure for the wrapper closure.
op->set_recv_initial_metadata_ready(&recv_initial_metadata_ready_);
@@ -157,10 +161,10 @@ void ServerLoadReportingCallData::GetCensusSafeClientIpString(
*size = 8;
} else if (addr->sa_family == GRPC_AF_INET6) {
grpc_sockaddr_in6* addr6 = reinterpret_cast<grpc_sockaddr_in6*>(addr);
- *client_ip_string = static_cast<char*>(gpr_malloc(32));
+ *client_ip_string = static_cast<char*>(gpr_malloc(32 + 1));
for (size_t i = 0; i < 16; ++i) {
- sprintf(*client_ip_string + i, "%02x",
- addr6->sin6_addr.__in6_u.__u6_addr8[i]);
+ snprintf(*client_ip_string + i * 2, 2 + 1, "%02x",
+ addr6->sin6_addr.__in6_u.__u6_addr8[i]);
}
*size = 32;
} else {
@@ -241,7 +245,7 @@ void ServerLoadReportingCallData::RecvInitialMetadataReady(void* arg,
if (err == GRPC_ERROR_NONE) {
GRPC_LOG_IF_ERROR(
"server_load_reporting_filter",
- grpc_metadata_batch_filter(calld->recv_initial_metadata_->batch(),
+ grpc_metadata_batch_filter(calld->recv_initial_metadata_,
RecvInitialMetadataFilter, elem,
"recv_initial_metadata filtering error"));
// If the LB token was not found in the recv_initial_metadata, only the
@@ -277,7 +281,6 @@ grpc_filtered_mdelem ServerLoadReportingCallData::SendTrailingMetadataFilter(
reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
ServerLoadReportingChannelData* chand =
reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
- // TODO(juanlishen): GRPC_MDSTR_LB_COST_BIN meaning?
if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_COST_BIN)) {
const grpc_slice value = GRPC_MDVALUE(md);
const size_t cost_entry_size = GRPC_SLICE_LENGTH(value);
@@ -325,4 +328,35 @@ const char* ServerLoadReportingCallData::GetStatusTagForStatus(
}
}
+namespace {
+bool MaybeAddServerLoadReportingFilter(const grpc_channel_args& args) {
+ return grpc_channel_arg_get_bool(
+ grpc_channel_args_find(&args, GRPC_ARG_ENABLE_LOAD_REPORTING), false);
+}
+} // namespace
+
+// TODO(juanlishen): We should register the filter during grpc initialization
+// time once OpenCensus is compatible with our build system. For now, we force
+// registration of the server load reporting filter at static initialization
+// time if we build with the filter target.
+struct ServerLoadReportingFilterStaticRegistrar {
+ ServerLoadReportingFilterStaticRegistrar() {
+ static std::atomic_bool registered{false};
+ if (registered) return;
+ RegisterChannelFilter<ServerLoadReportingChannelData,
+ ServerLoadReportingCallData>(
+ "server_load_reporting", GRPC_SERVER_CHANNEL, INT_MAX,
+ MaybeAddServerLoadReportingFilter);
+ // Access measures to ensure they are initialized. Otherwise, we can't
+ // create any valid view before the first RPC.
+ ::grpc::load_reporter::MeasureStartCount();
+ ::grpc::load_reporter::MeasureEndCount();
+ ::grpc::load_reporter::MeasureEndBytesSent();
+ ::grpc::load_reporter::MeasureEndBytesReceived();
+ ::grpc::load_reporter::MeasureEndLatencyMs();
+ ::grpc::load_reporter::MeasureOtherCallMetric();
+ registered = true;
+ }
+} server_load_reporting_filter_static_registrar;
+
} // namespace grpc
diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.h b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h
index 029b19ac89..10baf1f833 100644
--- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.h
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h
@@ -86,8 +86,8 @@ class ServerLoadReportingCallData : public CallData {
// The received initial metadata (a member of the recv_initial_metadata op).
// When it is ready, we will extract some data from it via
// recv_initial_metadata_ready_ closure, before the original
- // recv_initial_metadata_ready closure,
- MetadataBatch* recv_initial_metadata_;
+ // recv_initial_metadata_ready closure.
+ grpc_metadata_batch* recv_initial_metadata_;
// The original recv_initial_metadata closure, which is wrapped by our own
// closure (recv_initial_metadata_ready_) to capture the incoming initial
@@ -112,11 +112,6 @@ class ServerLoadReportingCallData : public CallData {
// token.
char* client_ip_and_lr_token_;
size_t client_ip_and_lr_token_len_;
-
- static constexpr char kEncodedIpv4AddressLengthString[] = "08";
- static constexpr char kEncodedIpv6AddressLengthString[] = "32";
- static constexpr char kEmptyAddressLengthString[] = "00";
- static constexpr size_t kLengthPrefixSize = 2;
};
} // namespace grpc
diff --git a/src/cpp/server/load_reporter/constants.h b/src/cpp/server/load_reporter/constants.h
index 07c5965fff..00ad794a04 100644
--- a/src/cpp/server/load_reporter/constants.h
+++ b/src/cpp/server/load_reporter/constants.h
@@ -24,6 +24,16 @@
namespace grpc {
namespace load_reporter {
+// TODO(juanlishen): Update the version number with the PR number every time
+// there is any change to the server load reporter.
+constexpr uint32_t kVersion = 15853;
+
+// TODO(juanlishen): This window size is from the internal spec for the load
+// reporter. Need to ask the gRPC LB team whether we should make this and the
+// fetching interval configurable.
+constexpr uint32_t kFeedbackSampleWindowSeconds = 10;
+constexpr uint32_t kFetchAndSampleIntervalSeconds = 1;
+
constexpr size_t kLbIdLength = 8;
constexpr size_t kIpv4AddressLength = 8;
constexpr size_t kIpv6AddressLength = 32;
diff --git a/src/cpp/server/load_reporter/load_data_store.h b/src/cpp/server/load_reporter/load_data_store.h
index 2da78ea064..dc08ecf479 100644
--- a/src/cpp/server/load_reporter/load_data_store.h
+++ b/src/cpp/server/load_reporter/load_data_store.h
@@ -160,7 +160,8 @@ class LoadRecordValue {
", error_count_=" + grpc::to_string(error_count_) +
", bytes_sent_=" + grpc::to_string(bytes_sent_) +
", bytes_recv_=" + grpc::to_string(bytes_recv_) +
- ", latency_ms_=" + grpc::to_string(latency_ms_) + "]";
+ ", latency_ms_=" + grpc::to_string(latency_ms_) + ", " +
+ grpc::to_string(call_metrics_.size()) + " other call metric(s)]";
}
bool InsertCallMetric(const grpc::string& metric_name,
diff --git a/src/cpp/server/load_reporter/load_reporter.cc b/src/cpp/server/load_reporter/load_reporter.cc
index 3f0063d883..464063a13f 100644
--- a/src/cpp/server/load_reporter/load_reporter.cc
+++ b/src/cpp/server/load_reporter/load_reporter.cc
@@ -22,6 +22,7 @@
#include <stdio.h>
#include <chrono>
#include <ctime>
+#include <iterator>
#include "src/cpp/server/load_reporter/constants.h"
#include "src/cpp/server/load_reporter/get_cpu_stats.h"
@@ -65,8 +66,8 @@ CensusViewProvider::CensusViewProvider()
// measurements instead of setting the data values directly.
auto vd_end_count =
::opencensus::stats::ViewDescriptor()
- .set_name((kViewEndCount))
- .set_measure((kMeasureEndCount))
+ .set_name(kViewEndCount)
+ .set_measure(kMeasureEndCount)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@@ -80,8 +81,8 @@ CensusViewProvider::CensusViewProvider()
view_descriptor_map_.emplace(kViewEndCount, vd_end_count);
auto vd_end_bytes_sent =
::opencensus::stats::ViewDescriptor()
- .set_name((kViewEndBytesSent))
- .set_measure((kMeasureEndBytesSent))
+ .set_name(kViewEndBytesSent)
+ .set_measure(kMeasureEndBytesSent)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@@ -95,8 +96,8 @@ CensusViewProvider::CensusViewProvider()
view_descriptor_map_.emplace(kViewEndBytesSent, vd_end_bytes_sent);
auto vd_end_bytes_received =
::opencensus::stats::ViewDescriptor()
- .set_name((kViewEndBytesReceived))
- .set_measure((kMeasureEndBytesReceived))
+ .set_name(kViewEndBytesReceived)
+ .set_measure(kMeasureEndBytesReceived)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@@ -110,8 +111,8 @@ CensusViewProvider::CensusViewProvider()
view_descriptor_map_.emplace(kViewEndBytesReceived, vd_end_bytes_received);
auto vd_end_latency_ms =
::opencensus::stats::ViewDescriptor()
- .set_name((kViewEndLatencyMs))
- .set_measure((kMeasureEndLatencyMs))
+ .set_name(kViewEndLatencyMs)
+ .set_measure(kMeasureEndLatencyMs)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@@ -126,8 +127,8 @@ CensusViewProvider::CensusViewProvider()
// Two views related to other call metrics.
auto vd_metric_call_count =
::opencensus::stats::ViewDescriptor()
- .set_name((kViewOtherCallMetricCount))
- .set_measure((kMeasureOtherCallMetric))
+ .set_name(kViewOtherCallMetricCount)
+ .set_measure(kMeasureOtherCallMetric)
.set_aggregation(::opencensus::stats::Aggregation::Count())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@@ -141,8 +142,8 @@ CensusViewProvider::CensusViewProvider()
view_descriptor_map_.emplace(kViewOtherCallMetricCount, vd_metric_call_count);
auto vd_metric_value =
::opencensus::stats::ViewDescriptor()
- .set_name((kViewOtherCallMetricValue))
- .set_measure((kMeasureOtherCallMetric))
+ .set_name(kViewOtherCallMetricValue)
+ .set_measure(kMeasureOtherCallMetric)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@@ -161,11 +162,26 @@ double CensusViewProvider::GetRelatedViewDataRowDouble(
size_t view_name_len, const std::vector<grpc::string>& tag_values) {
auto it_vd = view_data_map.find(grpc::string(view_name, view_name_len));
GPR_ASSERT(it_vd != view_data_map.end());
+ GPR_ASSERT(it_vd->second.type() ==
+ ::opencensus::stats::ViewData::Type::kDouble);
auto it_row = it_vd->second.double_data().find(tag_values);
GPR_ASSERT(it_row != it_vd->second.double_data().end());
return it_row->second;
}
+uint64_t CensusViewProvider::GetRelatedViewDataRowInt(
+ const ViewDataMap& view_data_map, const char* view_name,
+ size_t view_name_len, const std::vector<grpc::string>& tag_values) {
+ auto it_vd = view_data_map.find(grpc::string(view_name, view_name_len));
+ GPR_ASSERT(it_vd != view_data_map.end());
+ GPR_ASSERT(it_vd->second.type() ==
+ ::opencensus::stats::ViewData::Type::kInt64);
+ auto it_row = it_vd->second.int_data().find(tag_values);
+ GPR_ASSERT(it_row != it_vd->second.int_data().end());
+ GPR_ASSERT(it_row->second >= 0);
+ return it_row->second;
+}
+
CensusViewProviderDefaultImpl::CensusViewProviderDefaultImpl() {
for (const auto& p : view_descriptor_map()) {
const grpc::string& view_name = p.first;
@@ -235,23 +251,23 @@ LoadReporter::GenerateLoadBalancingFeedback() {
return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
}
// Find the longest range with valid ends.
- LoadBalancingFeedbackRecord* oldest = &feedback_records_[0];
- LoadBalancingFeedbackRecord* newest =
- &feedback_records_[feedback_records_.size() - 1];
- while (newest > oldest &&
+ auto oldest = feedback_records_.begin();
+ auto newest = feedback_records_.end() - 1;
+ while (std::distance(oldest, newest) > 0 &&
(newest->cpu_limit == 0 || oldest->cpu_limit == 0)) {
// A zero limit means that the system info reading was failed, so these
// records can't be used to calculate CPU utilization.
if (newest->cpu_limit == 0) --newest;
if (oldest->cpu_limit == 0) ++oldest;
}
- if (newest - oldest < 1 || oldest->end_time == newest->end_time ||
+ if (std::distance(oldest, newest) < 1 ||
+ oldest->end_time == newest->end_time ||
newest->cpu_limit == oldest->cpu_limit) {
return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
}
uint64_t rpcs = 0;
uint64_t errors = 0;
- for (LoadBalancingFeedbackRecord* p = newest; p != oldest; --p) {
+ for (auto p = newest; p != oldest; --p) {
// Because these two numbers are counters, the oldest record shouldn't be
// included.
rpcs += p->rpcs;
@@ -338,7 +354,8 @@ void LoadReporter::AttachOrphanLoadId(
if (per_balancer_store.lb_id() == kInvalidLbId) {
load->set_load_key_unknown(true);
} else {
- load->set_load_key_unknown(false);
+ // We shouldn't set load_key_unknown to any value in this case because
+ // load_key_unknown and orphaned_load_identifier are under an oneof struct.
load->mutable_orphaned_load_identifier()->set_load_key(
per_balancer_store.load_key());
load->mutable_orphaned_load_identifier()->set_load_balancer_id(
@@ -381,9 +398,7 @@ void LoadReporter::ProcessViewDataCallStart(
const CensusViewProvider::ViewDataMap& view_data_map) {
auto it = view_data_map.find(kViewStartCount);
if (it != view_data_map.end()) {
- // Note that the data type for any Sum view is double, whatever the data
- // type of the original measure.
- for (const auto& p : it->second.double_data()) {
+ for (const auto& p : it->second.int_data()) {
const std::vector<grpc::string>& tag_values = p.first;
const uint64_t start_count = static_cast<uint64_t>(p.second);
const grpc::string& client_ip_and_token = tag_values[0];
@@ -405,9 +420,7 @@ void LoadReporter::ProcessViewDataCallEnd(
uint64_t total_error_count = 0;
auto it = view_data_map.find(kViewEndCount);
if (it != view_data_map.end()) {
- // Note that the data type for any Sum view is double, whatever the data
- // type of the original measure.
- for (const auto& p : it->second.double_data()) {
+ for (const auto& p : it->second.int_data()) {
const std::vector<grpc::string>& tag_values = p.first;
const uint64_t end_count = static_cast<uint64_t>(p.second);
const grpc::string& client_ip_and_token = tag_values[0];
@@ -424,18 +437,16 @@ void LoadReporter::ProcessViewDataCallEnd(
continue;
}
LoadRecordKey key(client_ip_and_token, user_id);
- const uint64_t bytes_sent =
- CensusViewProvider::GetRelatedViewDataRowDouble(
- view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
- tag_values);
+ const uint64_t bytes_sent = CensusViewProvider::GetRelatedViewDataRowInt(
+ view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
+ tag_values);
const uint64_t bytes_received =
- CensusViewProvider::GetRelatedViewDataRowDouble(
+ CensusViewProvider::GetRelatedViewDataRowInt(
view_data_map, kViewEndBytesReceived,
sizeof(kViewEndBytesReceived) - 1, tag_values);
- const uint64_t latency_ms =
- CensusViewProvider::GetRelatedViewDataRowDouble(
- view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
- tag_values);
+ const uint64_t latency_ms = CensusViewProvider::GetRelatedViewDataRowInt(
+ view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
+ tag_values);
uint64_t ok_count = 0;
uint64_t error_count = 0;
total_end_count += end_count;
diff --git a/src/cpp/server/load_reporter/load_reporter.h b/src/cpp/server/load_reporter/load_reporter.h
index 49a2e4b53c..b2254d5601 100644
--- a/src/cpp/server/load_reporter/load_reporter.h
+++ b/src/cpp/server/load_reporter/load_reporter.h
@@ -59,7 +59,10 @@ class CensusViewProvider {
// with the same tag values in a related view data. Several ViewData's are
// considered related if their views are based on the measures that are always
// recorded at the same time.
- double static GetRelatedViewDataRowDouble(
+ static double GetRelatedViewDataRowDouble(
+ const ViewDataMap& view_data_map, const char* view_name,
+ size_t view_name_len, const std::vector<grpc::string>& tag_values);
+ static uint64_t GetRelatedViewDataRowInt(
const ViewDataMap& view_data_map, const char* view_name,
size_t view_name_len, const std::vector<grpc::string>& tag_values);
diff --git a/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc b/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc
new file mode 100644
index 0000000000..d001199b8c
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc
@@ -0,0 +1,370 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h"
+
+namespace grpc {
+namespace load_reporter {
+
+void LoadReporterAsyncServiceImpl::CallableTag::Run(bool ok) {
+ GPR_ASSERT(handler_function_ != nullptr);
+ GPR_ASSERT(handler_ != nullptr);
+ handler_function_(std::move(handler_), ok);
+}
+
+LoadReporterAsyncServiceImpl::LoadReporterAsyncServiceImpl(
+ std::unique_ptr<ServerCompletionQueue> cq)
+ : cq_(std::move(cq)) {
+ thread_ = std::unique_ptr<::grpc_core::Thread>(
+ new ::grpc_core::Thread("server_load_reporting", Work, this));
+ std::unique_ptr<CpuStatsProvider> cpu_stats_provider = nullptr;
+#if defined(GPR_LINUX) || defined(GPR_WINDOWS) || defined(GPR_APPLE)
+ cpu_stats_provider.reset(new CpuStatsProviderDefaultImpl());
+#endif
+ load_reporter_ = std::unique_ptr<LoadReporter>(new LoadReporter(
+ kFeedbackSampleWindowSeconds,
+ std::unique_ptr<CensusViewProvider>(new CensusViewProviderDefaultImpl()),
+ std::move(cpu_stats_provider)));
+}
+
+LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl() {
+ // We will reach here after the server starts shutting down.
+ shutdown_ = true;
+ {
+ std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
+ cq_->Shutdown();
+ }
+ if (next_fetch_and_sample_alarm_ != nullptr)
+ next_fetch_and_sample_alarm_->Cancel();
+ thread_->Join();
+}
+
+void LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample() {
+ auto next_fetch_and_sample_time =
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_millis(kFetchAndSampleIntervalSeconds * 1000,
+ GPR_TIMESPAN));
+ {
+ std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
+ if (shutdown_) return;
+ // TODO(juanlishen): Improve the Alarm implementation to reuse a single
+ // instance for multiple events.
+ next_fetch_and_sample_alarm_.reset(new Alarm);
+ next_fetch_and_sample_alarm_->Set(cq_.get(), next_fetch_and_sample_time,
+ this);
+ }
+ gpr_log(GPR_DEBUG, "[LRS %p] Next fetch-and-sample scheduled.", this);
+}
+
+void LoadReporterAsyncServiceImpl::FetchAndSample(bool ok) {
+ if (!ok) {
+ gpr_log(GPR_INFO, "[LRS %p] Fetch-and-sample is stopped.", this);
+ return;
+ }
+ gpr_log(GPR_DEBUG, "[LRS %p] Starting a fetch-and-sample...", this);
+ load_reporter_->FetchAndSample();
+ ScheduleNextFetchAndSample();
+}
+
+void LoadReporterAsyncServiceImpl::Work(void* arg) {
+ LoadReporterAsyncServiceImpl* service =
+ reinterpret_cast<LoadReporterAsyncServiceImpl*>(arg);
+ service->FetchAndSample(true /* ok */);
+ // TODO(juanlishen): This is a workaround to wait for the cq to be ready. Need
+ // to figure out why cq is not ready after service starts.
+ gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_seconds(1, GPR_TIMESPAN)));
+ ReportLoadHandler::CreateAndStart(service->cq_.get(), service,
+ service->load_reporter_.get());
+ void* tag;
+ bool ok;
+ while (true) {
+ if (!service->cq_->Next(&tag, &ok)) {
+ // The completion queue is shutting down.
+ GPR_ASSERT(service->shutdown_);
+ break;
+ }
+ if (tag == service) {
+ service->FetchAndSample(ok);
+ } else {
+ auto* next_step = static_cast<CallableTag*>(tag);
+ next_step->Run(ok);
+ }
+ }
+}
+
+void LoadReporterAsyncServiceImpl::StartThread() { thread_->Start(); }
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart(
+ ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
+ LoadReporter* load_reporter) {
+ std::shared_ptr<ReportLoadHandler> handler =
+ std::make_shared<ReportLoadHandler>(cq, service, load_reporter);
+ ReportLoadHandler* p = handler.get();
+ {
+ std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
+ if (service->shutdown_) return;
+ p->on_done_notified_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnDoneNotified, p,
+ std::placeholders::_1, std::placeholders::_2),
+ handler);
+ p->next_inbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnRequestDelivered, p,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(handler));
+ p->ctx_.AsyncNotifyWhenDone(&p->on_done_notified_);
+ service->RequestReportLoad(&p->ctx_, &p->stream_, cq, cq,
+ &p->next_inbound_);
+ }
+}
+
+LoadReporterAsyncServiceImpl::ReportLoadHandler::ReportLoadHandler(
+ ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
+ LoadReporter* load_reporter)
+ : cq_(cq),
+ service_(service),
+ load_reporter_(load_reporter),
+ stream_(&ctx_),
+ call_status_(WAITING_FOR_DELIVERY) {}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (ok) {
+ call_status_ = DELIVERED;
+ } else {
+ // AsyncNotifyWhenDone() needs to be called before the call starts, but the
+ // tag will not pop out if the call never starts (
+ // https://github.com/grpc/grpc/issues/10136). So we need to manually
+ // release the ownership of the handler in this case.
+ GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
+ }
+ if (!ok || shutdown_) {
+ // The value of ok being false means that the server is shutting down.
+ Shutdown(std::move(self), "OnRequestDelivered");
+ return;
+ }
+ // Spawn a new handler instance to serve the next new client. Every handler
+ // instance will deallocate itself when it's done.
+ CreateAndStart(cq_, service_, load_reporter_);
+ {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (service_->shutdown_) {
+ lock.release()->unlock();
+ Shutdown(std::move(self), "OnRequestDelivered");
+ return;
+ }
+ next_inbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ stream_.Read(&request_, &next_inbound_);
+ }
+ // LB ID is unique for each load reporting stream.
+ lb_id_ = load_reporter_->GenerateLbId();
+ gpr_log(GPR_INFO,
+ "[LRS %p] Call request delivered (lb_id_: %s, handler: %p). "
+ "Start reading the initial request...",
+ service_, lb_id_.c_str(), this);
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (!ok || shutdown_) {
+ if (!ok && call_status_ < INITIAL_REQUEST_RECEIVED) {
+ // The client may have half-closed the stream or the stream is broken.
+ gpr_log(GPR_INFO,
+ "[LRS %p] Failed reading the initial request from the stream "
+ "(lb_id_: %s, handler: %p, done_notified: %d, is_cancelled: %d).",
+ service_, lb_id_.c_str(), this, static_cast<int>(done_notified_),
+ static_cast<int>(is_cancelled_));
+ }
+ Shutdown(std::move(self), "OnReadDone");
+ return;
+ }
+ // We only receive one request, which is the initial request.
+ if (call_status_ < INITIAL_REQUEST_RECEIVED) {
+ if (!request_.has_initial_request()) {
+ Shutdown(std::move(self), "OnReadDone+initial_request_not_found");
+ } else {
+ call_status_ = INITIAL_REQUEST_RECEIVED;
+ const auto& initial_request = request_.initial_request();
+ load_balanced_hostname_ = initial_request.load_balanced_hostname();
+ load_key_ = initial_request.load_key();
+ load_reporter_->ReportStreamCreated(load_balanced_hostname_, lb_id_,
+ load_key_);
+ const auto& load_report_interval = initial_request.load_report_interval();
+ load_report_interval_ms_ =
+ static_cast<uint64_t>(load_report_interval.seconds() * 1000 +
+ load_report_interval.nanos() / 1000);
+ gpr_log(
+ GPR_INFO,
+ "[LRS %p] Initial request received. Start load reporting (load "
+ "balanced host: %s, interval: %lu ms, lb_id_: %s, handler: %p)...",
+ service_, load_balanced_hostname_.c_str(), load_report_interval_ms_,
+ lb_id_.c_str(), this);
+ SendReport(self, true /* ok */);
+ // Expect this read to fail.
+ {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (service_->shutdown_) {
+ lock.release()->unlock();
+ Shutdown(std::move(self), "OnReadDone");
+ return;
+ }
+ next_inbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ stream_.Read(&request_, &next_inbound_);
+ }
+ }
+ } else {
+ // Another request received! This violates the spec.
+ gpr_log(GPR_ERROR,
+ "[LRS %p] Another request received (lb_id_: %s, handler: %p).",
+ service_, lb_id_.c_str(), this);
+ Shutdown(std::move(self), "OnReadDone+second_request");
+ }
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (!ok || shutdown_) {
+ Shutdown(std::move(self), "ScheduleNextReport");
+ return;
+ }
+ auto next_report_time = gpr_time_add(
+ gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN));
+ {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (service_->shutdown_) {
+ lock.release()->unlock();
+ Shutdown(std::move(self), "ScheduleNextReport");
+ return;
+ }
+ next_outbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::SendReport, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ // TODO(juanlishen): Improve the Alarm implementation to reuse a single
+ // instance for multiple events.
+ next_report_alarm_.reset(new Alarm);
+ next_report_alarm_->Set(cq_, next_report_time, &next_outbound_);
+ }
+ gpr_log(GPR_DEBUG,
+ "[LRS %p] Next load report scheduled (lb_id_: %s, handler: %p).",
+ service_, lb_id_.c_str(), this);
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (!ok || shutdown_) {
+ Shutdown(std::move(self), "SendReport");
+ return;
+ }
+ ::grpc::lb::v1::LoadReportResponse response;
+ auto loads = load_reporter_->GenerateLoads(load_balanced_hostname_, lb_id_);
+ response.mutable_load()->Swap(&loads);
+ auto feedback = load_reporter_->GenerateLoadBalancingFeedback();
+ response.mutable_load_balancing_feedback()->Swap(&feedback);
+ if (call_status_ < INITIAL_RESPONSE_SENT) {
+ auto initial_response = response.mutable_initial_response();
+ initial_response->set_load_balancer_id(lb_id_);
+ initial_response->set_implementation_id(
+ ::grpc::lb::v1::InitialLoadReportResponse::CPP);
+ initial_response->set_server_version(kVersion);
+ call_status_ = INITIAL_RESPONSE_SENT;
+ }
+ {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (service_->shutdown_) {
+ lock.release()->unlock();
+ Shutdown(std::move(self), "SendReport");
+ return;
+ }
+ next_outbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::ScheduleNextReport, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ stream_.Write(response, &next_outbound_);
+ gpr_log(GPR_INFO,
+ "[LRS %p] Sending load report (lb_id_: %s, handler: %p, loads "
+ "count: %d)...",
+ service_, lb_id_.c_str(), this, response.load().size());
+ }
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnDoneNotified(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ GPR_ASSERT(ok);
+ done_notified_ = true;
+ if (ctx_.IsCancelled()) {
+ is_cancelled_ = true;
+ }
+ gpr_log(GPR_INFO,
+ "[LRS %p] Load reporting call is notified done (handler: %p, "
+ "is_cancelled: %d).",
+ service_, this, static_cast<int>(is_cancelled_));
+ Shutdown(std::move(self), "OnDoneNotified");
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown(
+ std::shared_ptr<ReportLoadHandler> self, const char* reason) {
+ if (!shutdown_) {
+ gpr_log(GPR_INFO,
+ "[LRS %p] Shutting down the handler (lb_id_: %s, handler: %p, "
+ "reason: %s).",
+ service_, lb_id_.c_str(), this, reason);
+ shutdown_ = true;
+ if (call_status_ >= INITIAL_REQUEST_RECEIVED) {
+ load_reporter_->ReportStreamClosed(load_balanced_hostname_, lb_id_);
+ next_report_alarm_->Cancel();
+ }
+ }
+ // OnRequestDelivered() may be called after OnDoneNotified(), so we need to
+ // try to Finish() every time we are in Shutdown().
+ if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (!service_->shutdown_) {
+ on_finish_done_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnFinishDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ // TODO(juanlishen): Maybe add a message proto for the client to
+ // explicitly cancel the stream so that we can return OK status in such
+ // cases.
+ stream_.Finish(Status::CANCELLED, &on_finish_done_);
+ call_status_ = FINISH_CALLED;
+ }
+ }
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnFinishDone(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (ok) {
+ gpr_log(GPR_INFO,
+ "[LRS %p] Load reporting finished (lb_id_: %s, handler: %p).",
+ service_, lb_id_.c_str(), this);
+ }
+}
+
+} // namespace load_reporter
+} // namespace grpc
diff --git a/src/cpp/server/load_reporter/load_reporter_async_service_impl.h b/src/cpp/server/load_reporter/load_reporter_async_service_impl.h
new file mode 100644
index 0000000000..6fc577ff49
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_reporter_async_service_impl.h
@@ -0,0 +1,194 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
+#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpc/support/log.h>
+#include <grpcpp/alarm.h>
+#include <grpcpp/grpcpp.h>
+
+#include "src/core/lib/gprpp/thd.h"
+#include "src/cpp/server/load_reporter/load_reporter.h"
+
+namespace grpc {
+namespace load_reporter {
+
+// Async load reporting service. It's mainly responsible for controlling the
+// procedure of incoming requests. The real business logic is handed off to the
+// LoadReporter. There should be at most one instance of this service on a
+// server to avoid spreading the load data into multiple places.
+class LoadReporterAsyncServiceImpl
+ : public grpc::lb::v1::LoadReporter::AsyncService {
+ public:
+ explicit LoadReporterAsyncServiceImpl(
+ std::unique_ptr<ServerCompletionQueue> cq);
+ ~LoadReporterAsyncServiceImpl();
+
+ // Starts the working thread.
+ void StartThread();
+
+ // Not copyable nor movable.
+ LoadReporterAsyncServiceImpl(const LoadReporterAsyncServiceImpl&) = delete;
+ LoadReporterAsyncServiceImpl& operator=(const LoadReporterAsyncServiceImpl&) =
+ delete;
+
+ private:
+ class ReportLoadHandler;
+
+ // A tag that can be called with a bool argument. It's tailored for
+ // ReportLoadHandler's use. Before being used, it should be constructed with a
+ // method of ReportLoadHandler and a shared pointer to the handler. The
+ // shared pointer will be moved to the invoked function and the function can
+ // only be invoked once. That makes ref counting of the handler easier,
+ // because the shared pointer is not bound to the function and can be gone
+ // once the invoked function returns (if not used any more).
+ class CallableTag {
+ public:
+ using HandlerFunction =
+ std::function<void(std::shared_ptr<ReportLoadHandler>, bool)>;
+
+ CallableTag() {}
+
+ CallableTag(HandlerFunction func,
+ std::shared_ptr<ReportLoadHandler> handler)
+ : handler_function_(std::move(func)), handler_(std::move(handler)) {
+ GPR_ASSERT(handler_function_ != nullptr);
+ GPR_ASSERT(handler_ != nullptr);
+ }
+
+ // Runs the tag. This should be called only once. The handler is no longer
+ // owned by this tag after this method is invoked.
+ void Run(bool ok);
+
+ // Releases and returns the shared pointer to the handler.
+ std::shared_ptr<ReportLoadHandler> ReleaseHandler() {
+ return std::move(handler_);
+ }
+
+ private:
+ HandlerFunction handler_function_ = nullptr;
+ std::shared_ptr<ReportLoadHandler> handler_;
+ };
+
+ // Each handler takes care of one load reporting stream. It contains
+ // per-stream data and it will access the members of the parent class (i.e.,
+ // LoadReporterAsyncServiceImpl) for service-wide data (e.g., the load data).
+ class ReportLoadHandler {
+ public:
+ // Instantiates a ReportLoadHandler and requests the next load reporting
+ // call. The handler object will manage its own lifetime, so no action is
+ // needed from the caller any more regarding that object.
+ static void CreateAndStart(ServerCompletionQueue* cq,
+ LoadReporterAsyncServiceImpl* service,
+ LoadReporter* load_reporter);
+
+ // This ctor is public because we want to use std::make_shared<> in
+ // CreateAndStart(). This ctor shouldn't be used elsewhere.
+ ReportLoadHandler(ServerCompletionQueue* cq,
+ LoadReporterAsyncServiceImpl* service,
+ LoadReporter* load_reporter);
+
+ private:
+ // After the handler has a call request delivered, it starts reading the
+ // initial request. Also, a new handler is spawned so that we can keep
+ // servicing future calls.
+ void OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ // The first Read() is expected to succeed, after which the handler starts
+ // sending load reports back to the balancer. The second Read() is
+ // expected to fail, which happens when the balancer half-closes the
+ // stream to signal that it's no longer interested in the load reports. For
+ // the latter case, the handler will then close the stream.
+ void OnReadDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ // The report sending operations are sequential as: send report -> send
+ // done, schedule the next send -> waiting for the alarm to fire -> alarm
+ // fires, send report -> ...
+ void SendReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
+ void ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ // Called when Finish() is done.
+ void OnFinishDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ // Called when AsyncNotifyWhenDone() notifies us.
+ void OnDoneNotified(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ void Shutdown(std::shared_ptr<ReportLoadHandler> self, const char* reason);
+
+ // The key fields of the stream.
+ grpc::string lb_id_;
+ grpc::string load_balanced_hostname_;
+ grpc::string load_key_;
+ uint64_t load_report_interval_ms_;
+
+ // The data for RPC communication with the load reportee.
+ ServerContext ctx_;
+ ::grpc::lb::v1::LoadReportRequest request_;
+
+ // The members passed down from LoadReporterAsyncServiceImpl.
+ ServerCompletionQueue* cq_;
+ LoadReporterAsyncServiceImpl* service_;
+ LoadReporter* load_reporter_;
+ ServerAsyncReaderWriter<::grpc::lb::v1::LoadReportResponse,
+ ::grpc::lb::v1::LoadReportRequest>
+ stream_;
+
+ // The status of the RPC progress.
+ enum CallStatus {
+ WAITING_FOR_DELIVERY,
+ DELIVERED,
+ INITIAL_REQUEST_RECEIVED,
+ INITIAL_RESPONSE_SENT,
+ FINISH_CALLED
+ } call_status_;
+ bool shutdown_{false};
+ bool done_notified_{false};
+ bool is_cancelled_{false};
+ CallableTag on_done_notified_;
+ CallableTag on_finish_done_;
+ CallableTag next_inbound_;
+ CallableTag next_outbound_;
+ std::unique_ptr<Alarm> next_report_alarm_;
+ };
+
+ // Handles the incoming requests and drives the completion queue in a loop.
+ static void Work(void* arg);
+
+ // Schedules the next data fetching from Census and LB feedback sampling.
+ void ScheduleNextFetchAndSample();
+
+ // Fetches data from Census and samples LB feedback.
+ void FetchAndSample(bool ok);
+
+ std::unique_ptr<ServerCompletionQueue> cq_;
+ // To synchronize the operations related to shutdown state of cq_, so that we
+ // don't enqueue new tags into cq_ after it is already shut down.
+ std::mutex cq_shutdown_mu_;
+ std::atomic_bool shutdown_{false};
+ std::unique_ptr<::grpc_core::Thread> thread_;
+ std::unique_ptr<LoadReporter> load_reporter_;
+ std::unique_ptr<Alarm> next_fetch_and_sample_alarm_;
+};
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
diff --git a/src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc b/src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc
new file mode 100644
index 0000000000..81cf6ac562
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include <grpcpp/ext/server_load_reporting.h>
+
+#include "src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h"
+
+namespace grpc {
+namespace load_reporter {
+namespace experimental {
+
+void LoadReportingServiceServerBuilderOption::UpdateArguments(
+ ::grpc::ChannelArguments* args) {
+ args->SetInt(GRPC_ARG_ENABLE_LOAD_REPORTING, true);
+}
+
+void LoadReportingServiceServerBuilderOption::UpdatePlugins(
+ std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>>* plugins) {
+ plugins->emplace_back(new LoadReportingServiceServerBuilderPlugin());
+}
+
+} // namespace experimental
+} // namespace load_reporter
+} // namespace grpc
diff --git a/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc b/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc
new file mode 100644
index 0000000000..c2c5dd5ed5
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h"
+
+#include <grpcpp/impl/server_initializer.h>
+
+namespace grpc {
+namespace load_reporter {
+
+bool LoadReportingServiceServerBuilderPlugin::has_sync_methods() const {
+ if (service_ != nullptr) {
+ return service_->has_synchronous_methods();
+ }
+ return false;
+}
+
+bool LoadReportingServiceServerBuilderPlugin::has_async_methods() const {
+ if (service_ != nullptr) {
+ return service_->has_async_methods();
+ }
+ return false;
+}
+
+void LoadReportingServiceServerBuilderPlugin::UpdateServerBuilder(
+ grpc::ServerBuilder* builder) {
+ auto cq = builder->AddCompletionQueue();
+ service_ = std::make_shared<LoadReporterAsyncServiceImpl>(std::move(cq));
+}
+
+void LoadReportingServiceServerBuilderPlugin::InitServer(
+ grpc::ServerInitializer* si) {
+ si->RegisterService(service_);
+}
+
+void LoadReportingServiceServerBuilderPlugin::Finish(
+ grpc::ServerInitializer* si) {
+ service_->StartThread();
+ service_.reset();
+}
+
+} // namespace load_reporter
+} // namespace grpc
diff --git a/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h b/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h
new file mode 100644
index 0000000000..1f098591d4
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_LOAD_REPORTING_SERVICE_SERVER_BUILDER_PLUGIN_H
+#define GRPC_SRC_CPP_LOAD_REPORTING_SERVICE_SERVER_BUILDER_PLUGIN_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpcpp/impl/server_builder_plugin.h>
+
+#include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h"
+
+namespace grpc {
+namespace load_reporter {
+
+// The plugin that registers and starts load reporting service when starting a
+// server.
+class LoadReportingServiceServerBuilderPlugin : public ServerBuilderPlugin {
+ public:
+ ~LoadReportingServiceServerBuilderPlugin() override = default;
+ grpc::string name() override { return "load_reporting_service"; }
+
+ // Creates a load reporting service.
+ void UpdateServerBuilder(grpc::ServerBuilder* builder) override;
+
+ // Registers the load reporter service.
+ void InitServer(grpc::ServerInitializer* si) override;
+
+ // Starts the load reporter service.
+ void Finish(grpc::ServerInitializer* si) override;
+
+ void ChangeArguments(const grpc::string& name, void* value) override {}
+ void UpdateChannelArguments(grpc::ChannelArguments* args) override {}
+ bool has_sync_methods() const override;
+ bool has_async_methods() const override;
+
+ private:
+ std::shared_ptr<LoadReporterAsyncServiceImpl> service_;
+};
+
+std::unique_ptr<grpc::ServerBuilderPlugin>
+CreateLoadReportingServiceServerBuilderPlugin();
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_LOAD_REPORTING_SERVICE_SERVER_BUILDER_PLUGIN_H
diff --git a/src/cpp/server/load_reporter/util.cc b/src/cpp/server/load_reporter/util.cc
new file mode 100644
index 0000000000..a2f2f11e70
--- /dev/null
+++ b/src/cpp/server/load_reporter/util.cc
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <grpcpp/ext/server_load_reporting.h>
+
+#include <grpc/support/log.h>
+
+namespace grpc {
+namespace load_reporter {
+namespace experimental {
+
+void AddLoadReportingCost(grpc::ServerContext* ctx,
+ const grpc::string& cost_name, double cost_value) {
+ if (std::isnormal(cost_value)) {
+ grpc::string buf;
+ buf.resize(sizeof(cost_value) + cost_name.size());
+ memcpy(&(*buf.begin()), &cost_value, sizeof(cost_value));
+ memcpy(&(*buf.begin()) + sizeof(cost_value), cost_name.data(),
+ cost_name.size());
+ ctx->AddTrailingMetadata(GRPC_LB_COST_MD_KEY, buf);
+ } else {
+ gpr_log(GPR_ERROR, "Call metric value is not normal.");
+ }
+}
+
+} // namespace experimental
+} // namespace load_reporter
+} // namespace grpc
diff --git a/src/proto/grpc/lb/v1/load_reporter.proto b/src/proto/grpc/lb/v1/load_reporter.proto
index c2e4f23f6e..d57a37fed7 100644
--- a/src/proto/grpc/lb/v1/load_reporter.proto
+++ b/src/proto/grpc/lb/v1/load_reporter.proto
@@ -114,6 +114,10 @@ message Load {
// num_calls_in_progress is the only valid entry. If in_progress_report is not
// set, num_calls_in_progress will be ignored. If in_progress_report is set,
// fields other than num_calls_in_progress and orphaned_load will be ignored.
+ // TODO(juanlishen): A Load is either an in_progress_report or not. We should
+ // make this explicit in hierarchy. From the log, I see in_progress_report_
+ // has a random num_calls_in_progress_ when not set, which might lead to bug
+ // when the balancer process the load report.
oneof in_progress_report {
// The number of calls in progress (instantaneously) per load balancer id.
int64 num_calls_in_progress = 5;