aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Juanli Shen <juanlishen@google.com>2018-07-13 19:52:59 -0700
committerGravatar Juanli Shen <juanlishen@google.com>2018-07-13 19:52:59 -0700
commitbe40b0d3a8cf2e37c80b2c248111051fa8bdf7bc (patch)
tree79384a0045d5bbc550666f047d088f9582755086
parentf9f5c67aff91e4ad26371b0a2482a5011ab45226 (diff)
Add server load reporting service
-rw-r--r--BUILD55
-rw-r--r--include/grpcpp/ext/server_load_reporting.h53
-rw-r--r--include/grpcpp/impl/codegen/server_context.h2
-rw-r--r--src/core/ext/filters/load_reporting/registered_opencensus_objects.h65
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_filter.cc54
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_filter.h9
-rw-r--r--src/cpp/server/load_reporter/constants.h10
-rw-r--r--src/cpp/server/load_reporter/load_data_store.h3
-rw-r--r--src/cpp/server/load_reporter/load_reporter.cc79
-rw-r--r--src/cpp/server/load_reporter/load_reporter.h5
-rw-r--r--src/cpp/server/load_reporter/load_reporter_async_service_impl.cc370
-rw-r--r--src/cpp/server/load_reporter/load_reporter_async_service_impl.h194
-rw-r--r--src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc41
-rw-r--r--src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc60
-rw-r--r--src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h62
-rw-r--r--src/cpp/server/load_reporter/util.cc45
-rw-r--r--src/proto/grpc/lb/v1/load_reporter.proto4
-rw-r--r--test/cpp/end2end/BUILD16
-rw-r--r--test/cpp/end2end/server_load_reporting_end2end_test.cc191
-rw-r--r--test/cpp/server/load_reporter/BUILD1
-rw-r--r--test/cpp/server/load_reporter/load_reporter_test.cc9
21 files changed, 1240 insertions, 88 deletions
diff --git a/BUILD b/BUILD
index c7d64b4a6c..8523bbb660 100644
--- a/BUILD
+++ b/BUILD
@@ -98,10 +98,10 @@ GRPC_PUBLIC_HDRS = [
"include/grpc/grpc.h",
"include/grpc/grpc_posix.h",
"include/grpc/grpc_security_constants.h",
- "include/grpc/load_reporting.h",
"include/grpc/slice.h",
"include/grpc/slice_buffer.h",
"include/grpc/status.h",
+ "include/grpc/load_reporting.h",
"include/grpc/support/workaround_list.h",
]
@@ -1201,9 +1201,9 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc",
- "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.c",
+ "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c",
],
hdrs = [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
@@ -1211,9 +1211,9 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
- "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.h",
+ "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h",
],
external_deps = [
"nanopb",
@@ -1234,9 +1234,9 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc",
- "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.c",
+ "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c",
],
hdrs = [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
@@ -1244,9 +1244,9 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
- "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.h",
+ "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h",
],
external_deps = [
"nanopb",
@@ -1334,6 +1334,51 @@ grpc_cc_library(
)
grpc_cc_library(
+ name = "lb_server_load_reporting_service_server_builder_plugin",
+ srcs = [
+ "src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc",
+ ],
+ hdrs = [
+ "src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h",
+ ],
+ language = "c++",
+ deps = [
+ "lb_load_reporter_service",
+ ],
+)
+
+grpc_cc_library(
+ name = "grpcpp_server_load_reporting",
+ srcs = [
+ "src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc",
+ "src/cpp/server/load_reporter/util.cc",
+ ],
+ language = "c++",
+ public_hdrs = [
+ "include/grpcpp/ext/server_load_reporting.h",
+ ],
+ deps = [
+ "lb_server_load_reporting_filter",
+ "lb_server_load_reporting_service_server_builder_plugin",
+ ],
+ alwayslink = 1,
+)
+
+grpc_cc_library(
+ name = "lb_load_reporter_service",
+ srcs = [
+ "src/cpp/server/load_reporter/load_reporter_async_service_impl.cc",
+ ],
+ hdrs = [
+ "src/cpp/server/load_reporter/load_reporter_async_service_impl.h",
+ ],
+ language = "c++",
+ deps = [
+ "lb_load_reporter",
+ ],
+)
+
+grpc_cc_library(
name = "lb_get_cpu_stats",
srcs = [
"src/cpp/server/load_reporter/get_cpu_stats_linux.cc",
diff --git a/include/grpcpp/ext/server_load_reporting.h b/include/grpcpp/ext/server_load_reporting.h
new file mode 100644
index 0000000000..939569c19a
--- /dev/null
+++ b/include/grpcpp/ext/server_load_reporting.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPCPP_EXT_SERVER_LOAD_REPORTING_H
+#define GRPCPP_EXT_SERVER_LOAD_REPORTING_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpc/load_reporting.h>
+#include <grpcpp/impl/codegen/config.h>
+#include <grpcpp/impl/codegen/server_context.h>
+#include <grpcpp/impl/server_builder_option.h>
+
+namespace grpc {
+namespace load_reporter {
+namespace experimental {
+
+// The ServerBuilderOption to enable server-side load reporting feature. To
+// enable the feature, please make sure the binary builds with the
+// grpcpp_server_load_reporting library and set this option in the
+// ServerBuilder.
+class LoadReportingServiceServerBuilderOption : public ServerBuilderOption {
+ public:
+ void UpdateArguments(::grpc::ChannelArguments* args) override;
+ void UpdatePlugins(std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>>*
+ plugins) override;
+};
+
+// Adds the load reporting cost with \a cost_name and \a cost_value in the
+// trailing metadata of the server context.
+void AddLoadReportingCost(grpc::ServerContext* ctx,
+ const grpc::string& cost_name, double cost_value);
+
+} // namespace experimental
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPCPP_EXT_SERVER_LOAD_REPORTING_H
diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h
index bced4202dd..153b404d9e 100644
--- a/include/grpcpp/impl/codegen/server_context.h
+++ b/include/grpcpp/impl/codegen/server_context.h
@@ -201,7 +201,7 @@ class ServerContext {
/// \param algorithm The compression algorithm used for the server call.
void set_compression_algorithm(grpc_compression_algorithm algorithm);
- /// Set the load reporting costs in \a cost_data for the call.
+ /// Set the serialized load reporting costs in \a cost_data for the call.
void SetLoadReportingCosts(const std::vector<grpc::string>& cost_data);
/// Return the authentication context for this server call.
diff --git a/src/core/ext/filters/load_reporting/registered_opencensus_objects.h b/src/core/ext/filters/load_reporting/registered_opencensus_objects.h
index b62863dc2c..4eacda7c02 100644
--- a/src/core/ext/filters/load_reporting/registered_opencensus_objects.h
+++ b/src/core/ext/filters/load_reporting/registered_opencensus_objects.h
@@ -28,75 +28,84 @@
namespace grpc {
namespace load_reporter {
+// Note that the functions here are specified as inline to share the static
+// objects across all the translation units including this header. See more
+// details on https://en.cppreference.com/w/cpp/language/inline.
+
// Measures.
-::opencensus::stats::MeasureInt64 MeasureStartCount() {
- static const ::opencensus::stats::MeasureInt64 start_count =
+inline ::opencensus::stats::MeasureInt64 MeasureStartCount() {
+ static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(
kMeasureStartCount, kMeasureStartCount, kMeasureStartCount);
- return start_count;
+ return measure;
}
-::opencensus::stats::MeasureInt64 MeasureEndCount() {
- static const ::opencensus::stats::MeasureInt64 end_count =
+inline ::opencensus::stats::MeasureInt64 MeasureEndCount() {
+ static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(
kMeasureEndCount, kMeasureEndCount, kMeasureEndCount);
- return end_count;
+ return measure;
}
-::opencensus::stats::MeasureInt64 MeasureEndBytesSent() {
- static const ::opencensus::stats::MeasureInt64 end_bytes_sent =
+inline ::opencensus::stats::MeasureInt64 MeasureEndBytesSent() {
+ static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(
kMeasureEndBytesSent, kMeasureEndBytesSent, kMeasureEndBytesSent);
- return end_bytes_sent;
+ return measure;
}
-::opencensus::stats::MeasureInt64 MeasureEndBytesReceived() {
- static const ::opencensus::stats::MeasureInt64 end_bytes_received =
+inline ::opencensus::stats::MeasureInt64 MeasureEndBytesReceived() {
+ static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(kMeasureEndBytesReceived,
kMeasureEndBytesReceived,
kMeasureEndBytesReceived);
- return end_bytes_received;
+ return measure;
}
-::opencensus::stats::MeasureInt64 MeasureEndLatencyMs() {
- static const ::opencensus::stats::MeasureInt64 end_latency_ms =
+inline ::opencensus::stats::MeasureInt64 MeasureEndLatencyMs() {
+ static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(
kMeasureEndLatencyMs, kMeasureEndLatencyMs, kMeasureEndLatencyMs);
- return end_latency_ms;
+ return measure;
}
-::opencensus::stats::MeasureDouble MeasureOtherCallMetric() {
- static const ::opencensus::stats::MeasureDouble other_call_metric =
+inline ::opencensus::stats::MeasureDouble MeasureOtherCallMetric() {
+ static const ::opencensus::stats::MeasureDouble measure =
::opencensus::stats::MeasureDouble::Register(kMeasureOtherCallMetric,
kMeasureOtherCallMetric,
kMeasureOtherCallMetric);
- return other_call_metric;
+ return measure;
}
// Tags.
-opencensus::stats::TagKey TagKeyToken() {
- static const auto token = opencensus::stats::TagKey::Register(kTagKeyToken);
+inline ::opencensus::stats::TagKey TagKeyToken() {
+ static const ::opencensus::stats::TagKey token =
+ opencensus::stats::TagKey::Register(kTagKeyToken);
return token;
}
-opencensus::stats::TagKey TagKeyHost() {
- static const auto token = opencensus::stats::TagKey::Register(kTagKeyHost);
+inline ::opencensus::stats::TagKey TagKeyHost() {
+ static const ::opencensus::stats::TagKey token =
+ opencensus::stats::TagKey::Register(kTagKeyHost);
return token;
}
-opencensus::stats::TagKey TagKeyUserId() {
- static const auto token = opencensus::stats::TagKey::Register(kTagKeyUserId);
+
+inline ::opencensus::stats::TagKey TagKeyUserId() {
+ static const ::opencensus::stats::TagKey token =
+ opencensus::stats::TagKey::Register(kTagKeyUserId);
return token;
}
-opencensus::stats::TagKey TagKeyStatus() {
- static const auto token = opencensus::stats::TagKey::Register(kTagKeyStatus);
+inline ::opencensus::stats::TagKey TagKeyStatus() {
+ static const ::opencensus::stats::TagKey token =
+ opencensus::stats::TagKey::Register(kTagKeyStatus);
return token;
}
-opencensus::stats::TagKey TagKeyMetricName() {
- static const auto token =
+inline ::opencensus::stats::TagKey TagKeyMetricName() {
+ static const ::opencensus::stats::TagKey token =
opencensus::stats::TagKey::Register(kTagKeyMetricName);
return token;
}
diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
index 51e4d795d7..6529046a5e 100644
--- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
@@ -19,7 +19,6 @@
#include <grpc/support/port_platform.h>
#include <grpc/grpc_security.h>
-#include <grpc/load_reporting.h>
#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -31,18 +30,20 @@
#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/context.h"
-#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr_posix.h"
#include "src/core/lib/iomgr/socket_utils.h"
-#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/call.h"
-#include "src/core/lib/transport/static_metadata.h"
namespace grpc {
+constexpr char kEncodedIpv4AddressLengthString[] = "08";
+constexpr char kEncodedIpv6AddressLengthString[] = "32";
+constexpr char kEmptyAddressLengthString[] = "00";
+constexpr size_t kLengthPrefixSize = 2;
+
grpc_error* ServerLoadReportingChannelData::Init(
grpc_channel_element* /* elem */, grpc_channel_element_args* args) {
GPR_ASSERT(!args->is_last);
@@ -90,7 +91,9 @@ void ServerLoadReportingCallData::Destroy(
{chand->peer_identity(), chand->peer_identity_len()}},
{::grpc::load_reporter::TagKeyStatus(),
GetStatusTagForStatus(final_info->final_status)}});
+ gpr_free(client_ip_and_lr_token_);
}
+ gpr_free(target_host_);
grpc_slice_unref_internal(service_method_);
}
@@ -100,7 +103,8 @@ void ServerLoadReportingCallData::StartTransportStreamOpBatch(
if (op->recv_initial_metadata() != nullptr) {
// Save some fields to use when initial metadata is ready.
peer_string_ = op->get_peer_string();
- recv_initial_metadata_ = op->recv_initial_metadata();
+ recv_initial_metadata_ =
+ op->op()->payload->recv_initial_metadata.recv_initial_metadata;
original_recv_initial_metadata_ready_ = op->recv_initial_metadata_ready();
// Substitute the original closure for the wrapper closure.
op->set_recv_initial_metadata_ready(&recv_initial_metadata_ready_);
@@ -157,10 +161,10 @@ void ServerLoadReportingCallData::GetCensusSafeClientIpString(
*size = 8;
} else if (addr->sa_family == GRPC_AF_INET6) {
grpc_sockaddr_in6* addr6 = reinterpret_cast<grpc_sockaddr_in6*>(addr);
- *client_ip_string = static_cast<char*>(gpr_malloc(32));
+ *client_ip_string = static_cast<char*>(gpr_malloc(32 + 1));
for (size_t i = 0; i < 16; ++i) {
- sprintf(*client_ip_string + i, "%02x",
- addr6->sin6_addr.__in6_u.__u6_addr8[i]);
+ snprintf(*client_ip_string + i * 2, 2 + 1, "%02x",
+ addr6->sin6_addr.__in6_u.__u6_addr8[i]);
}
*size = 32;
} else {
@@ -241,7 +245,7 @@ void ServerLoadReportingCallData::RecvInitialMetadataReady(void* arg,
if (err == GRPC_ERROR_NONE) {
GRPC_LOG_IF_ERROR(
"server_load_reporting_filter",
- grpc_metadata_batch_filter(calld->recv_initial_metadata_->batch(),
+ grpc_metadata_batch_filter(calld->recv_initial_metadata_,
RecvInitialMetadataFilter, elem,
"recv_initial_metadata filtering error"));
// If the LB token was not found in the recv_initial_metadata, only the
@@ -277,7 +281,6 @@ grpc_filtered_mdelem ServerLoadReportingCallData::SendTrailingMetadataFilter(
reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
ServerLoadReportingChannelData* chand =
reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
- // TODO(juanlishen): GRPC_MDSTR_LB_COST_BIN meaning?
if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_COST_BIN)) {
const grpc_slice value = GRPC_MDVALUE(md);
const size_t cost_entry_size = GRPC_SLICE_LENGTH(value);
@@ -325,4 +328,35 @@ const char* ServerLoadReportingCallData::GetStatusTagForStatus(
}
}
+namespace {
+bool MaybeAddServerLoadReportingFilter(const grpc_channel_args& args) {
+ return grpc_channel_arg_get_bool(
+ grpc_channel_args_find(&args, GRPC_ARG_ENABLE_LOAD_REPORTING), false);
+}
+} // namespace
+
+// TODO(juanlishen): We should register the filter during grpc initialization
+// time once OpenCensus is compatible with our build system. For now, we force
+// registration of the server load reporting filter at static initialization
+// time if we build with the filter target.
+struct ServerLoadReportingFilterStaticRegistrar {
+ ServerLoadReportingFilterStaticRegistrar() {
+ static std::atomic_bool registered{false};
+ if (registered) return;
+ RegisterChannelFilter<ServerLoadReportingChannelData,
+ ServerLoadReportingCallData>(
+ "server_load_reporting", GRPC_SERVER_CHANNEL, INT_MAX,
+ MaybeAddServerLoadReportingFilter);
+ // Access measures to ensure they are initialized. Otherwise, we can't
+ // create any valid view before the first RPC.
+ ::grpc::load_reporter::MeasureStartCount();
+ ::grpc::load_reporter::MeasureEndCount();
+ ::grpc::load_reporter::MeasureEndBytesSent();
+ ::grpc::load_reporter::MeasureEndBytesReceived();
+ ::grpc::load_reporter::MeasureEndLatencyMs();
+ ::grpc::load_reporter::MeasureOtherCallMetric();
+ registered = true;
+ }
+} server_load_reporting_filter_static_registrar;
+
} // namespace grpc
diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.h b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h
index 029b19ac89..10baf1f833 100644
--- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.h
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h
@@ -86,8 +86,8 @@ class ServerLoadReportingCallData : public CallData {
// The received initial metadata (a member of the recv_initial_metadata op).
// When it is ready, we will extract some data from it via
// recv_initial_metadata_ready_ closure, before the original
- // recv_initial_metadata_ready closure,
- MetadataBatch* recv_initial_metadata_;
+ // recv_initial_metadata_ready closure.
+ grpc_metadata_batch* recv_initial_metadata_;
// The original recv_initial_metadata closure, which is wrapped by our own
// closure (recv_initial_metadata_ready_) to capture the incoming initial
@@ -112,11 +112,6 @@ class ServerLoadReportingCallData : public CallData {
// token.
char* client_ip_and_lr_token_;
size_t client_ip_and_lr_token_len_;
-
- static constexpr char kEncodedIpv4AddressLengthString[] = "08";
- static constexpr char kEncodedIpv6AddressLengthString[] = "32";
- static constexpr char kEmptyAddressLengthString[] = "00";
- static constexpr size_t kLengthPrefixSize = 2;
};
} // namespace grpc
diff --git a/src/cpp/server/load_reporter/constants.h b/src/cpp/server/load_reporter/constants.h
index 07c5965fff..00ad794a04 100644
--- a/src/cpp/server/load_reporter/constants.h
+++ b/src/cpp/server/load_reporter/constants.h
@@ -24,6 +24,16 @@
namespace grpc {
namespace load_reporter {
+// TODO(juanlishen): Update the version number with the PR number every time
+// there is any change to the server load reporter.
+constexpr uint32_t kVersion = 15853;
+
+// TODO(juanlishen): This window size is from the internal spec for the load
+// reporter. Need to ask the gRPC LB team whether we should make this and the
+// fetching interval configurable.
+constexpr uint32_t kFeedbackSampleWindowSeconds = 10;
+constexpr uint32_t kFetchAndSampleIntervalSeconds = 1;
+
constexpr size_t kLbIdLength = 8;
constexpr size_t kIpv4AddressLength = 8;
constexpr size_t kIpv6AddressLength = 32;
diff --git a/src/cpp/server/load_reporter/load_data_store.h b/src/cpp/server/load_reporter/load_data_store.h
index 2da78ea064..dc08ecf479 100644
--- a/src/cpp/server/load_reporter/load_data_store.h
+++ b/src/cpp/server/load_reporter/load_data_store.h
@@ -160,7 +160,8 @@ class LoadRecordValue {
", error_count_=" + grpc::to_string(error_count_) +
", bytes_sent_=" + grpc::to_string(bytes_sent_) +
", bytes_recv_=" + grpc::to_string(bytes_recv_) +
- ", latency_ms_=" + grpc::to_string(latency_ms_) + "]";
+ ", latency_ms_=" + grpc::to_string(latency_ms_) + ", " +
+ grpc::to_string(call_metrics_.size()) + " other call metric(s)]";
}
bool InsertCallMetric(const grpc::string& metric_name,
diff --git a/src/cpp/server/load_reporter/load_reporter.cc b/src/cpp/server/load_reporter/load_reporter.cc
index 3f0063d883..464063a13f 100644
--- a/src/cpp/server/load_reporter/load_reporter.cc
+++ b/src/cpp/server/load_reporter/load_reporter.cc
@@ -22,6 +22,7 @@
#include <stdio.h>
#include <chrono>
#include <ctime>
+#include <iterator>
#include "src/cpp/server/load_reporter/constants.h"
#include "src/cpp/server/load_reporter/get_cpu_stats.h"
@@ -65,8 +66,8 @@ CensusViewProvider::CensusViewProvider()
// measurements instead of setting the data values directly.
auto vd_end_count =
::opencensus::stats::ViewDescriptor()
- .set_name((kViewEndCount))
- .set_measure((kMeasureEndCount))
+ .set_name(kViewEndCount)
+ .set_measure(kMeasureEndCount)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@@ -80,8 +81,8 @@ CensusViewProvider::CensusViewProvider()
view_descriptor_map_.emplace(kViewEndCount, vd_end_count);
auto vd_end_bytes_sent =
::opencensus::stats::ViewDescriptor()
- .set_name((kViewEndBytesSent))
- .set_measure((kMeasureEndBytesSent))
+ .set_name(kViewEndBytesSent)
+ .set_measure(kMeasureEndBytesSent)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@@ -95,8 +96,8 @@ CensusViewProvider::CensusViewProvider()
view_descriptor_map_.emplace(kViewEndBytesSent, vd_end_bytes_sent);
auto vd_end_bytes_received =
::opencensus::stats::ViewDescriptor()
- .set_name((kViewEndBytesReceived))
- .set_measure((kMeasureEndBytesReceived))
+ .set_name(kViewEndBytesReceived)
+ .set_measure(kMeasureEndBytesReceived)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@@ -110,8 +111,8 @@ CensusViewProvider::CensusViewProvider()
view_descriptor_map_.emplace(kViewEndBytesReceived, vd_end_bytes_received);
auto vd_end_latency_ms =
::opencensus::stats::ViewDescriptor()
- .set_name((kViewEndLatencyMs))
- .set_measure((kMeasureEndLatencyMs))
+ .set_name(kViewEndLatencyMs)
+ .set_measure(kMeasureEndLatencyMs)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@@ -126,8 +127,8 @@ CensusViewProvider::CensusViewProvider()
// Two views related to other call metrics.
auto vd_metric_call_count =
::opencensus::stats::ViewDescriptor()
- .set_name((kViewOtherCallMetricCount))
- .set_measure((kMeasureOtherCallMetric))
+ .set_name(kViewOtherCallMetricCount)
+ .set_measure(kMeasureOtherCallMetric)
.set_aggregation(::opencensus::stats::Aggregation::Count())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@@ -141,8 +142,8 @@ CensusViewProvider::CensusViewProvider()
view_descriptor_map_.emplace(kViewOtherCallMetricCount, vd_metric_call_count);
auto vd_metric_value =
::opencensus::stats::ViewDescriptor()
- .set_name((kViewOtherCallMetricValue))
- .set_measure((kMeasureOtherCallMetric))
+ .set_name(kViewOtherCallMetricValue)
+ .set_measure(kMeasureOtherCallMetric)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@@ -161,11 +162,26 @@ double CensusViewProvider::GetRelatedViewDataRowDouble(
size_t view_name_len, const std::vector<grpc::string>& tag_values) {
auto it_vd = view_data_map.find(grpc::string(view_name, view_name_len));
GPR_ASSERT(it_vd != view_data_map.end());
+ GPR_ASSERT(it_vd->second.type() ==
+ ::opencensus::stats::ViewData::Type::kDouble);
auto it_row = it_vd->second.double_data().find(tag_values);
GPR_ASSERT(it_row != it_vd->second.double_data().end());
return it_row->second;
}
+uint64_t CensusViewProvider::GetRelatedViewDataRowInt(
+ const ViewDataMap& view_data_map, const char* view_name,
+ size_t view_name_len, const std::vector<grpc::string>& tag_values) {
+ auto it_vd = view_data_map.find(grpc::string(view_name, view_name_len));
+ GPR_ASSERT(it_vd != view_data_map.end());
+ GPR_ASSERT(it_vd->second.type() ==
+ ::opencensus::stats::ViewData::Type::kInt64);
+ auto it_row = it_vd->second.int_data().find(tag_values);
+ GPR_ASSERT(it_row != it_vd->second.int_data().end());
+ GPR_ASSERT(it_row->second >= 0);
+ return it_row->second;
+}
+
CensusViewProviderDefaultImpl::CensusViewProviderDefaultImpl() {
for (const auto& p : view_descriptor_map()) {
const grpc::string& view_name = p.first;
@@ -235,23 +251,23 @@ LoadReporter::GenerateLoadBalancingFeedback() {
return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
}
// Find the longest range with valid ends.
- LoadBalancingFeedbackRecord* oldest = &feedback_records_[0];
- LoadBalancingFeedbackRecord* newest =
- &feedback_records_[feedback_records_.size() - 1];
- while (newest > oldest &&
+ auto oldest = feedback_records_.begin();
+ auto newest = feedback_records_.end() - 1;
+ while (std::distance(oldest, newest) > 0 &&
(newest->cpu_limit == 0 || oldest->cpu_limit == 0)) {
// A zero limit means that the system info reading was failed, so these
// records can't be used to calculate CPU utilization.
if (newest->cpu_limit == 0) --newest;
if (oldest->cpu_limit == 0) ++oldest;
}
- if (newest - oldest < 1 || oldest->end_time == newest->end_time ||
+ if (std::distance(oldest, newest) < 1 ||
+ oldest->end_time == newest->end_time ||
newest->cpu_limit == oldest->cpu_limit) {
return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
}
uint64_t rpcs = 0;
uint64_t errors = 0;
- for (LoadBalancingFeedbackRecord* p = newest; p != oldest; --p) {
+ for (auto p = newest; p != oldest; --p) {
// Because these two numbers are counters, the oldest record shouldn't be
// included.
rpcs += p->rpcs;
@@ -338,7 +354,8 @@ void LoadReporter::AttachOrphanLoadId(
if (per_balancer_store.lb_id() == kInvalidLbId) {
load->set_load_key_unknown(true);
} else {
- load->set_load_key_unknown(false);
+ // We shouldn't set load_key_unknown to any value in this case because
+ // load_key_unknown and orphaned_load_identifier are under an oneof struct.
load->mutable_orphaned_load_identifier()->set_load_key(
per_balancer_store.load_key());
load->mutable_orphaned_load_identifier()->set_load_balancer_id(
@@ -381,9 +398,7 @@ void LoadReporter::ProcessViewDataCallStart(
const CensusViewProvider::ViewDataMap& view_data_map) {
auto it = view_data_map.find(kViewStartCount);
if (it != view_data_map.end()) {
- // Note that the data type for any Sum view is double, whatever the data
- // type of the original measure.
- for (const auto& p : it->second.double_data()) {
+ for (const auto& p : it->second.int_data()) {
const std::vector<grpc::string>& tag_values = p.first;
const uint64_t start_count = static_cast<uint64_t>(p.second);
const grpc::string& client_ip_and_token = tag_values[0];
@@ -405,9 +420,7 @@ void LoadReporter::ProcessViewDataCallEnd(
uint64_t total_error_count = 0;
auto it = view_data_map.find(kViewEndCount);
if (it != view_data_map.end()) {
- // Note that the data type for any Sum view is double, whatever the data
- // type of the original measure.
- for (const auto& p : it->second.double_data()) {
+ for (const auto& p : it->second.int_data()) {
const std::vector<grpc::string>& tag_values = p.first;
const uint64_t end_count = static_cast<uint64_t>(p.second);
const grpc::string& client_ip_and_token = tag_values[0];
@@ -424,18 +437,16 @@ void LoadReporter::ProcessViewDataCallEnd(
continue;
}
LoadRecordKey key(client_ip_and_token, user_id);
- const uint64_t bytes_sent =
- CensusViewProvider::GetRelatedViewDataRowDouble(
- view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
- tag_values);
+ const uint64_t bytes_sent = CensusViewProvider::GetRelatedViewDataRowInt(
+ view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
+ tag_values);
const uint64_t bytes_received =
- CensusViewProvider::GetRelatedViewDataRowDouble(
+ CensusViewProvider::GetRelatedViewDataRowInt(
view_data_map, kViewEndBytesReceived,
sizeof(kViewEndBytesReceived) - 1, tag_values);
- const uint64_t latency_ms =
- CensusViewProvider::GetRelatedViewDataRowDouble(
- view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
- tag_values);
+ const uint64_t latency_ms = CensusViewProvider::GetRelatedViewDataRowInt(
+ view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
+ tag_values);
uint64_t ok_count = 0;
uint64_t error_count = 0;
total_end_count += end_count;
diff --git a/src/cpp/server/load_reporter/load_reporter.h b/src/cpp/server/load_reporter/load_reporter.h
index 49a2e4b53c..b2254d5601 100644
--- a/src/cpp/server/load_reporter/load_reporter.h
+++ b/src/cpp/server/load_reporter/load_reporter.h
@@ -59,7 +59,10 @@ class CensusViewProvider {
// with the same tag values in a related view data. Several ViewData's are
// considered related if their views are based on the measures that are always
// recorded at the same time.
- double static GetRelatedViewDataRowDouble(
+ static double GetRelatedViewDataRowDouble(
+ const ViewDataMap& view_data_map, const char* view_name,
+ size_t view_name_len, const std::vector<grpc::string>& tag_values);
+ static uint64_t GetRelatedViewDataRowInt(
const ViewDataMap& view_data_map, const char* view_name,
size_t view_name_len, const std::vector<grpc::string>& tag_values);
diff --git a/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc b/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc
new file mode 100644
index 0000000000..d001199b8c
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc
@@ -0,0 +1,370 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h"
+
+namespace grpc {
+namespace load_reporter {
+
+void LoadReporterAsyncServiceImpl::CallableTag::Run(bool ok) {
+ GPR_ASSERT(handler_function_ != nullptr);
+ GPR_ASSERT(handler_ != nullptr);
+ handler_function_(std::move(handler_), ok);
+}
+
+LoadReporterAsyncServiceImpl::LoadReporterAsyncServiceImpl(
+ std::unique_ptr<ServerCompletionQueue> cq)
+ : cq_(std::move(cq)) {
+ thread_ = std::unique_ptr<::grpc_core::Thread>(
+ new ::grpc_core::Thread("server_load_reporting", Work, this));
+ std::unique_ptr<CpuStatsProvider> cpu_stats_provider = nullptr;
+#if defined(GPR_LINUX) || defined(GPR_WINDOWS) || defined(GPR_APPLE)
+ cpu_stats_provider.reset(new CpuStatsProviderDefaultImpl());
+#endif
+ load_reporter_ = std::unique_ptr<LoadReporter>(new LoadReporter(
+ kFeedbackSampleWindowSeconds,
+ std::unique_ptr<CensusViewProvider>(new CensusViewProviderDefaultImpl()),
+ std::move(cpu_stats_provider)));
+}
+
+LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl() {
+ // We will reach here after the server starts shutting down.
+ shutdown_ = true;
+ {
+ std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
+ cq_->Shutdown();
+ }
+ if (next_fetch_and_sample_alarm_ != nullptr)
+ next_fetch_and_sample_alarm_->Cancel();
+ thread_->Join();
+}
+
+void LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample() {
+ auto next_fetch_and_sample_time =
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_millis(kFetchAndSampleIntervalSeconds * 1000,
+ GPR_TIMESPAN));
+ {
+ std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
+ if (shutdown_) return;
+ // TODO(juanlishen): Improve the Alarm implementation to reuse a single
+ // instance for multiple events.
+ next_fetch_and_sample_alarm_.reset(new Alarm);
+ next_fetch_and_sample_alarm_->Set(cq_.get(), next_fetch_and_sample_time,
+ this);
+ }
+ gpr_log(GPR_DEBUG, "[LRS %p] Next fetch-and-sample scheduled.", this);
+}
+
+void LoadReporterAsyncServiceImpl::FetchAndSample(bool ok) {
+ if (!ok) {
+ gpr_log(GPR_INFO, "[LRS %p] Fetch-and-sample is stopped.", this);
+ return;
+ }
+ gpr_log(GPR_DEBUG, "[LRS %p] Starting a fetch-and-sample...", this);
+ load_reporter_->FetchAndSample();
+ ScheduleNextFetchAndSample();
+}
+
+void LoadReporterAsyncServiceImpl::Work(void* arg) {
+ LoadReporterAsyncServiceImpl* service =
+ reinterpret_cast<LoadReporterAsyncServiceImpl*>(arg);
+ service->FetchAndSample(true /* ok */);
+ // TODO(juanlishen): This is a workaround to wait for the cq to be ready. Need
+ // to figure out why cq is not ready after service starts.
+ gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_seconds(1, GPR_TIMESPAN)));
+ ReportLoadHandler::CreateAndStart(service->cq_.get(), service,
+ service->load_reporter_.get());
+ void* tag;
+ bool ok;
+ while (true) {
+ if (!service->cq_->Next(&tag, &ok)) {
+ // The completion queue is shutting down.
+ GPR_ASSERT(service->shutdown_);
+ break;
+ }
+ if (tag == service) {
+ service->FetchAndSample(ok);
+ } else {
+ auto* next_step = static_cast<CallableTag*>(tag);
+ next_step->Run(ok);
+ }
+ }
+}
+
+void LoadReporterAsyncServiceImpl::StartThread() { thread_->Start(); }
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart(
+ ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
+ LoadReporter* load_reporter) {
+ std::shared_ptr<ReportLoadHandler> handler =
+ std::make_shared<ReportLoadHandler>(cq, service, load_reporter);
+ ReportLoadHandler* p = handler.get();
+ {
+ std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
+ if (service->shutdown_) return;
+ p->on_done_notified_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnDoneNotified, p,
+ std::placeholders::_1, std::placeholders::_2),
+ handler);
+ p->next_inbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnRequestDelivered, p,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(handler));
+ p->ctx_.AsyncNotifyWhenDone(&p->on_done_notified_);
+ service->RequestReportLoad(&p->ctx_, &p->stream_, cq, cq,
+ &p->next_inbound_);
+ }
+}
+
+LoadReporterAsyncServiceImpl::ReportLoadHandler::ReportLoadHandler(
+ ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
+ LoadReporter* load_reporter)
+ : cq_(cq),
+ service_(service),
+ load_reporter_(load_reporter),
+ stream_(&ctx_),
+ call_status_(WAITING_FOR_DELIVERY) {}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (ok) {
+ call_status_ = DELIVERED;
+ } else {
+ // AsyncNotifyWhenDone() needs to be called before the call starts, but the
+ // tag will not pop out if the call never starts (
+ // https://github.com/grpc/grpc/issues/10136). So we need to manually
+ // release the ownership of the handler in this case.
+ GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
+ }
+ if (!ok || shutdown_) {
+ // The value of ok being false means that the server is shutting down.
+ Shutdown(std::move(self), "OnRequestDelivered");
+ return;
+ }
+ // Spawn a new handler instance to serve the next new client. Every handler
+ // instance will deallocate itself when it's done.
+ CreateAndStart(cq_, service_, load_reporter_);
+ {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (service_->shutdown_) {
+ lock.release()->unlock();
+ Shutdown(std::move(self), "OnRequestDelivered");
+ return;
+ }
+ next_inbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ stream_.Read(&request_, &next_inbound_);
+ }
+ // LB ID is unique for each load reporting stream.
+ lb_id_ = load_reporter_->GenerateLbId();
+ gpr_log(GPR_INFO,
+ "[LRS %p] Call request delivered (lb_id_: %s, handler: %p). "
+ "Start reading the initial request...",
+ service_, lb_id_.c_str(), this);
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (!ok || shutdown_) {
+ if (!ok && call_status_ < INITIAL_REQUEST_RECEIVED) {
+ // The client may have half-closed the stream or the stream is broken.
+ gpr_log(GPR_INFO,
+ "[LRS %p] Failed reading the initial request from the stream "
+ "(lb_id_: %s, handler: %p, done_notified: %d, is_cancelled: %d).",
+ service_, lb_id_.c_str(), this, static_cast<int>(done_notified_),
+ static_cast<int>(is_cancelled_));
+ }
+ Shutdown(std::move(self), "OnReadDone");
+ return;
+ }
+ // We only receive one request, which is the initial request.
+ if (call_status_ < INITIAL_REQUEST_RECEIVED) {
+ if (!request_.has_initial_request()) {
+ Shutdown(std::move(self), "OnReadDone+initial_request_not_found");
+ } else {
+ call_status_ = INITIAL_REQUEST_RECEIVED;
+ const auto& initial_request = request_.initial_request();
+ load_balanced_hostname_ = initial_request.load_balanced_hostname();
+ load_key_ = initial_request.load_key();
+ load_reporter_->ReportStreamCreated(load_balanced_hostname_, lb_id_,
+ load_key_);
+ const auto& load_report_interval = initial_request.load_report_interval();
+ load_report_interval_ms_ =
+ static_cast<uint64_t>(load_report_interval.seconds() * 1000 +
+ load_report_interval.nanos() / 1000);
+ gpr_log(
+ GPR_INFO,
+ "[LRS %p] Initial request received. Start load reporting (load "
+ "balanced host: %s, interval: %lu ms, lb_id_: %s, handler: %p)...",
+ service_, load_balanced_hostname_.c_str(), load_report_interval_ms_,
+ lb_id_.c_str(), this);
+ SendReport(self, true /* ok */);
+ // Expect this read to fail.
+ {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (service_->shutdown_) {
+ lock.release()->unlock();
+ Shutdown(std::move(self), "OnReadDone");
+ return;
+ }
+ next_inbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ stream_.Read(&request_, &next_inbound_);
+ }
+ }
+ } else {
+ // Another request received! This violates the spec.
+ gpr_log(GPR_ERROR,
+ "[LRS %p] Another request received (lb_id_: %s, handler: %p).",
+ service_, lb_id_.c_str(), this);
+ Shutdown(std::move(self), "OnReadDone+second_request");
+ }
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (!ok || shutdown_) {
+ Shutdown(std::move(self), "ScheduleNextReport");
+ return;
+ }
+ auto next_report_time = gpr_time_add(
+ gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN));
+ {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (service_->shutdown_) {
+ lock.release()->unlock();
+ Shutdown(std::move(self), "ScheduleNextReport");
+ return;
+ }
+ next_outbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::SendReport, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ // TODO(juanlishen): Improve the Alarm implementation to reuse a single
+ // instance for multiple events.
+ next_report_alarm_.reset(new Alarm);
+ next_report_alarm_->Set(cq_, next_report_time, &next_outbound_);
+ }
+ gpr_log(GPR_DEBUG,
+ "[LRS %p] Next load report scheduled (lb_id_: %s, handler: %p).",
+ service_, lb_id_.c_str(), this);
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (!ok || shutdown_) {
+ Shutdown(std::move(self), "SendReport");
+ return;
+ }
+ ::grpc::lb::v1::LoadReportResponse response;
+ auto loads = load_reporter_->GenerateLoads(load_balanced_hostname_, lb_id_);
+ response.mutable_load()->Swap(&loads);
+ auto feedback = load_reporter_->GenerateLoadBalancingFeedback();
+ response.mutable_load_balancing_feedback()->Swap(&feedback);
+ if (call_status_ < INITIAL_RESPONSE_SENT) {
+ auto initial_response = response.mutable_initial_response();
+ initial_response->set_load_balancer_id(lb_id_);
+ initial_response->set_implementation_id(
+ ::grpc::lb::v1::InitialLoadReportResponse::CPP);
+ initial_response->set_server_version(kVersion);
+ call_status_ = INITIAL_RESPONSE_SENT;
+ }
+ {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (service_->shutdown_) {
+ lock.release()->unlock();
+ Shutdown(std::move(self), "SendReport");
+ return;
+ }
+ next_outbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::ScheduleNextReport, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ stream_.Write(response, &next_outbound_);
+ gpr_log(GPR_INFO,
+ "[LRS %p] Sending load report (lb_id_: %s, handler: %p, loads "
+ "count: %d)...",
+ service_, lb_id_.c_str(), this, response.load().size());
+ }
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnDoneNotified(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ GPR_ASSERT(ok);
+ done_notified_ = true;
+ if (ctx_.IsCancelled()) {
+ is_cancelled_ = true;
+ }
+ gpr_log(GPR_INFO,
+ "[LRS %p] Load reporting call is notified done (handler: %p, "
+ "is_cancelled: %d).",
+ service_, this, static_cast<int>(is_cancelled_));
+ Shutdown(std::move(self), "OnDoneNotified");
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown(
+ std::shared_ptr<ReportLoadHandler> self, const char* reason) {
+ if (!shutdown_) {
+ gpr_log(GPR_INFO,
+ "[LRS %p] Shutting down the handler (lb_id_: %s, handler: %p, "
+ "reason: %s).",
+ service_, lb_id_.c_str(), this, reason);
+ shutdown_ = true;
+ if (call_status_ >= INITIAL_REQUEST_RECEIVED) {
+ load_reporter_->ReportStreamClosed(load_balanced_hostname_, lb_id_);
+ next_report_alarm_->Cancel();
+ }
+ }
+ // OnRequestDelivered() may be called after OnDoneNotified(), so we need to
+ // try to Finish() every time we are in Shutdown().
+ if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (!service_->shutdown_) {
+ on_finish_done_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnFinishDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ // TODO(juanlishen): Maybe add a message proto for the client to
+ // explicitly cancel the stream so that we can return OK status in such
+ // cases.
+ stream_.Finish(Status::CANCELLED, &on_finish_done_);
+ call_status_ = FINISH_CALLED;
+ }
+ }
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnFinishDone(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (ok) {
+ gpr_log(GPR_INFO,
+ "[LRS %p] Load reporting finished (lb_id_: %s, handler: %p).",
+ service_, lb_id_.c_str(), this);
+ }
+}
+
+} // namespace load_reporter
+} // namespace grpc
diff --git a/src/cpp/server/load_reporter/load_reporter_async_service_impl.h b/src/cpp/server/load_reporter/load_reporter_async_service_impl.h
new file mode 100644
index 0000000000..6fc577ff49
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_reporter_async_service_impl.h
@@ -0,0 +1,194 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
+#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpc/support/log.h>
+#include <grpcpp/alarm.h>
+#include <grpcpp/grpcpp.h>
+
+#include "src/core/lib/gprpp/thd.h"
+#include "src/cpp/server/load_reporter/load_reporter.h"
+
+namespace grpc {
+namespace load_reporter {
+
+// Async load reporting service. It's mainly responsible for controlling the
+// procedure of incoming requests. The real business logic is handed off to the
+// LoadReporter. There should be at most one instance of this service on a
+// server to avoid spreading the load data into multiple places.
+class LoadReporterAsyncServiceImpl
+ : public grpc::lb::v1::LoadReporter::AsyncService {
+ public:
+ explicit LoadReporterAsyncServiceImpl(
+ std::unique_ptr<ServerCompletionQueue> cq);
+ ~LoadReporterAsyncServiceImpl();
+
+ // Starts the working thread.
+ void StartThread();
+
+ // Not copyable nor movable.
+ LoadReporterAsyncServiceImpl(const LoadReporterAsyncServiceImpl&) = delete;
+ LoadReporterAsyncServiceImpl& operator=(const LoadReporterAsyncServiceImpl&) =
+ delete;
+
+ private:
+ class ReportLoadHandler;
+
+ // A tag that can be called with a bool argument. It's tailored for
+ // ReportLoadHandler's use. Before being used, it should be constructed with a
+ // method of ReportLoadHandler and a shared pointer to the handler. The
+ // shared pointer will be moved to the invoked function and the function can
+ // only be invoked once. That makes ref counting of the handler easier,
+ // because the shared pointer is not bound to the function and can be gone
+ // once the invoked function returns (if not used any more).
+ class CallableTag {
+ public:
+ using HandlerFunction =
+ std::function<void(std::shared_ptr<ReportLoadHandler>, bool)>;
+
+ CallableTag() {}
+
+ CallableTag(HandlerFunction func,
+ std::shared_ptr<ReportLoadHandler> handler)
+ : handler_function_(std::move(func)), handler_(std::move(handler)) {
+ GPR_ASSERT(handler_function_ != nullptr);
+ GPR_ASSERT(handler_ != nullptr);
+ }
+
+ // Runs the tag. This should be called only once. The handler is no longer
+ // owned by this tag after this method is invoked.
+ void Run(bool ok);
+
+ // Releases and returns the shared pointer to the handler.
+ std::shared_ptr<ReportLoadHandler> ReleaseHandler() {
+ return std::move(handler_);
+ }
+
+ private:
+ HandlerFunction handler_function_ = nullptr;
+ std::shared_ptr<ReportLoadHandler> handler_;
+ };
+
+ // Each handler takes care of one load reporting stream. It contains
+ // per-stream data and it will access the members of the parent class (i.e.,
+ // LoadReporterAsyncServiceImpl) for service-wide data (e.g., the load data).
+ class ReportLoadHandler {
+ public:
+ // Instantiates a ReportLoadHandler and requests the next load reporting
+ // call. The handler object will manage its own lifetime, so no action is
+ // needed from the caller any more regarding that object.
+ static void CreateAndStart(ServerCompletionQueue* cq,
+ LoadReporterAsyncServiceImpl* service,
+ LoadReporter* load_reporter);
+
+ // This ctor is public because we want to use std::make_shared<> in
+ // CreateAndStart(). This ctor shouldn't be used elsewhere.
+ ReportLoadHandler(ServerCompletionQueue* cq,
+ LoadReporterAsyncServiceImpl* service,
+ LoadReporter* load_reporter);
+
+ private:
+ // After the handler has a call request delivered, it starts reading the
+ // initial request. Also, a new handler is spawned so that we can keep
+ // servicing future calls.
+ void OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ // The first Read() is expected to succeed, after which the handler starts
+ // sending load reports back to the balancer. The second Read() is
+ // expected to fail, which happens when the balancer half-closes the
+ // stream to signal that it's no longer interested in the load reports. For
+ // the latter case, the handler will then close the stream.
+ void OnReadDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ // The report sending operations are sequential as: send report -> send
+ // done, schedule the next send -> waiting for the alarm to fire -> alarm
+ // fires, send report -> ...
+ void SendReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
+ void ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ // Called when Finish() is done.
+ void OnFinishDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ // Called when AsyncNotifyWhenDone() notifies us.
+ void OnDoneNotified(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ void Shutdown(std::shared_ptr<ReportLoadHandler> self, const char* reason);
+
+ // The key fields of the stream.
+ grpc::string lb_id_;
+ grpc::string load_balanced_hostname_;
+ grpc::string load_key_;
+ uint64_t load_report_interval_ms_;
+
+ // The data for RPC communication with the load reportee.
+ ServerContext ctx_;
+ ::grpc::lb::v1::LoadReportRequest request_;
+
+ // The members passed down from LoadReporterAsyncServiceImpl.
+ ServerCompletionQueue* cq_;
+ LoadReporterAsyncServiceImpl* service_;
+ LoadReporter* load_reporter_;
+ ServerAsyncReaderWriter<::grpc::lb::v1::LoadReportResponse,
+ ::grpc::lb::v1::LoadReportRequest>
+ stream_;
+
+ // The status of the RPC progress.
+ enum CallStatus {
+ WAITING_FOR_DELIVERY,
+ DELIVERED,
+ INITIAL_REQUEST_RECEIVED,
+ INITIAL_RESPONSE_SENT,
+ FINISH_CALLED
+ } call_status_;
+ bool shutdown_{false};
+ bool done_notified_{false};
+ bool is_cancelled_{false};
+ CallableTag on_done_notified_;
+ CallableTag on_finish_done_;
+ CallableTag next_inbound_;
+ CallableTag next_outbound_;
+ std::unique_ptr<Alarm> next_report_alarm_;
+ };
+
+ // Handles the incoming requests and drives the completion queue in a loop.
+ static void Work(void* arg);
+
+ // Schedules the next data fetching from Census and LB feedback sampling.
+ void ScheduleNextFetchAndSample();
+
+ // Fetches data from Census and samples LB feedback.
+ void FetchAndSample(bool ok);
+
+ std::unique_ptr<ServerCompletionQueue> cq_;
+ // To synchronize the operations related to shutdown state of cq_, so that we
+ // don't enqueue new tags into cq_ after it is already shut down.
+ std::mutex cq_shutdown_mu_;
+ std::atomic_bool shutdown_{false};
+ std::unique_ptr<::grpc_core::Thread> thread_;
+ std::unique_ptr<LoadReporter> load_reporter_;
+ std::unique_ptr<Alarm> next_fetch_and_sample_alarm_;
+};
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
diff --git a/src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc b/src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc
new file mode 100644
index 0000000000..81cf6ac562
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include <grpcpp/ext/server_load_reporting.h>
+
+#include "src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h"
+
+namespace grpc {
+namespace load_reporter {
+namespace experimental {
+
+void LoadReportingServiceServerBuilderOption::UpdateArguments(
+ ::grpc::ChannelArguments* args) {
+ args->SetInt(GRPC_ARG_ENABLE_LOAD_REPORTING, true);
+}
+
+void LoadReportingServiceServerBuilderOption::UpdatePlugins(
+ std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>>* plugins) {
+ plugins->emplace_back(new LoadReportingServiceServerBuilderPlugin());
+}
+
+} // namespace experimental
+} // namespace load_reporter
+} // namespace grpc
diff --git a/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc b/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc
new file mode 100644
index 0000000000..c2c5dd5ed5
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h"
+
+#include <grpcpp/impl/server_initializer.h>
+
+namespace grpc {
+namespace load_reporter {
+
+bool LoadReportingServiceServerBuilderPlugin::has_sync_methods() const {
+ if (service_ != nullptr) {
+ return service_->has_synchronous_methods();
+ }
+ return false;
+}
+
+bool LoadReportingServiceServerBuilderPlugin::has_async_methods() const {
+ if (service_ != nullptr) {
+ return service_->has_async_methods();
+ }
+ return false;
+}
+
+void LoadReportingServiceServerBuilderPlugin::UpdateServerBuilder(
+ grpc::ServerBuilder* builder) {
+ auto cq = builder->AddCompletionQueue();
+ service_ = std::make_shared<LoadReporterAsyncServiceImpl>(std::move(cq));
+}
+
+void LoadReportingServiceServerBuilderPlugin::InitServer(
+ grpc::ServerInitializer* si) {
+ si->RegisterService(service_);
+}
+
+void LoadReportingServiceServerBuilderPlugin::Finish(
+ grpc::ServerInitializer* si) {
+ service_->StartThread();
+ service_.reset();
+}
+
+} // namespace load_reporter
+} // namespace grpc
diff --git a/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h b/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h
new file mode 100644
index 0000000000..1f098591d4
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_LOAD_REPORTING_SERVICE_SERVER_BUILDER_PLUGIN_H
+#define GRPC_SRC_CPP_LOAD_REPORTING_SERVICE_SERVER_BUILDER_PLUGIN_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpcpp/impl/server_builder_plugin.h>
+
+#include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h"
+
+namespace grpc {
+namespace load_reporter {
+
+// The plugin that registers and starts load reporting service when starting a
+// server.
+class LoadReportingServiceServerBuilderPlugin : public ServerBuilderPlugin {
+ public:
+ ~LoadReportingServiceServerBuilderPlugin() override = default;
+ grpc::string name() override { return "load_reporting_service"; }
+
+ // Creates a load reporting service.
+ void UpdateServerBuilder(grpc::ServerBuilder* builder) override;
+
+ // Registers the load reporter service.
+ void InitServer(grpc::ServerInitializer* si) override;
+
+ // Starts the load reporter service.
+ void Finish(grpc::ServerInitializer* si) override;
+
+ void ChangeArguments(const grpc::string& name, void* value) override {}
+ void UpdateChannelArguments(grpc::ChannelArguments* args) override {}
+ bool has_sync_methods() const override;
+ bool has_async_methods() const override;
+
+ private:
+ std::shared_ptr<LoadReporterAsyncServiceImpl> service_;
+};
+
+std::unique_ptr<grpc::ServerBuilderPlugin>
+CreateLoadReportingServiceServerBuilderPlugin();
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_LOAD_REPORTING_SERVICE_SERVER_BUILDER_PLUGIN_H
diff --git a/src/cpp/server/load_reporter/util.cc b/src/cpp/server/load_reporter/util.cc
new file mode 100644
index 0000000000..a2f2f11e70
--- /dev/null
+++ b/src/cpp/server/load_reporter/util.cc
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <grpcpp/ext/server_load_reporting.h>
+
+#include <grpc/support/log.h>
+
+namespace grpc {
+namespace load_reporter {
+namespace experimental {
+
+void AddLoadReportingCost(grpc::ServerContext* ctx,
+ const grpc::string& cost_name, double cost_value) {
+ if (std::isnormal(cost_value)) {
+ grpc::string buf;
+ buf.resize(sizeof(cost_value) + cost_name.size());
+ memcpy(&(*buf.begin()), &cost_value, sizeof(cost_value));
+ memcpy(&(*buf.begin()) + sizeof(cost_value), cost_name.data(),
+ cost_name.size());
+ ctx->AddTrailingMetadata(GRPC_LB_COST_MD_KEY, buf);
+ } else {
+ gpr_log(GPR_ERROR, "Call metric value is not normal.");
+ }
+}
+
+} // namespace experimental
+} // namespace load_reporter
+} // namespace grpc
diff --git a/src/proto/grpc/lb/v1/load_reporter.proto b/src/proto/grpc/lb/v1/load_reporter.proto
index c2e4f23f6e..d57a37fed7 100644
--- a/src/proto/grpc/lb/v1/load_reporter.proto
+++ b/src/proto/grpc/lb/v1/load_reporter.proto
@@ -114,6 +114,10 @@ message Load {
// num_calls_in_progress is the only valid entry. If in_progress_report is not
// set, num_calls_in_progress will be ignored. If in_progress_report is set,
// fields other than num_calls_in_progress and orphaned_load will be ignored.
+ // TODO(juanlishen): A Load is either an in_progress_report or not. We should
+ // make this explicit in hierarchy. From the log, I see in_progress_report_
+ // has a random num_calls_in_progress_ when not set, which might lead to bug
+ // when the balancer process the load report.
oneof in_progress_report {
// The number of calls in progress (instantaneously) per load balancer id.
int64 num_calls_in_progress = 5;
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index 23dde69dd0..95bb7ed229 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -14,7 +14,7 @@
licenses(["notice"]) # Apache v2
-load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package", "grpc_cc_binary")
+load("//bazel:grpc_build_system.bzl", "grpc_cc_binary", "grpc_cc_library", "grpc_cc_test", "grpc_package")
grpc_package(
name = "test/cpp/end2end",
@@ -430,6 +430,20 @@ grpc_cc_binary(
)
grpc_cc_test(
+ name = "server_load_reporting_end2end_test",
+ srcs = ["server_load_reporting_end2end_test.cc"],
+ external_deps = [
+ "gtest",
+ "gmock",
+ ],
+ deps = [
+ "//:grpcpp_server_load_reporting",
+ "//src/proto/grpc/testing:echo_proto",
+ "//test/cpp/util:test_util",
+ ],
+)
+
+grpc_cc_test(
name = "shutdown_test",
srcs = ["shutdown_test.cc"],
external_deps = [
diff --git a/test/cpp/end2end/server_load_reporting_end2end_test.cc b/test/cpp/end2end/server_load_reporting_end2end_test.cc
new file mode 100644
index 0000000000..7bc9af2a6e
--- /dev/null
+++ b/test/cpp/end2end/server_load_reporting_end2end_test.cc
@@ -0,0 +1,191 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include <thread>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <grpc++/grpc++.h>
+#include <grpc/grpc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpcpp/ext/server_load_reporting.h>
+#include <grpcpp/server_builder.h>
+
+#include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/core/util/port.h"
+
+namespace grpc {
+namespace testing {
+namespace {
+
+constexpr double kMetricValue = 3.1415;
+constexpr char kMetricName[] = "METRIC_PI";
+
+// Different messages result in different response statuses. For simplicity in
+// computing request bytes, the message sizes should be the same.
+const char kOkMessage[] = "hello";
+const char kServerErrorMessage[] = "sverr";
+const char kClientErrorMessage[] = "clerr";
+
+class EchoTestServiceImpl : public EchoTestService::Service {
+ public:
+ ~EchoTestServiceImpl() override {}
+
+ Status Echo(ServerContext* context, const EchoRequest* request,
+ EchoResponse* response) override {
+ if (request->message() == kServerErrorMessage) {
+ return Status(StatusCode::UNKNOWN, "Server error requested");
+ }
+ if (request->message() == kClientErrorMessage) {
+ return Status(StatusCode::FAILED_PRECONDITION, "Client error requested");
+ }
+ response->set_message(request->message());
+ ::grpc::load_reporter::experimental::AddLoadReportingCost(
+ context, kMetricName, kMetricValue);
+ return Status::OK;
+ }
+};
+
+class ServerLoadReportingEnd2endTest : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ server_address_ =
+ "localhost:" + std::to_string(grpc_pick_unused_port_or_die());
+ server_ =
+ ServerBuilder()
+ .AddListeningPort(server_address_, InsecureServerCredentials())
+ .RegisterService(&echo_service_)
+ .SetOption(std::unique_ptr<::grpc::ServerBuilderOption>(
+ new ::grpc::load_reporter::experimental::
+ LoadReportingServiceServerBuilderOption()))
+ .BuildAndStart();
+ server_thread_ =
+ std::thread(&ServerLoadReportingEnd2endTest::RunServerLoop, this);
+ }
+
+ void RunServerLoop() { server_->Wait(); }
+
+ void TearDown() override {
+ server_->Shutdown();
+ server_thread_.join();
+ }
+
+ void ClientMakeEchoCalls(const grpc::string& lb_id,
+ const grpc::string& lb_tag,
+ const grpc::string& message, size_t num_requests) {
+ auto stub = EchoTestService::NewStub(
+ CreateChannel(server_address_, InsecureChannelCredentials()));
+ grpc::string lb_token = lb_id + lb_tag;
+ for (int i = 0; i < num_requests; ++i) {
+ ClientContext ctx;
+ if (!lb_token.empty()) ctx.AddMetadata(GRPC_LB_TOKEN_MD_KEY, lb_token);
+ EchoRequest request;
+ EchoResponse response;
+ request.set_message(message);
+ Status status = stub->Echo(&ctx, request, &response);
+ if (message == kOkMessage) {
+ ASSERT_EQ(status.error_code(), StatusCode::OK);
+ ASSERT_EQ(request.message(), response.message());
+ } else if (message == kServerErrorMessage) {
+ ASSERT_EQ(status.error_code(), StatusCode::UNKNOWN);
+ } else if (message == kClientErrorMessage) {
+ ASSERT_EQ(status.error_code(), StatusCode::FAILED_PRECONDITION);
+ }
+ }
+ }
+
+ grpc::string server_address_;
+ std::unique_ptr<Server> server_;
+ std::thread server_thread_;
+ EchoTestServiceImpl echo_service_;
+};
+
+TEST_F(ServerLoadReportingEnd2endTest, NoCall) {}
+
+TEST_F(ServerLoadReportingEnd2endTest, BasicReport) {
+ auto channel =
+ grpc::CreateChannel(server_address_, InsecureChannelCredentials());
+ auto stub = ::grpc::lb::v1::LoadReporter::NewStub(channel);
+ ClientContext ctx;
+ auto stream = stub->ReportLoad(&ctx);
+ ::grpc::lb::v1::LoadReportRequest request;
+ request.mutable_initial_request()->set_load_balanced_hostname(
+ server_address_);
+ request.mutable_initial_request()->set_load_key("LOAD_KEY");
+ request.mutable_initial_request()
+ ->mutable_load_report_interval()
+ ->set_seconds(5);
+ stream->Write(request);
+ gpr_log(GPR_INFO, "Initial request sent.");
+ ::grpc::lb::v1::LoadReportResponse response;
+ stream->Read(&response);
+ const grpc::string& lb_id = response.initial_response().load_balancer_id();
+ gpr_log(GPR_INFO, "Initial response received (lb_id: %s).", lb_id.c_str());
+ ClientMakeEchoCalls(lb_id, "LB_TAG", kOkMessage, 1);
+ while (true) {
+ stream->Read(&response);
+ if (!response.load().empty()) {
+ ASSERT_EQ(response.load().size(), 3);
+ for (const auto& load : response.load()) {
+ if (load.in_progress_report_case()) {
+ // The special load record that reports the number of in-progress
+ // calls.
+ ASSERT_EQ(load.num_calls_in_progress(), 1);
+ } else if (load.orphaned_load_case()) {
+ // The call from the balancer doesn't have any valid LB token.
+ ASSERT_EQ(load.orphaned_load_case(), load.kLoadKeyUnknown);
+ ASSERT_EQ(load.num_calls_started(), 1);
+ ASSERT_EQ(load.num_calls_finished_without_error(), 0);
+ ASSERT_EQ(load.num_calls_finished_with_error(), 0);
+ } else {
+ // This corresponds to the calls from the client.
+ ASSERT_EQ(load.num_calls_started(), 1);
+ ASSERT_EQ(load.num_calls_finished_without_error(), 1);
+ ASSERT_EQ(load.num_calls_finished_with_error(), 0);
+ ASSERT_GE(load.total_bytes_received(), sizeof(kOkMessage));
+ ASSERT_GE(load.total_bytes_sent(), sizeof(kOkMessage));
+ ASSERT_EQ(load.metric_data().size(), 1);
+ ASSERT_EQ(load.metric_data().Get(0).metric_name(), kMetricName);
+ ASSERT_EQ(load.metric_data().Get(0).num_calls_finished_with_metric(),
+ 1);
+ ASSERT_EQ(load.metric_data().Get(0).total_metric_value(),
+ kMetricValue);
+ }
+ }
+ break;
+ }
+ }
+ stream->WritesDone();
+ ASSERT_EQ(stream->Finish().error_code(), StatusCode::CANCELLED);
+}
+
+// TODO(juanlishen): Add more tests.
+
+} // namespace
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/test/cpp/server/load_reporter/BUILD b/test/cpp/server/load_reporter/BUILD
index ebfcfbb348..b7c4d29d71 100644
--- a/test/cpp/server/load_reporter/BUILD
+++ b/test/cpp/server/load_reporter/BUILD
@@ -42,6 +42,7 @@ grpc_cc_test(
"//:gpr",
"//:grpc",
"//:lb_load_reporter",
+ "//:lb_server_load_reporting_filter",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
],
diff --git a/test/cpp/server/load_reporter/load_reporter_test.cc b/test/cpp/server/load_reporter/load_reporter_test.cc
index 3264dba134..719c3a67d9 100644
--- a/test/cpp/server/load_reporter/load_reporter_test.cc
+++ b/test/cpp/server/load_reporter/load_reporter_test.cc
@@ -25,6 +25,7 @@
#include <grpc/grpc.h>
#include <gtest/gtest.h>
+#include "src/core/ext/filters/load_reporting/registered_opencensus_objects.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/cpp/server/load_reporter/constants.h"
#include "src/cpp/server/load_reporter/load_reporter.h"
@@ -123,6 +124,14 @@ class LoadReporterTest : public ::testing::Test {
private:
void SetUp() override {
+ // Access the measures to make them valid.
+ ::grpc::load_reporter::MeasureStartCount();
+ ::grpc::load_reporter::MeasureEndCount();
+ ::grpc::load_reporter::MeasureEndBytesSent();
+ ::grpc::load_reporter::MeasureEndBytesReceived();
+ ::grpc::load_reporter::MeasureEndLatencyMs();
+ ::grpc::load_reporter::MeasureOtherCallMetric();
+ // Set up the load reporter.
auto mock_cpu = new MockCpuStatsProvider();
auto mock_census = new MockCensusViewProvider();
// Prepare the initial CPU stats data. Note that the expectation should be