aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--BUILD55
-rw-r--r--doc/keepalive.md50
-rw-r--r--include/grpcpp/ext/server_load_reporting.h53
-rw-r--r--include/grpcpp/impl/codegen/server_context.h2
-rw-r--r--src/core/ext/filters/load_reporting/registered_opencensus_objects.h65
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_filter.cc54
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_filter.h9
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.cc2
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc5
-rw-r--r--src/cpp/server/load_reporter/constants.h10
-rw-r--r--src/cpp/server/load_reporter/load_data_store.h3
-rw-r--r--src/cpp/server/load_reporter/load_reporter.cc79
-rw-r--r--src/cpp/server/load_reporter/load_reporter.h5
-rw-r--r--src/cpp/server/load_reporter/load_reporter_async_service_impl.cc370
-rw-r--r--src/cpp/server/load_reporter/load_reporter_async_service_impl.h194
-rw-r--r--src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc41
-rw-r--r--src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc60
-rw-r--r--src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h62
-rw-r--r--src/cpp/server/load_reporter/util.cc45
-rw-r--r--src/proto/grpc/lb/v1/load_reporter.proto4
-rw-r--r--templates/tools/dockerfile/test/bazel/Dockerfile.template2
-rw-r--r--test/cpp/end2end/BUILD16
-rw-r--r--test/cpp/end2end/server_load_reporting_end2end_test.cc191
-rw-r--r--test/cpp/server/load_reporter/BUILD1
-rw-r--r--test/cpp/server/load_reporter/load_reporter_test.cc9
-rw-r--r--test/cpp/util/test_credentials_provider.cc2
-rw-r--r--test/cpp/util/test_credentials_provider.h1
-rw-r--r--tools/dockerfile/test/bazel/Dockerfile16
-rw-r--r--tools/doxygen/Doxyfile.c++1
-rw-r--r--tools/doxygen/Doxyfile.c++.internal1
-rw-r--r--tools/doxygen/Doxyfile.core1
-rw-r--r--tools/doxygen/Doxyfile.core.internal1
-rw-r--r--tools/internal_ci/linux/grpc_python_bazel_test.cfg23
-rwxr-xr-xtools/internal_ci/linux/grpc_python_bazel_test_in_docker.sh27
34 files changed, 1369 insertions, 91 deletions
diff --git a/BUILD b/BUILD
index c7d64b4a6c..8523bbb660 100644
--- a/BUILD
+++ b/BUILD
@@ -98,10 +98,10 @@ GRPC_PUBLIC_HDRS = [
"include/grpc/grpc.h",
"include/grpc/grpc_posix.h",
"include/grpc/grpc_security_constants.h",
- "include/grpc/load_reporting.h",
"include/grpc/slice.h",
"include/grpc/slice_buffer.h",
"include/grpc/status.h",
+ "include/grpc/load_reporting.h",
"include/grpc/support/workaround_list.h",
]
@@ -1201,9 +1201,9 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc",
- "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.c",
+ "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c",
],
hdrs = [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
@@ -1211,9 +1211,9 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
- "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.h",
+ "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h",
],
external_deps = [
"nanopb",
@@ -1234,9 +1234,9 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc",
- "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.c",
+ "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c",
],
hdrs = [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
@@ -1244,9 +1244,9 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
- "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.h",
+ "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h",
],
external_deps = [
"nanopb",
@@ -1334,6 +1334,51 @@ grpc_cc_library(
)
grpc_cc_library(
+ name = "lb_server_load_reporting_service_server_builder_plugin",
+ srcs = [
+ "src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc",
+ ],
+ hdrs = [
+ "src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h",
+ ],
+ language = "c++",
+ deps = [
+ "lb_load_reporter_service",
+ ],
+)
+
+grpc_cc_library(
+ name = "grpcpp_server_load_reporting",
+ srcs = [
+ "src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc",
+ "src/cpp/server/load_reporter/util.cc",
+ ],
+ language = "c++",
+ public_hdrs = [
+ "include/grpcpp/ext/server_load_reporting.h",
+ ],
+ deps = [
+ "lb_server_load_reporting_filter",
+ "lb_server_load_reporting_service_server_builder_plugin",
+ ],
+ alwayslink = 1,
+)
+
+grpc_cc_library(
+ name = "lb_load_reporter_service",
+ srcs = [
+ "src/cpp/server/load_reporter/load_reporter_async_service_impl.cc",
+ ],
+ hdrs = [
+ "src/cpp/server/load_reporter/load_reporter_async_service_impl.h",
+ ],
+ language = "c++",
+ deps = [
+ "lb_load_reporter",
+ ],
+)
+
+grpc_cc_library(
name = "lb_get_cpu_stats",
srcs = [
"src/cpp/server/load_reporter/get_cpu_stats_linux.cc",
diff --git a/doc/keepalive.md b/doc/keepalive.md
new file mode 100644
index 0000000000..2f9c8bfc9e
--- /dev/null
+++ b/doc/keepalive.md
@@ -0,0 +1,50 @@
+# Keepalive User Guide for gRPC Core (and dependants)
+
+The keepalive ping is a way to check if a channel is currently working by sending HTTP2 pings over the transport. It is sent periodically, and if the ping is not acknowledged by the peer within a certain timeout period, the transport is disconnected.
+
+This guide documents the knobs within gRPC core to control the current behavior of the keepalive ping.
+
+The keepalive ping is controlled by two important channel arguments -
+* **GRPC_ARG_KEEPALIVE_TIME_MS**
+ * This channel argument controls the period (in milliseconds) after which a keepalive ping is sent on the transport.
+* **GRPC_ARG_KEEPALIVE_TIMEOUT_MS**
+ * This channel argument controls the amount of time (in milliseconds), the sender of the keepalive ping waits for an acknowledgement. If it does not receive an acknowledgement within this time, it will close the connection.
+
+The above two channel arguments should be sufficient for most users, but the following arguments can also be useful in certain use cases.
+* **GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS**
+ * This channel argument if set to 1 (0 : false; 1 : true), allows keepalive pings to be sent even if there are no calls in flight.
+* **GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA**
+ * This channel argument controls the maximum number of pings that can be sent when there is no other data (data frame or header frame) to be sent. GRPC Core will not continue sending pings if we run over the limit. Setting it to 0 allows sending pings without sending data.
+* **GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS**
+ * If there is no data being sent on the transport, this channel argument controls the minimum time (in milliseconds) gRPC Core will wait between successive pings.
+* **GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS**
+ * If there is no data being sent on the transport, this channel argument on the server side controls the minimum time (in milliseconds) that gRPC Core would expect between receiving successive pings. If the time between successive pings is less that than this time, then the ping will be considered a bad ping from the peer. Such a ping counts as a ‘ping strike’.
+On the client side, this does not have any effect.
+* **GRPC_ARG_HTTP2_MAX_PING_STRIKES**
+ * This arg controls the maximum number of bad pings that the server will tolerate before sending an HTTP2 GOAWAY frame and closing the transport. Setting it to 0 allows the server to accept any number of bad pings.
+
+### Defaults Values
+
+Channel Argument| Client|Server
+----------------|-------|------
+GRPC_ARG_KEEPALIVE_TIME_MS|INT_MAX (disabled)|7200000 (2 hours)
+GRPC_ARG_KEEPALIVE_TIMEOUT_MS|20000 (20 seconds)|20000 (20 seconds)
+GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS|0 (false)|0 (false)
+GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA|2|2
+GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS|300000 (5 minutes)|300000 (5 minutes)
+GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS|N/A|300000 (5 minutes)
+GRPC_ARG_HTTP2_MAX_PING_STRIKES|N/A|2
+
+### FAQ
+* When is the keepalive timer started?
+ * The keepalive timer is started when a transport is done connecting (after handshake).
+* What happens when the keepalive timer fires?
+ * When the keepalive timer fires, gRPC Core would try to send a keepalive ping on the transport. This ping can be blocked if -
+ * there is no active call on that transport and GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS is false.
+ * the number of pings already sent on the transport without any data has already exceeded GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA.
+ * the time expired since the previous ping is less than GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS.
+ * If a keepalive ping is not blocked and is sent on the transport, then the keepalive watchdog timer is started which would close the transport if the ping is not acknowledged before it fires.
+* Why am I receiving a GOAWAY with error code ENHANCE_YOUR_CALM?
+ * A server sends a GOAWAY with ENHANCE_YOUR_CALM if the client sends too many misbehaving pings. For example -
+ * if a server has GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS set to false, and the client sends pings without there being any call in flight.
+ * if the client's GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS setting is lower than the server's GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS.
diff --git a/include/grpcpp/ext/server_load_reporting.h b/include/grpcpp/ext/server_load_reporting.h
new file mode 100644
index 0000000000..939569c19a
--- /dev/null
+++ b/include/grpcpp/ext/server_load_reporting.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPCPP_EXT_SERVER_LOAD_REPORTING_H
+#define GRPCPP_EXT_SERVER_LOAD_REPORTING_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpc/load_reporting.h>
+#include <grpcpp/impl/codegen/config.h>
+#include <grpcpp/impl/codegen/server_context.h>
+#include <grpcpp/impl/server_builder_option.h>
+
+namespace grpc {
+namespace load_reporter {
+namespace experimental {
+
+// The ServerBuilderOption to enable server-side load reporting feature. To
+// enable the feature, please make sure the binary builds with the
+// grpcpp_server_load_reporting library and set this option in the
+// ServerBuilder.
+class LoadReportingServiceServerBuilderOption : public ServerBuilderOption {
+ public:
+ void UpdateArguments(::grpc::ChannelArguments* args) override;
+ void UpdatePlugins(std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>>*
+ plugins) override;
+};
+
+// Adds the load reporting cost with \a cost_name and \a cost_value in the
+// trailing metadata of the server context.
+void AddLoadReportingCost(grpc::ServerContext* ctx,
+ const grpc::string& cost_name, double cost_value);
+
+} // namespace experimental
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPCPP_EXT_SERVER_LOAD_REPORTING_H
diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h
index bced4202dd..153b404d9e 100644
--- a/include/grpcpp/impl/codegen/server_context.h
+++ b/include/grpcpp/impl/codegen/server_context.h
@@ -201,7 +201,7 @@ class ServerContext {
/// \param algorithm The compression algorithm used for the server call.
void set_compression_algorithm(grpc_compression_algorithm algorithm);
- /// Set the load reporting costs in \a cost_data for the call.
+ /// Set the serialized load reporting costs in \a cost_data for the call.
void SetLoadReportingCosts(const std::vector<grpc::string>& cost_data);
/// Return the authentication context for this server call.
diff --git a/src/core/ext/filters/load_reporting/registered_opencensus_objects.h b/src/core/ext/filters/load_reporting/registered_opencensus_objects.h
index b62863dc2c..4eacda7c02 100644
--- a/src/core/ext/filters/load_reporting/registered_opencensus_objects.h
+++ b/src/core/ext/filters/load_reporting/registered_opencensus_objects.h
@@ -28,75 +28,84 @@
namespace grpc {
namespace load_reporter {
+// Note that the functions here are specified as inline to share the static
+// objects across all the translation units including this header. See more
+// details on https://en.cppreference.com/w/cpp/language/inline.
+
// Measures.
-::opencensus::stats::MeasureInt64 MeasureStartCount() {
- static const ::opencensus::stats::MeasureInt64 start_count =
+inline ::opencensus::stats::MeasureInt64 MeasureStartCount() {
+ static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(
kMeasureStartCount, kMeasureStartCount, kMeasureStartCount);
- return start_count;
+ return measure;
}
-::opencensus::stats::MeasureInt64 MeasureEndCount() {
- static const ::opencensus::stats::MeasureInt64 end_count =
+inline ::opencensus::stats::MeasureInt64 MeasureEndCount() {
+ static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(
kMeasureEndCount, kMeasureEndCount, kMeasureEndCount);
- return end_count;
+ return measure;
}
-::opencensus::stats::MeasureInt64 MeasureEndBytesSent() {
- static const ::opencensus::stats::MeasureInt64 end_bytes_sent =
+inline ::opencensus::stats::MeasureInt64 MeasureEndBytesSent() {
+ static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(
kMeasureEndBytesSent, kMeasureEndBytesSent, kMeasureEndBytesSent);
- return end_bytes_sent;
+ return measure;
}
-::opencensus::stats::MeasureInt64 MeasureEndBytesReceived() {
- static const ::opencensus::stats::MeasureInt64 end_bytes_received =
+inline ::opencensus::stats::MeasureInt64 MeasureEndBytesReceived() {
+ static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(kMeasureEndBytesReceived,
kMeasureEndBytesReceived,
kMeasureEndBytesReceived);
- return end_bytes_received;
+ return measure;
}
-::opencensus::stats::MeasureInt64 MeasureEndLatencyMs() {
- static const ::opencensus::stats::MeasureInt64 end_latency_ms =
+inline ::opencensus::stats::MeasureInt64 MeasureEndLatencyMs() {
+ static const ::opencensus::stats::MeasureInt64 measure =
::opencensus::stats::MeasureInt64::Register(
kMeasureEndLatencyMs, kMeasureEndLatencyMs, kMeasureEndLatencyMs);
- return end_latency_ms;
+ return measure;
}
-::opencensus::stats::MeasureDouble MeasureOtherCallMetric() {
- static const ::opencensus::stats::MeasureDouble other_call_metric =
+inline ::opencensus::stats::MeasureDouble MeasureOtherCallMetric() {
+ static const ::opencensus::stats::MeasureDouble measure =
::opencensus::stats::MeasureDouble::Register(kMeasureOtherCallMetric,
kMeasureOtherCallMetric,
kMeasureOtherCallMetric);
- return other_call_metric;
+ return measure;
}
// Tags.
-opencensus::stats::TagKey TagKeyToken() {
- static const auto token = opencensus::stats::TagKey::Register(kTagKeyToken);
+inline ::opencensus::stats::TagKey TagKeyToken() {
+ static const ::opencensus::stats::TagKey token =
+ opencensus::stats::TagKey::Register(kTagKeyToken);
return token;
}
-opencensus::stats::TagKey TagKeyHost() {
- static const auto token = opencensus::stats::TagKey::Register(kTagKeyHost);
+inline ::opencensus::stats::TagKey TagKeyHost() {
+ static const ::opencensus::stats::TagKey token =
+ opencensus::stats::TagKey::Register(kTagKeyHost);
return token;
}
-opencensus::stats::TagKey TagKeyUserId() {
- static const auto token = opencensus::stats::TagKey::Register(kTagKeyUserId);
+
+inline ::opencensus::stats::TagKey TagKeyUserId() {
+ static const ::opencensus::stats::TagKey token =
+ opencensus::stats::TagKey::Register(kTagKeyUserId);
return token;
}
-opencensus::stats::TagKey TagKeyStatus() {
- static const auto token = opencensus::stats::TagKey::Register(kTagKeyStatus);
+inline ::opencensus::stats::TagKey TagKeyStatus() {
+ static const ::opencensus::stats::TagKey token =
+ opencensus::stats::TagKey::Register(kTagKeyStatus);
return token;
}
-opencensus::stats::TagKey TagKeyMetricName() {
- static const auto token =
+inline ::opencensus::stats::TagKey TagKeyMetricName() {
+ static const ::opencensus::stats::TagKey token =
opencensus::stats::TagKey::Register(kTagKeyMetricName);
return token;
}
diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
index 51e4d795d7..6529046a5e 100644
--- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
@@ -19,7 +19,6 @@
#include <grpc/support/port_platform.h>
#include <grpc/grpc_security.h>
-#include <grpc/load_reporting.h>
#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -31,18 +30,20 @@
#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/context.h"
-#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr_posix.h"
#include "src/core/lib/iomgr/socket_utils.h"
-#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/call.h"
-#include "src/core/lib/transport/static_metadata.h"
namespace grpc {
+constexpr char kEncodedIpv4AddressLengthString[] = "08";
+constexpr char kEncodedIpv6AddressLengthString[] = "32";
+constexpr char kEmptyAddressLengthString[] = "00";
+constexpr size_t kLengthPrefixSize = 2;
+
grpc_error* ServerLoadReportingChannelData::Init(
grpc_channel_element* /* elem */, grpc_channel_element_args* args) {
GPR_ASSERT(!args->is_last);
@@ -90,7 +91,9 @@ void ServerLoadReportingCallData::Destroy(
{chand->peer_identity(), chand->peer_identity_len()}},
{::grpc::load_reporter::TagKeyStatus(),
GetStatusTagForStatus(final_info->final_status)}});
+ gpr_free(client_ip_and_lr_token_);
}
+ gpr_free(target_host_);
grpc_slice_unref_internal(service_method_);
}
@@ -100,7 +103,8 @@ void ServerLoadReportingCallData::StartTransportStreamOpBatch(
if (op->recv_initial_metadata() != nullptr) {
// Save some fields to use when initial metadata is ready.
peer_string_ = op->get_peer_string();
- recv_initial_metadata_ = op->recv_initial_metadata();
+ recv_initial_metadata_ =
+ op->op()->payload->recv_initial_metadata.recv_initial_metadata;
original_recv_initial_metadata_ready_ = op->recv_initial_metadata_ready();
// Substitute the original closure for the wrapper closure.
op->set_recv_initial_metadata_ready(&recv_initial_metadata_ready_);
@@ -157,10 +161,10 @@ void ServerLoadReportingCallData::GetCensusSafeClientIpString(
*size = 8;
} else if (addr->sa_family == GRPC_AF_INET6) {
grpc_sockaddr_in6* addr6 = reinterpret_cast<grpc_sockaddr_in6*>(addr);
- *client_ip_string = static_cast<char*>(gpr_malloc(32));
+ *client_ip_string = static_cast<char*>(gpr_malloc(32 + 1));
for (size_t i = 0; i < 16; ++i) {
- sprintf(*client_ip_string + i, "%02x",
- addr6->sin6_addr.__in6_u.__u6_addr8[i]);
+ snprintf(*client_ip_string + i * 2, 2 + 1, "%02x",
+ addr6->sin6_addr.__in6_u.__u6_addr8[i]);
}
*size = 32;
} else {
@@ -241,7 +245,7 @@ void ServerLoadReportingCallData::RecvInitialMetadataReady(void* arg,
if (err == GRPC_ERROR_NONE) {
GRPC_LOG_IF_ERROR(
"server_load_reporting_filter",
- grpc_metadata_batch_filter(calld->recv_initial_metadata_->batch(),
+ grpc_metadata_batch_filter(calld->recv_initial_metadata_,
RecvInitialMetadataFilter, elem,
"recv_initial_metadata filtering error"));
// If the LB token was not found in the recv_initial_metadata, only the
@@ -277,7 +281,6 @@ grpc_filtered_mdelem ServerLoadReportingCallData::SendTrailingMetadataFilter(
reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
ServerLoadReportingChannelData* chand =
reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
- // TODO(juanlishen): GRPC_MDSTR_LB_COST_BIN meaning?
if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_COST_BIN)) {
const grpc_slice value = GRPC_MDVALUE(md);
const size_t cost_entry_size = GRPC_SLICE_LENGTH(value);
@@ -325,4 +328,35 @@ const char* ServerLoadReportingCallData::GetStatusTagForStatus(
}
}
+namespace {
+bool MaybeAddServerLoadReportingFilter(const grpc_channel_args& args) {
+ return grpc_channel_arg_get_bool(
+ grpc_channel_args_find(&args, GRPC_ARG_ENABLE_LOAD_REPORTING), false);
+}
+} // namespace
+
+// TODO(juanlishen): We should register the filter during grpc initialization
+// time once OpenCensus is compatible with our build system. For now, we force
+// registration of the server load reporting filter at static initialization
+// time if we build with the filter target.
+struct ServerLoadReportingFilterStaticRegistrar {
+ ServerLoadReportingFilterStaticRegistrar() {
+ static std::atomic_bool registered{false};
+ if (registered) return;
+ RegisterChannelFilter<ServerLoadReportingChannelData,
+ ServerLoadReportingCallData>(
+ "server_load_reporting", GRPC_SERVER_CHANNEL, INT_MAX,
+ MaybeAddServerLoadReportingFilter);
+ // Access measures to ensure they are initialized. Otherwise, we can't
+ // create any valid view before the first RPC.
+ ::grpc::load_reporter::MeasureStartCount();
+ ::grpc::load_reporter::MeasureEndCount();
+ ::grpc::load_reporter::MeasureEndBytesSent();
+ ::grpc::load_reporter::MeasureEndBytesReceived();
+ ::grpc::load_reporter::MeasureEndLatencyMs();
+ ::grpc::load_reporter::MeasureOtherCallMetric();
+ registered = true;
+ }
+} server_load_reporting_filter_static_registrar;
+
} // namespace grpc
diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.h b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h
index 029b19ac89..10baf1f833 100644
--- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.h
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h
@@ -86,8 +86,8 @@ class ServerLoadReportingCallData : public CallData {
// The received initial metadata (a member of the recv_initial_metadata op).
// When it is ready, we will extract some data from it via
// recv_initial_metadata_ready_ closure, before the original
- // recv_initial_metadata_ready closure,
- MetadataBatch* recv_initial_metadata_;
+ // recv_initial_metadata_ready closure.
+ grpc_metadata_batch* recv_initial_metadata_;
// The original recv_initial_metadata closure, which is wrapped by our own
// closure (recv_initial_metadata_ready_) to capture the incoming initial
@@ -112,11 +112,6 @@ class ServerLoadReportingCallData : public CallData {
// token.
char* client_ip_and_lr_token_;
size_t client_ip_and_lr_token_len_;
-
- static constexpr char kEncodedIpv4AddressLengthString[] = "08";
- static constexpr char kEncodedIpv6AddressLengthString[] = "32";
- static constexpr char kEmptyAddressLengthString[] = "00";
- static constexpr size_t kLengthPrefixSize = 2;
};
} // namespace grpc
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.cc b/src/core/ext/transport/chttp2/transport/hpack_parser.cc
index 907ba71178..ccf2256974 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc
@@ -1622,7 +1622,7 @@ grpc_error* grpc_chttp2_header_parser_parse(void* hpack_parser,
grpc_chttp2_transport* t,
grpc_chttp2_stream* s,
grpc_slice slice, int is_last) {
- GPR_TIMER_SCOPE("grpc_chttp2_hpack_parser_parse", 0);
+ GPR_TIMER_SCOPE("grpc_chttp2_header_parser_parse", 0);
grpc_chttp2_hpack_parser* parser =
static_cast<grpc_chttp2_hpack_parser*>(hpack_parser);
if (s != nullptr) {
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 5ffabdc665..7b368410cf 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -494,8 +494,9 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
is_fd_closed = true;
}
+ // TODO(sreek): handle fd removal (where is_fd_closed=false)
if (!is_fd_closed) {
- gpr_log(GPR_DEBUG, "TODO: handle fd removal?");
+ GRPC_FD_TRACE("epoll_fd %p (%d) was orphaned but not closed.", fd, fd->fd);
}
/* Remove the active status but keep referenced. We want this grpc_fd struct
@@ -1564,7 +1565,7 @@ static void pollset_set_add_pollset_set(grpc_pollset_set* a,
gpr_mu_unlock(b_mu);
}
// try to do the least copying possible
- // TODO(ctiller): there's probably a better heuristic here
+ // TODO(sreek): there's probably a better heuristic here
const size_t a_size = a->fd_count + a->pollset_count;
const size_t b_size = b->fd_count + b->pollset_count;
if (b_size > a_size) {
diff --git a/src/cpp/server/load_reporter/constants.h b/src/cpp/server/load_reporter/constants.h
index 07c5965fff..00ad794a04 100644
--- a/src/cpp/server/load_reporter/constants.h
+++ b/src/cpp/server/load_reporter/constants.h
@@ -24,6 +24,16 @@
namespace grpc {
namespace load_reporter {
+// TODO(juanlishen): Update the version number with the PR number every time
+// there is any change to the server load reporter.
+constexpr uint32_t kVersion = 15853;
+
+// TODO(juanlishen): This window size is from the internal spec for the load
+// reporter. Need to ask the gRPC LB team whether we should make this and the
+// fetching interval configurable.
+constexpr uint32_t kFeedbackSampleWindowSeconds = 10;
+constexpr uint32_t kFetchAndSampleIntervalSeconds = 1;
+
constexpr size_t kLbIdLength = 8;
constexpr size_t kIpv4AddressLength = 8;
constexpr size_t kIpv6AddressLength = 32;
diff --git a/src/cpp/server/load_reporter/load_data_store.h b/src/cpp/server/load_reporter/load_data_store.h
index 2da78ea064..dc08ecf479 100644
--- a/src/cpp/server/load_reporter/load_data_store.h
+++ b/src/cpp/server/load_reporter/load_data_store.h
@@ -160,7 +160,8 @@ class LoadRecordValue {
", error_count_=" + grpc::to_string(error_count_) +
", bytes_sent_=" + grpc::to_string(bytes_sent_) +
", bytes_recv_=" + grpc::to_string(bytes_recv_) +
- ", latency_ms_=" + grpc::to_string(latency_ms_) + "]";
+ ", latency_ms_=" + grpc::to_string(latency_ms_) + ", " +
+ grpc::to_string(call_metrics_.size()) + " other call metric(s)]";
}
bool InsertCallMetric(const grpc::string& metric_name,
diff --git a/src/cpp/server/load_reporter/load_reporter.cc b/src/cpp/server/load_reporter/load_reporter.cc
index 3f0063d883..464063a13f 100644
--- a/src/cpp/server/load_reporter/load_reporter.cc
+++ b/src/cpp/server/load_reporter/load_reporter.cc
@@ -22,6 +22,7 @@
#include <stdio.h>
#include <chrono>
#include <ctime>
+#include <iterator>
#include "src/cpp/server/load_reporter/constants.h"
#include "src/cpp/server/load_reporter/get_cpu_stats.h"
@@ -65,8 +66,8 @@ CensusViewProvider::CensusViewProvider()
// measurements instead of setting the data values directly.
auto vd_end_count =
::opencensus::stats::ViewDescriptor()
- .set_name((kViewEndCount))
- .set_measure((kMeasureEndCount))
+ .set_name(kViewEndCount)
+ .set_measure(kMeasureEndCount)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@@ -80,8 +81,8 @@ CensusViewProvider::CensusViewProvider()
view_descriptor_map_.emplace(kViewEndCount, vd_end_count);
auto vd_end_bytes_sent =
::opencensus::stats::ViewDescriptor()
- .set_name((kViewEndBytesSent))
- .set_measure((kMeasureEndBytesSent))
+ .set_name(kViewEndBytesSent)
+ .set_measure(kMeasureEndBytesSent)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@@ -95,8 +96,8 @@ CensusViewProvider::CensusViewProvider()
view_descriptor_map_.emplace(kViewEndBytesSent, vd_end_bytes_sent);
auto vd_end_bytes_received =
::opencensus::stats::ViewDescriptor()
- .set_name((kViewEndBytesReceived))
- .set_measure((kMeasureEndBytesReceived))
+ .set_name(kViewEndBytesReceived)
+ .set_measure(kMeasureEndBytesReceived)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@@ -110,8 +111,8 @@ CensusViewProvider::CensusViewProvider()
view_descriptor_map_.emplace(kViewEndBytesReceived, vd_end_bytes_received);
auto vd_end_latency_ms =
::opencensus::stats::ViewDescriptor()
- .set_name((kViewEndLatencyMs))
- .set_measure((kMeasureEndLatencyMs))
+ .set_name(kViewEndLatencyMs)
+ .set_measure(kMeasureEndLatencyMs)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@@ -126,8 +127,8 @@ CensusViewProvider::CensusViewProvider()
// Two views related to other call metrics.
auto vd_metric_call_count =
::opencensus::stats::ViewDescriptor()
- .set_name((kViewOtherCallMetricCount))
- .set_measure((kMeasureOtherCallMetric))
+ .set_name(kViewOtherCallMetricCount)
+ .set_measure(kMeasureOtherCallMetric)
.set_aggregation(::opencensus::stats::Aggregation::Count())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@@ -141,8 +142,8 @@ CensusViewProvider::CensusViewProvider()
view_descriptor_map_.emplace(kViewOtherCallMetricCount, vd_metric_call_count);
auto vd_metric_value =
::opencensus::stats::ViewDescriptor()
- .set_name((kViewOtherCallMetricValue))
- .set_measure((kMeasureOtherCallMetric))
+ .set_name(kViewOtherCallMetricValue)
+ .set_measure(kMeasureOtherCallMetric)
.set_aggregation(::opencensus::stats::Aggregation::Sum())
.add_column(tag_key_token_)
.add_column(tag_key_host_)
@@ -161,11 +162,26 @@ double CensusViewProvider::GetRelatedViewDataRowDouble(
size_t view_name_len, const std::vector<grpc::string>& tag_values) {
auto it_vd = view_data_map.find(grpc::string(view_name, view_name_len));
GPR_ASSERT(it_vd != view_data_map.end());
+ GPR_ASSERT(it_vd->second.type() ==
+ ::opencensus::stats::ViewData::Type::kDouble);
auto it_row = it_vd->second.double_data().find(tag_values);
GPR_ASSERT(it_row != it_vd->second.double_data().end());
return it_row->second;
}
+uint64_t CensusViewProvider::GetRelatedViewDataRowInt(
+ const ViewDataMap& view_data_map, const char* view_name,
+ size_t view_name_len, const std::vector<grpc::string>& tag_values) {
+ auto it_vd = view_data_map.find(grpc::string(view_name, view_name_len));
+ GPR_ASSERT(it_vd != view_data_map.end());
+ GPR_ASSERT(it_vd->second.type() ==
+ ::opencensus::stats::ViewData::Type::kInt64);
+ auto it_row = it_vd->second.int_data().find(tag_values);
+ GPR_ASSERT(it_row != it_vd->second.int_data().end());
+ GPR_ASSERT(it_row->second >= 0);
+ return it_row->second;
+}
+
CensusViewProviderDefaultImpl::CensusViewProviderDefaultImpl() {
for (const auto& p : view_descriptor_map()) {
const grpc::string& view_name = p.first;
@@ -235,23 +251,23 @@ LoadReporter::GenerateLoadBalancingFeedback() {
return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
}
// Find the longest range with valid ends.
- LoadBalancingFeedbackRecord* oldest = &feedback_records_[0];
- LoadBalancingFeedbackRecord* newest =
- &feedback_records_[feedback_records_.size() - 1];
- while (newest > oldest &&
+ auto oldest = feedback_records_.begin();
+ auto newest = feedback_records_.end() - 1;
+ while (std::distance(oldest, newest) > 0 &&
(newest->cpu_limit == 0 || oldest->cpu_limit == 0)) {
// A zero limit means that the system info reading was failed, so these
// records can't be used to calculate CPU utilization.
if (newest->cpu_limit == 0) --newest;
if (oldest->cpu_limit == 0) ++oldest;
}
- if (newest - oldest < 1 || oldest->end_time == newest->end_time ||
+ if (std::distance(oldest, newest) < 1 ||
+ oldest->end_time == newest->end_time ||
newest->cpu_limit == oldest->cpu_limit) {
return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
}
uint64_t rpcs = 0;
uint64_t errors = 0;
- for (LoadBalancingFeedbackRecord* p = newest; p != oldest; --p) {
+ for (auto p = newest; p != oldest; --p) {
// Because these two numbers are counters, the oldest record shouldn't be
// included.
rpcs += p->rpcs;
@@ -338,7 +354,8 @@ void LoadReporter::AttachOrphanLoadId(
if (per_balancer_store.lb_id() == kInvalidLbId) {
load->set_load_key_unknown(true);
} else {
- load->set_load_key_unknown(false);
+ // We shouldn't set load_key_unknown to any value in this case because
+ // load_key_unknown and orphaned_load_identifier are under an oneof struct.
load->mutable_orphaned_load_identifier()->set_load_key(
per_balancer_store.load_key());
load->mutable_orphaned_load_identifier()->set_load_balancer_id(
@@ -381,9 +398,7 @@ void LoadReporter::ProcessViewDataCallStart(
const CensusViewProvider::ViewDataMap& view_data_map) {
auto it = view_data_map.find(kViewStartCount);
if (it != view_data_map.end()) {
- // Note that the data type for any Sum view is double, whatever the data
- // type of the original measure.
- for (const auto& p : it->second.double_data()) {
+ for (const auto& p : it->second.int_data()) {
const std::vector<grpc::string>& tag_values = p.first;
const uint64_t start_count = static_cast<uint64_t>(p.second);
const grpc::string& client_ip_and_token = tag_values[0];
@@ -405,9 +420,7 @@ void LoadReporter::ProcessViewDataCallEnd(
uint64_t total_error_count = 0;
auto it = view_data_map.find(kViewEndCount);
if (it != view_data_map.end()) {
- // Note that the data type for any Sum view is double, whatever the data
- // type of the original measure.
- for (const auto& p : it->second.double_data()) {
+ for (const auto& p : it->second.int_data()) {
const std::vector<grpc::string>& tag_values = p.first;
const uint64_t end_count = static_cast<uint64_t>(p.second);
const grpc::string& client_ip_and_token = tag_values[0];
@@ -424,18 +437,16 @@ void LoadReporter::ProcessViewDataCallEnd(
continue;
}
LoadRecordKey key(client_ip_and_token, user_id);
- const uint64_t bytes_sent =
- CensusViewProvider::GetRelatedViewDataRowDouble(
- view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
- tag_values);
+ const uint64_t bytes_sent = CensusViewProvider::GetRelatedViewDataRowInt(
+ view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
+ tag_values);
const uint64_t bytes_received =
- CensusViewProvider::GetRelatedViewDataRowDouble(
+ CensusViewProvider::GetRelatedViewDataRowInt(
view_data_map, kViewEndBytesReceived,
sizeof(kViewEndBytesReceived) - 1, tag_values);
- const uint64_t latency_ms =
- CensusViewProvider::GetRelatedViewDataRowDouble(
- view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
- tag_values);
+ const uint64_t latency_ms = CensusViewProvider::GetRelatedViewDataRowInt(
+ view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
+ tag_values);
uint64_t ok_count = 0;
uint64_t error_count = 0;
total_end_count += end_count;
diff --git a/src/cpp/server/load_reporter/load_reporter.h b/src/cpp/server/load_reporter/load_reporter.h
index 49a2e4b53c..b2254d5601 100644
--- a/src/cpp/server/load_reporter/load_reporter.h
+++ b/src/cpp/server/load_reporter/load_reporter.h
@@ -59,7 +59,10 @@ class CensusViewProvider {
// with the same tag values in a related view data. Several ViewData's are
// considered related if their views are based on the measures that are always
// recorded at the same time.
- double static GetRelatedViewDataRowDouble(
+ static double GetRelatedViewDataRowDouble(
+ const ViewDataMap& view_data_map, const char* view_name,
+ size_t view_name_len, const std::vector<grpc::string>& tag_values);
+ static uint64_t GetRelatedViewDataRowInt(
const ViewDataMap& view_data_map, const char* view_name,
size_t view_name_len, const std::vector<grpc::string>& tag_values);
diff --git a/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc b/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc
new file mode 100644
index 0000000000..d001199b8c
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc
@@ -0,0 +1,370 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h"
+
+namespace grpc {
+namespace load_reporter {
+
+void LoadReporterAsyncServiceImpl::CallableTag::Run(bool ok) {
+ GPR_ASSERT(handler_function_ != nullptr);
+ GPR_ASSERT(handler_ != nullptr);
+ handler_function_(std::move(handler_), ok);
+}
+
+LoadReporterAsyncServiceImpl::LoadReporterAsyncServiceImpl(
+ std::unique_ptr<ServerCompletionQueue> cq)
+ : cq_(std::move(cq)) {
+ thread_ = std::unique_ptr<::grpc_core::Thread>(
+ new ::grpc_core::Thread("server_load_reporting", Work, this));
+ std::unique_ptr<CpuStatsProvider> cpu_stats_provider = nullptr;
+#if defined(GPR_LINUX) || defined(GPR_WINDOWS) || defined(GPR_APPLE)
+ cpu_stats_provider.reset(new CpuStatsProviderDefaultImpl());
+#endif
+ load_reporter_ = std::unique_ptr<LoadReporter>(new LoadReporter(
+ kFeedbackSampleWindowSeconds,
+ std::unique_ptr<CensusViewProvider>(new CensusViewProviderDefaultImpl()),
+ std::move(cpu_stats_provider)));
+}
+
+LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl() {
+ // We will reach here after the server starts shutting down.
+ shutdown_ = true;
+ {
+ std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
+ cq_->Shutdown();
+ }
+ if (next_fetch_and_sample_alarm_ != nullptr)
+ next_fetch_and_sample_alarm_->Cancel();
+ thread_->Join();
+}
+
+void LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample() {
+ auto next_fetch_and_sample_time =
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_millis(kFetchAndSampleIntervalSeconds * 1000,
+ GPR_TIMESPAN));
+ {
+ std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
+ if (shutdown_) return;
+ // TODO(juanlishen): Improve the Alarm implementation to reuse a single
+ // instance for multiple events.
+ next_fetch_and_sample_alarm_.reset(new Alarm);
+ next_fetch_and_sample_alarm_->Set(cq_.get(), next_fetch_and_sample_time,
+ this);
+ }
+ gpr_log(GPR_DEBUG, "[LRS %p] Next fetch-and-sample scheduled.", this);
+}
+
+void LoadReporterAsyncServiceImpl::FetchAndSample(bool ok) {
+ if (!ok) {
+ gpr_log(GPR_INFO, "[LRS %p] Fetch-and-sample is stopped.", this);
+ return;
+ }
+ gpr_log(GPR_DEBUG, "[LRS %p] Starting a fetch-and-sample...", this);
+ load_reporter_->FetchAndSample();
+ ScheduleNextFetchAndSample();
+}
+
+void LoadReporterAsyncServiceImpl::Work(void* arg) {
+ LoadReporterAsyncServiceImpl* service =
+ reinterpret_cast<LoadReporterAsyncServiceImpl*>(arg);
+ service->FetchAndSample(true /* ok */);
+ // TODO(juanlishen): This is a workaround to wait for the cq to be ready. Need
+ // to figure out why cq is not ready after service starts.
+ gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_seconds(1, GPR_TIMESPAN)));
+ ReportLoadHandler::CreateAndStart(service->cq_.get(), service,
+ service->load_reporter_.get());
+ void* tag;
+ bool ok;
+ while (true) {
+ if (!service->cq_->Next(&tag, &ok)) {
+ // The completion queue is shutting down.
+ GPR_ASSERT(service->shutdown_);
+ break;
+ }
+ if (tag == service) {
+ service->FetchAndSample(ok);
+ } else {
+ auto* next_step = static_cast<CallableTag*>(tag);
+ next_step->Run(ok);
+ }
+ }
+}
+
+void LoadReporterAsyncServiceImpl::StartThread() { thread_->Start(); }
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart(
+ ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
+ LoadReporter* load_reporter) {
+ std::shared_ptr<ReportLoadHandler> handler =
+ std::make_shared<ReportLoadHandler>(cq, service, load_reporter);
+ ReportLoadHandler* p = handler.get();
+ {
+ std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
+ if (service->shutdown_) return;
+ p->on_done_notified_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnDoneNotified, p,
+ std::placeholders::_1, std::placeholders::_2),
+ handler);
+ p->next_inbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnRequestDelivered, p,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(handler));
+ p->ctx_.AsyncNotifyWhenDone(&p->on_done_notified_);
+ service->RequestReportLoad(&p->ctx_, &p->stream_, cq, cq,
+ &p->next_inbound_);
+ }
+}
+
+LoadReporterAsyncServiceImpl::ReportLoadHandler::ReportLoadHandler(
+ ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
+ LoadReporter* load_reporter)
+ : cq_(cq),
+ service_(service),
+ load_reporter_(load_reporter),
+ stream_(&ctx_),
+ call_status_(WAITING_FOR_DELIVERY) {}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (ok) {
+ call_status_ = DELIVERED;
+ } else {
+ // AsyncNotifyWhenDone() needs to be called before the call starts, but the
+ // tag will not pop out if the call never starts (
+ // https://github.com/grpc/grpc/issues/10136). So we need to manually
+ // release the ownership of the handler in this case.
+ GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
+ }
+ if (!ok || shutdown_) {
+ // The value of ok being false means that the server is shutting down.
+ Shutdown(std::move(self), "OnRequestDelivered");
+ return;
+ }
+ // Spawn a new handler instance to serve the next new client. Every handler
+ // instance will deallocate itself when it's done.
+ CreateAndStart(cq_, service_, load_reporter_);
+ {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (service_->shutdown_) {
+ lock.release()->unlock();
+ Shutdown(std::move(self), "OnRequestDelivered");
+ return;
+ }
+ next_inbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ stream_.Read(&request_, &next_inbound_);
+ }
+ // LB ID is unique for each load reporting stream.
+ lb_id_ = load_reporter_->GenerateLbId();
+ gpr_log(GPR_INFO,
+ "[LRS %p] Call request delivered (lb_id_: %s, handler: %p). "
+ "Start reading the initial request...",
+ service_, lb_id_.c_str(), this);
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (!ok || shutdown_) {
+ if (!ok && call_status_ < INITIAL_REQUEST_RECEIVED) {
+ // The client may have half-closed the stream or the stream is broken.
+ gpr_log(GPR_INFO,
+ "[LRS %p] Failed reading the initial request from the stream "
+ "(lb_id_: %s, handler: %p, done_notified: %d, is_cancelled: %d).",
+ service_, lb_id_.c_str(), this, static_cast<int>(done_notified_),
+ static_cast<int>(is_cancelled_));
+ }
+ Shutdown(std::move(self), "OnReadDone");
+ return;
+ }
+ // We only receive one request, which is the initial request.
+ if (call_status_ < INITIAL_REQUEST_RECEIVED) {
+ if (!request_.has_initial_request()) {
+ Shutdown(std::move(self), "OnReadDone+initial_request_not_found");
+ } else {
+ call_status_ = INITIAL_REQUEST_RECEIVED;
+ const auto& initial_request = request_.initial_request();
+ load_balanced_hostname_ = initial_request.load_balanced_hostname();
+ load_key_ = initial_request.load_key();
+ load_reporter_->ReportStreamCreated(load_balanced_hostname_, lb_id_,
+ load_key_);
+ const auto& load_report_interval = initial_request.load_report_interval();
+ load_report_interval_ms_ =
+ static_cast<uint64_t>(load_report_interval.seconds() * 1000 +
+ load_report_interval.nanos() / 1000);
+ gpr_log(
+ GPR_INFO,
+ "[LRS %p] Initial request received. Start load reporting (load "
+ "balanced host: %s, interval: %lu ms, lb_id_: %s, handler: %p)...",
+ service_, load_balanced_hostname_.c_str(), load_report_interval_ms_,
+ lb_id_.c_str(), this);
+ SendReport(self, true /* ok */);
+ // Expect this read to fail.
+ {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (service_->shutdown_) {
+ lock.release()->unlock();
+ Shutdown(std::move(self), "OnReadDone");
+ return;
+ }
+ next_inbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ stream_.Read(&request_, &next_inbound_);
+ }
+ }
+ } else {
+ // Another request received! This violates the spec.
+ gpr_log(GPR_ERROR,
+ "[LRS %p] Another request received (lb_id_: %s, handler: %p).",
+ service_, lb_id_.c_str(), this);
+ Shutdown(std::move(self), "OnReadDone+second_request");
+ }
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (!ok || shutdown_) {
+ Shutdown(std::move(self), "ScheduleNextReport");
+ return;
+ }
+ auto next_report_time = gpr_time_add(
+ gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN));
+ {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (service_->shutdown_) {
+ lock.release()->unlock();
+ Shutdown(std::move(self), "ScheduleNextReport");
+ return;
+ }
+ next_outbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::SendReport, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ // TODO(juanlishen): Improve the Alarm implementation to reuse a single
+ // instance for multiple events.
+ next_report_alarm_.reset(new Alarm);
+ next_report_alarm_->Set(cq_, next_report_time, &next_outbound_);
+ }
+ gpr_log(GPR_DEBUG,
+ "[LRS %p] Next load report scheduled (lb_id_: %s, handler: %p).",
+ service_, lb_id_.c_str(), this);
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (!ok || shutdown_) {
+ Shutdown(std::move(self), "SendReport");
+ return;
+ }
+ ::grpc::lb::v1::LoadReportResponse response;
+ auto loads = load_reporter_->GenerateLoads(load_balanced_hostname_, lb_id_);
+ response.mutable_load()->Swap(&loads);
+ auto feedback = load_reporter_->GenerateLoadBalancingFeedback();
+ response.mutable_load_balancing_feedback()->Swap(&feedback);
+ if (call_status_ < INITIAL_RESPONSE_SENT) {
+ auto initial_response = response.mutable_initial_response();
+ initial_response->set_load_balancer_id(lb_id_);
+ initial_response->set_implementation_id(
+ ::grpc::lb::v1::InitialLoadReportResponse::CPP);
+ initial_response->set_server_version(kVersion);
+ call_status_ = INITIAL_RESPONSE_SENT;
+ }
+ {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (service_->shutdown_) {
+ lock.release()->unlock();
+ Shutdown(std::move(self), "SendReport");
+ return;
+ }
+ next_outbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::ScheduleNextReport, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ stream_.Write(response, &next_outbound_);
+ gpr_log(GPR_INFO,
+ "[LRS %p] Sending load report (lb_id_: %s, handler: %p, loads "
+ "count: %d)...",
+ service_, lb_id_.c_str(), this, response.load().size());
+ }
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnDoneNotified(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ GPR_ASSERT(ok);
+ done_notified_ = true;
+ if (ctx_.IsCancelled()) {
+ is_cancelled_ = true;
+ }
+ gpr_log(GPR_INFO,
+ "[LRS %p] Load reporting call is notified done (handler: %p, "
+ "is_cancelled: %d).",
+ service_, this, static_cast<int>(is_cancelled_));
+ Shutdown(std::move(self), "OnDoneNotified");
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown(
+ std::shared_ptr<ReportLoadHandler> self, const char* reason) {
+ if (!shutdown_) {
+ gpr_log(GPR_INFO,
+ "[LRS %p] Shutting down the handler (lb_id_: %s, handler: %p, "
+ "reason: %s).",
+ service_, lb_id_.c_str(), this, reason);
+ shutdown_ = true;
+ if (call_status_ >= INITIAL_REQUEST_RECEIVED) {
+ load_reporter_->ReportStreamClosed(load_balanced_hostname_, lb_id_);
+ next_report_alarm_->Cancel();
+ }
+ }
+ // OnRequestDelivered() may be called after OnDoneNotified(), so we need to
+ // try to Finish() every time we are in Shutdown().
+ if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (!service_->shutdown_) {
+ on_finish_done_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnFinishDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ // TODO(juanlishen): Maybe add a message proto for the client to
+ // explicitly cancel the stream so that we can return OK status in such
+ // cases.
+ stream_.Finish(Status::CANCELLED, &on_finish_done_);
+ call_status_ = FINISH_CALLED;
+ }
+ }
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnFinishDone(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (ok) {
+ gpr_log(GPR_INFO,
+ "[LRS %p] Load reporting finished (lb_id_: %s, handler: %p).",
+ service_, lb_id_.c_str(), this);
+ }
+}
+
+} // namespace load_reporter
+} // namespace grpc
diff --git a/src/cpp/server/load_reporter/load_reporter_async_service_impl.h b/src/cpp/server/load_reporter/load_reporter_async_service_impl.h
new file mode 100644
index 0000000000..6fc577ff49
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_reporter_async_service_impl.h
@@ -0,0 +1,194 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
+#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpc/support/log.h>
+#include <grpcpp/alarm.h>
+#include <grpcpp/grpcpp.h>
+
+#include "src/core/lib/gprpp/thd.h"
+#include "src/cpp/server/load_reporter/load_reporter.h"
+
+namespace grpc {
+namespace load_reporter {
+
+// Async load reporting service. It's mainly responsible for controlling the
+// procedure of incoming requests. The real business logic is handed off to the
+// LoadReporter. There should be at most one instance of this service on a
+// server to avoid spreading the load data into multiple places.
+class LoadReporterAsyncServiceImpl
+ : public grpc::lb::v1::LoadReporter::AsyncService {
+ public:
+ explicit LoadReporterAsyncServiceImpl(
+ std::unique_ptr<ServerCompletionQueue> cq);
+ ~LoadReporterAsyncServiceImpl();
+
+ // Starts the working thread.
+ void StartThread();
+
+ // Not copyable nor movable.
+ LoadReporterAsyncServiceImpl(const LoadReporterAsyncServiceImpl&) = delete;
+ LoadReporterAsyncServiceImpl& operator=(const LoadReporterAsyncServiceImpl&) =
+ delete;
+
+ private:
+ class ReportLoadHandler;
+
+ // A tag that can be called with a bool argument. It's tailored for
+ // ReportLoadHandler's use. Before being used, it should be constructed with a
+ // method of ReportLoadHandler and a shared pointer to the handler. The
+ // shared pointer will be moved to the invoked function and the function can
+ // only be invoked once. That makes ref counting of the handler easier,
+ // because the shared pointer is not bound to the function and can be gone
+ // once the invoked function returns (if not used any more).
+ class CallableTag {
+ public:
+ using HandlerFunction =
+ std::function<void(std::shared_ptr<ReportLoadHandler>, bool)>;
+
+ CallableTag() {}
+
+ CallableTag(HandlerFunction func,
+ std::shared_ptr<ReportLoadHandler> handler)
+ : handler_function_(std::move(func)), handler_(std::move(handler)) {
+ GPR_ASSERT(handler_function_ != nullptr);
+ GPR_ASSERT(handler_ != nullptr);
+ }
+
+ // Runs the tag. This should be called only once. The handler is no longer
+ // owned by this tag after this method is invoked.
+ void Run(bool ok);
+
+ // Releases and returns the shared pointer to the handler.
+ std::shared_ptr<ReportLoadHandler> ReleaseHandler() {
+ return std::move(handler_);
+ }
+
+ private:
+ HandlerFunction handler_function_ = nullptr;
+ std::shared_ptr<ReportLoadHandler> handler_;
+ };
+
+ // Each handler takes care of one load reporting stream. It contains
+ // per-stream data and it will access the members of the parent class (i.e.,
+ // LoadReporterAsyncServiceImpl) for service-wide data (e.g., the load data).
+ class ReportLoadHandler {
+ public:
+ // Instantiates a ReportLoadHandler and requests the next load reporting
+ // call. The handler object will manage its own lifetime, so no action is
+ // needed from the caller any more regarding that object.
+ static void CreateAndStart(ServerCompletionQueue* cq,
+ LoadReporterAsyncServiceImpl* service,
+ LoadReporter* load_reporter);
+
+ // This ctor is public because we want to use std::make_shared<> in
+ // CreateAndStart(). This ctor shouldn't be used elsewhere.
+ ReportLoadHandler(ServerCompletionQueue* cq,
+ LoadReporterAsyncServiceImpl* service,
+ LoadReporter* load_reporter);
+
+ private:
+ // After the handler has a call request delivered, it starts reading the
+ // initial request. Also, a new handler is spawned so that we can keep
+ // servicing future calls.
+ void OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ // The first Read() is expected to succeed, after which the handler starts
+ // sending load reports back to the balancer. The second Read() is
+ // expected to fail, which happens when the balancer half-closes the
+ // stream to signal that it's no longer interested in the load reports. For
+ // the latter case, the handler will then close the stream.
+ void OnReadDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ // The report sending operations are sequential as: send report -> send
+ // done, schedule the next send -> waiting for the alarm to fire -> alarm
+ // fires, send report -> ...
+ void SendReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
+ void ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ // Called when Finish() is done.
+ void OnFinishDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ // Called when AsyncNotifyWhenDone() notifies us.
+ void OnDoneNotified(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ void Shutdown(std::shared_ptr<ReportLoadHandler> self, const char* reason);
+
+ // The key fields of the stream.
+ grpc::string lb_id_;
+ grpc::string load_balanced_hostname_;
+ grpc::string load_key_;
+ uint64_t load_report_interval_ms_;
+
+ // The data for RPC communication with the load reportee.
+ ServerContext ctx_;
+ ::grpc::lb::v1::LoadReportRequest request_;
+
+ // The members passed down from LoadReporterAsyncServiceImpl.
+ ServerCompletionQueue* cq_;
+ LoadReporterAsyncServiceImpl* service_;
+ LoadReporter* load_reporter_;
+ ServerAsyncReaderWriter<::grpc::lb::v1::LoadReportResponse,
+ ::grpc::lb::v1::LoadReportRequest>
+ stream_;
+
+ // The status of the RPC progress.
+ enum CallStatus {
+ WAITING_FOR_DELIVERY,
+ DELIVERED,
+ INITIAL_REQUEST_RECEIVED,
+ INITIAL_RESPONSE_SENT,
+ FINISH_CALLED
+ } call_status_;
+ bool shutdown_{false};
+ bool done_notified_{false};
+ bool is_cancelled_{false};
+ CallableTag on_done_notified_;
+ CallableTag on_finish_done_;
+ CallableTag next_inbound_;
+ CallableTag next_outbound_;
+ std::unique_ptr<Alarm> next_report_alarm_;
+ };
+
+ // Handles the incoming requests and drives the completion queue in a loop.
+ static void Work(void* arg);
+
+ // Schedules the next data fetching from Census and LB feedback sampling.
+ void ScheduleNextFetchAndSample();
+
+ // Fetches data from Census and samples LB feedback.
+ void FetchAndSample(bool ok);
+
+ std::unique_ptr<ServerCompletionQueue> cq_;
+ // To synchronize the operations related to shutdown state of cq_, so that we
+ // don't enqueue new tags into cq_ after it is already shut down.
+ std::mutex cq_shutdown_mu_;
+ std::atomic_bool shutdown_{false};
+ std::unique_ptr<::grpc_core::Thread> thread_;
+ std::unique_ptr<LoadReporter> load_reporter_;
+ std::unique_ptr<Alarm> next_fetch_and_sample_alarm_;
+};
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
diff --git a/src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc b/src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc
new file mode 100644
index 0000000000..81cf6ac562
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include <grpcpp/ext/server_load_reporting.h>
+
+#include "src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h"
+
+namespace grpc {
+namespace load_reporter {
+namespace experimental {
+
+void LoadReportingServiceServerBuilderOption::UpdateArguments(
+ ::grpc::ChannelArguments* args) {
+ args->SetInt(GRPC_ARG_ENABLE_LOAD_REPORTING, true);
+}
+
+void LoadReportingServiceServerBuilderOption::UpdatePlugins(
+ std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>>* plugins) {
+ plugins->emplace_back(new LoadReportingServiceServerBuilderPlugin());
+}
+
+} // namespace experimental
+} // namespace load_reporter
+} // namespace grpc
diff --git a/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc b/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc
new file mode 100644
index 0000000000..c2c5dd5ed5
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h"
+
+#include <grpcpp/impl/server_initializer.h>
+
+namespace grpc {
+namespace load_reporter {
+
+bool LoadReportingServiceServerBuilderPlugin::has_sync_methods() const {
+ if (service_ != nullptr) {
+ return service_->has_synchronous_methods();
+ }
+ return false;
+}
+
+bool LoadReportingServiceServerBuilderPlugin::has_async_methods() const {
+ if (service_ != nullptr) {
+ return service_->has_async_methods();
+ }
+ return false;
+}
+
+void LoadReportingServiceServerBuilderPlugin::UpdateServerBuilder(
+ grpc::ServerBuilder* builder) {
+ auto cq = builder->AddCompletionQueue();
+ service_ = std::make_shared<LoadReporterAsyncServiceImpl>(std::move(cq));
+}
+
+void LoadReportingServiceServerBuilderPlugin::InitServer(
+ grpc::ServerInitializer* si) {
+ si->RegisterService(service_);
+}
+
+void LoadReportingServiceServerBuilderPlugin::Finish(
+ grpc::ServerInitializer* si) {
+ service_->StartThread();
+ service_.reset();
+}
+
+} // namespace load_reporter
+} // namespace grpc
diff --git a/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h b/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h
new file mode 100644
index 0000000000..1f098591d4
--- /dev/null
+++ b/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_LOAD_REPORTING_SERVICE_SERVER_BUILDER_PLUGIN_H
+#define GRPC_SRC_CPP_LOAD_REPORTING_SERVICE_SERVER_BUILDER_PLUGIN_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpcpp/impl/server_builder_plugin.h>
+
+#include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h"
+
+namespace grpc {
+namespace load_reporter {
+
+// The plugin that registers and starts load reporting service when starting a
+// server.
+class LoadReportingServiceServerBuilderPlugin : public ServerBuilderPlugin {
+ public:
+ ~LoadReportingServiceServerBuilderPlugin() override = default;
+ grpc::string name() override { return "load_reporting_service"; }
+
+ // Creates a load reporting service.
+ void UpdateServerBuilder(grpc::ServerBuilder* builder) override;
+
+ // Registers the load reporter service.
+ void InitServer(grpc::ServerInitializer* si) override;
+
+ // Starts the load reporter service.
+ void Finish(grpc::ServerInitializer* si) override;
+
+ void ChangeArguments(const grpc::string& name, void* value) override {}
+ void UpdateChannelArguments(grpc::ChannelArguments* args) override {}
+ bool has_sync_methods() const override;
+ bool has_async_methods() const override;
+
+ private:
+ std::shared_ptr<LoadReporterAsyncServiceImpl> service_;
+};
+
+std::unique_ptr<grpc::ServerBuilderPlugin>
+CreateLoadReportingServiceServerBuilderPlugin();
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_LOAD_REPORTING_SERVICE_SERVER_BUILDER_PLUGIN_H
diff --git a/src/cpp/server/load_reporter/util.cc b/src/cpp/server/load_reporter/util.cc
new file mode 100644
index 0000000000..a2f2f11e70
--- /dev/null
+++ b/src/cpp/server/load_reporter/util.cc
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <grpcpp/ext/server_load_reporting.h>
+
+#include <grpc/support/log.h>
+
+namespace grpc {
+namespace load_reporter {
+namespace experimental {
+
+void AddLoadReportingCost(grpc::ServerContext* ctx,
+ const grpc::string& cost_name, double cost_value) {
+ if (std::isnormal(cost_value)) {
+ grpc::string buf;
+ buf.resize(sizeof(cost_value) + cost_name.size());
+ memcpy(&(*buf.begin()), &cost_value, sizeof(cost_value));
+ memcpy(&(*buf.begin()) + sizeof(cost_value), cost_name.data(),
+ cost_name.size());
+ ctx->AddTrailingMetadata(GRPC_LB_COST_MD_KEY, buf);
+ } else {
+ gpr_log(GPR_ERROR, "Call metric value is not normal.");
+ }
+}
+
+} // namespace experimental
+} // namespace load_reporter
+} // namespace grpc
diff --git a/src/proto/grpc/lb/v1/load_reporter.proto b/src/proto/grpc/lb/v1/load_reporter.proto
index c2e4f23f6e..d57a37fed7 100644
--- a/src/proto/grpc/lb/v1/load_reporter.proto
+++ b/src/proto/grpc/lb/v1/load_reporter.proto
@@ -114,6 +114,10 @@ message Load {
// num_calls_in_progress is the only valid entry. If in_progress_report is not
// set, num_calls_in_progress will be ignored. If in_progress_report is set,
// fields other than num_calls_in_progress and orphaned_load will be ignored.
+ // TODO(juanlishen): A Load is either an in_progress_report or not. We should
+ // make this explicit in hierarchy. From the log, I see in_progress_report_
+ // has a random num_calls_in_progress_ when not set, which might lead to bug
+ // when the balancer process the load report.
oneof in_progress_report {
// The number of calls in progress (instantaneously) per load balancer id.
int64 num_calls_in_progress = 5;
diff --git a/templates/tools/dockerfile/test/bazel/Dockerfile.template b/templates/tools/dockerfile/test/bazel/Dockerfile.template
index 8ef2f02e71..50aa72edb3 100644
--- a/templates/tools/dockerfile/test/bazel/Dockerfile.template
+++ b/templates/tools/dockerfile/test/bazel/Dockerfile.template
@@ -28,6 +28,8 @@
openjdk-8-jdk ${'\\'}
vim
+ <%include file="../../python_deps.include"/>
+
<%include file="../../bazel.include"/>
RUN mkdir -p /var/local/jenkins
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index 23dde69dd0..95bb7ed229 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -14,7 +14,7 @@
licenses(["notice"]) # Apache v2
-load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package", "grpc_cc_binary")
+load("//bazel:grpc_build_system.bzl", "grpc_cc_binary", "grpc_cc_library", "grpc_cc_test", "grpc_package")
grpc_package(
name = "test/cpp/end2end",
@@ -430,6 +430,20 @@ grpc_cc_binary(
)
grpc_cc_test(
+ name = "server_load_reporting_end2end_test",
+ srcs = ["server_load_reporting_end2end_test.cc"],
+ external_deps = [
+ "gtest",
+ "gmock",
+ ],
+ deps = [
+ "//:grpcpp_server_load_reporting",
+ "//src/proto/grpc/testing:echo_proto",
+ "//test/cpp/util:test_util",
+ ],
+)
+
+grpc_cc_test(
name = "shutdown_test",
srcs = ["shutdown_test.cc"],
external_deps = [
diff --git a/test/cpp/end2end/server_load_reporting_end2end_test.cc b/test/cpp/end2end/server_load_reporting_end2end_test.cc
new file mode 100644
index 0000000000..7bc9af2a6e
--- /dev/null
+++ b/test/cpp/end2end/server_load_reporting_end2end_test.cc
@@ -0,0 +1,191 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include <thread>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <grpc++/grpc++.h>
+#include <grpc/grpc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpcpp/ext/server_load_reporting.h>
+#include <grpcpp/server_builder.h>
+
+#include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/core/util/port.h"
+
+namespace grpc {
+namespace testing {
+namespace {
+
+constexpr double kMetricValue = 3.1415;
+constexpr char kMetricName[] = "METRIC_PI";
+
+// Different messages result in different response statuses. For simplicity in
+// computing request bytes, the message sizes should be the same.
+const char kOkMessage[] = "hello";
+const char kServerErrorMessage[] = "sverr";
+const char kClientErrorMessage[] = "clerr";
+
+class EchoTestServiceImpl : public EchoTestService::Service {
+ public:
+ ~EchoTestServiceImpl() override {}
+
+ Status Echo(ServerContext* context, const EchoRequest* request,
+ EchoResponse* response) override {
+ if (request->message() == kServerErrorMessage) {
+ return Status(StatusCode::UNKNOWN, "Server error requested");
+ }
+ if (request->message() == kClientErrorMessage) {
+ return Status(StatusCode::FAILED_PRECONDITION, "Client error requested");
+ }
+ response->set_message(request->message());
+ ::grpc::load_reporter::experimental::AddLoadReportingCost(
+ context, kMetricName, kMetricValue);
+ return Status::OK;
+ }
+};
+
+class ServerLoadReportingEnd2endTest : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ server_address_ =
+ "localhost:" + std::to_string(grpc_pick_unused_port_or_die());
+ server_ =
+ ServerBuilder()
+ .AddListeningPort(server_address_, InsecureServerCredentials())
+ .RegisterService(&echo_service_)
+ .SetOption(std::unique_ptr<::grpc::ServerBuilderOption>(
+ new ::grpc::load_reporter::experimental::
+ LoadReportingServiceServerBuilderOption()))
+ .BuildAndStart();
+ server_thread_ =
+ std::thread(&ServerLoadReportingEnd2endTest::RunServerLoop, this);
+ }
+
+ void RunServerLoop() { server_->Wait(); }
+
+ void TearDown() override {
+ server_->Shutdown();
+ server_thread_.join();
+ }
+
+ void ClientMakeEchoCalls(const grpc::string& lb_id,
+ const grpc::string& lb_tag,
+ const grpc::string& message, size_t num_requests) {
+ auto stub = EchoTestService::NewStub(
+ CreateChannel(server_address_, InsecureChannelCredentials()));
+ grpc::string lb_token = lb_id + lb_tag;
+ for (int i = 0; i < num_requests; ++i) {
+ ClientContext ctx;
+ if (!lb_token.empty()) ctx.AddMetadata(GRPC_LB_TOKEN_MD_KEY, lb_token);
+ EchoRequest request;
+ EchoResponse response;
+ request.set_message(message);
+ Status status = stub->Echo(&ctx, request, &response);
+ if (message == kOkMessage) {
+ ASSERT_EQ(status.error_code(), StatusCode::OK);
+ ASSERT_EQ(request.message(), response.message());
+ } else if (message == kServerErrorMessage) {
+ ASSERT_EQ(status.error_code(), StatusCode::UNKNOWN);
+ } else if (message == kClientErrorMessage) {
+ ASSERT_EQ(status.error_code(), StatusCode::FAILED_PRECONDITION);
+ }
+ }
+ }
+
+ grpc::string server_address_;
+ std::unique_ptr<Server> server_;
+ std::thread server_thread_;
+ EchoTestServiceImpl echo_service_;
+};
+
+TEST_F(ServerLoadReportingEnd2endTest, NoCall) {}
+
+TEST_F(ServerLoadReportingEnd2endTest, BasicReport) {
+ auto channel =
+ grpc::CreateChannel(server_address_, InsecureChannelCredentials());
+ auto stub = ::grpc::lb::v1::LoadReporter::NewStub(channel);
+ ClientContext ctx;
+ auto stream = stub->ReportLoad(&ctx);
+ ::grpc::lb::v1::LoadReportRequest request;
+ request.mutable_initial_request()->set_load_balanced_hostname(
+ server_address_);
+ request.mutable_initial_request()->set_load_key("LOAD_KEY");
+ request.mutable_initial_request()
+ ->mutable_load_report_interval()
+ ->set_seconds(5);
+ stream->Write(request);
+ gpr_log(GPR_INFO, "Initial request sent.");
+ ::grpc::lb::v1::LoadReportResponse response;
+ stream->Read(&response);
+ const grpc::string& lb_id = response.initial_response().load_balancer_id();
+ gpr_log(GPR_INFO, "Initial response received (lb_id: %s).", lb_id.c_str());
+ ClientMakeEchoCalls(lb_id, "LB_TAG", kOkMessage, 1);
+ while (true) {
+ stream->Read(&response);
+ if (!response.load().empty()) {
+ ASSERT_EQ(response.load().size(), 3);
+ for (const auto& load : response.load()) {
+ if (load.in_progress_report_case()) {
+ // The special load record that reports the number of in-progress
+ // calls.
+ ASSERT_EQ(load.num_calls_in_progress(), 1);
+ } else if (load.orphaned_load_case()) {
+ // The call from the balancer doesn't have any valid LB token.
+ ASSERT_EQ(load.orphaned_load_case(), load.kLoadKeyUnknown);
+ ASSERT_EQ(load.num_calls_started(), 1);
+ ASSERT_EQ(load.num_calls_finished_without_error(), 0);
+ ASSERT_EQ(load.num_calls_finished_with_error(), 0);
+ } else {
+ // This corresponds to the calls from the client.
+ ASSERT_EQ(load.num_calls_started(), 1);
+ ASSERT_EQ(load.num_calls_finished_without_error(), 1);
+ ASSERT_EQ(load.num_calls_finished_with_error(), 0);
+ ASSERT_GE(load.total_bytes_received(), sizeof(kOkMessage));
+ ASSERT_GE(load.total_bytes_sent(), sizeof(kOkMessage));
+ ASSERT_EQ(load.metric_data().size(), 1);
+ ASSERT_EQ(load.metric_data().Get(0).metric_name(), kMetricName);
+ ASSERT_EQ(load.metric_data().Get(0).num_calls_finished_with_metric(),
+ 1);
+ ASSERT_EQ(load.metric_data().Get(0).total_metric_value(),
+ kMetricValue);
+ }
+ }
+ break;
+ }
+ }
+ stream->WritesDone();
+ ASSERT_EQ(stream->Finish().error_code(), StatusCode::CANCELLED);
+}
+
+// TODO(juanlishen): Add more tests.
+
+} // namespace
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/test/cpp/server/load_reporter/BUILD b/test/cpp/server/load_reporter/BUILD
index ebfcfbb348..b7c4d29d71 100644
--- a/test/cpp/server/load_reporter/BUILD
+++ b/test/cpp/server/load_reporter/BUILD
@@ -42,6 +42,7 @@ grpc_cc_test(
"//:gpr",
"//:grpc",
"//:lb_load_reporter",
+ "//:lb_server_load_reporting_filter",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
],
diff --git a/test/cpp/server/load_reporter/load_reporter_test.cc b/test/cpp/server/load_reporter/load_reporter_test.cc
index 3264dba134..719c3a67d9 100644
--- a/test/cpp/server/load_reporter/load_reporter_test.cc
+++ b/test/cpp/server/load_reporter/load_reporter_test.cc
@@ -25,6 +25,7 @@
#include <grpc/grpc.h>
#include <gtest/gtest.h>
+#include "src/core/ext/filters/load_reporting/registered_opencensus_objects.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/cpp/server/load_reporter/constants.h"
#include "src/cpp/server/load_reporter/load_reporter.h"
@@ -123,6 +124,14 @@ class LoadReporterTest : public ::testing::Test {
private:
void SetUp() override {
+ // Access the measures to make them valid.
+ ::grpc::load_reporter::MeasureStartCount();
+ ::grpc::load_reporter::MeasureEndCount();
+ ::grpc::load_reporter::MeasureEndBytesSent();
+ ::grpc::load_reporter::MeasureEndBytesReceived();
+ ::grpc::load_reporter::MeasureEndLatencyMs();
+ ::grpc::load_reporter::MeasureOtherCallMetric();
+ // Set up the load reporter.
auto mock_cpu = new MockCpuStatsProvider();
auto mock_census = new MockCensusViewProvider();
// Prepare the initial CPU stats data. Note that the expectation should be
diff --git a/test/cpp/util/test_credentials_provider.cc b/test/cpp/util/test_credentials_provider.cc
index c8b0ac73f4..0cf75f1e5f 100644
--- a/test/cpp/util/test_credentials_provider.cc
+++ b/test/cpp/util/test_credentials_provider.cc
@@ -63,6 +63,8 @@ class DefaultCredentialsProvider : public CredentialsProvider {
SslCredentialsOptions ssl_opts = {test_root_cert, "", ""};
args->SetSslTargetNameOverride("foo.test.google.fr");
return SslCredentials(ssl_opts);
+ } else if (type == grpc::testing::kGoogleDefaultCredentialsType) {
+ return grpc::GoogleDefaultCredentials();
} else {
std::unique_lock<std::mutex> lock(mu_);
auto it(std::find(added_secure_type_names_.begin(),
diff --git a/test/cpp/util/test_credentials_provider.h b/test/cpp/util/test_credentials_provider.h
index b1d69e893d..0bc910dbc0 100644
--- a/test/cpp/util/test_credentials_provider.h
+++ b/test/cpp/util/test_credentials_provider.h
@@ -33,6 +33,7 @@ const char kInsecureCredentialsType[] = "INSECURE_CREDENTIALS";
// property "transport_security_type".
const char kTlsCredentialsType[] = "ssl";
const char kAltsCredentialsType[] = "alts";
+const char kGoogleDefaultCredentialsType[] = "google_default_credentials";
// Provide test credentials of a particular type.
class CredentialTypeProvider {
diff --git a/tools/dockerfile/test/bazel/Dockerfile b/tools/dockerfile/test/bazel/Dockerfile
index 1f7331132a..4f913dc396 100644
--- a/tools/dockerfile/test/bazel/Dockerfile
+++ b/tools/dockerfile/test/bazel/Dockerfile
@@ -26,6 +26,22 @@ RUN apt-get update && apt-get -y install \
openjdk-8-jdk \
vim
+#====================
+# Python dependencies
+
+# Install dependencies
+
+RUN apt-get update && apt-get install -y \
+ python-all-dev \
+ python3-all-dev \
+ python-pip
+
+# Install Python packages from PyPI
+RUN pip install --upgrade pip==10.0.1
+RUN pip install virtualenv
+RUN pip install futures==2.2.0 enum34==1.0.4 protobuf==3.5.2.post1 six==1.10.0 twisted==17.5.0
+
+
#========================
# Bazel installation
RUN echo "deb [arch=amd64] http://storage.googleapis.com/bazel-apt stable jdk1.8" > /etc/apt/sources.list.d/bazel.list
diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++
index 7ac4ed4b3e..322ab5eb98 100644
--- a/tools/doxygen/Doxyfile.c++
+++ b/tools/doxygen/Doxyfile.c++
@@ -784,6 +784,7 @@ doc/http-grpc-status-mapping.md \
doc/http2-interop-test-descriptions.md \
doc/internationalization.md \
doc/interop-test-descriptions.md \
+doc/keepalive.md \
doc/load-balancing.md \
doc/naming.md \
doc/server-reflection.md \
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index c328387efa..ba322a90a5 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -784,6 +784,7 @@ doc/http-grpc-status-mapping.md \
doc/http2-interop-test-descriptions.md \
doc/internationalization.md \
doc/interop-test-descriptions.md \
+doc/keepalive.md \
doc/load-balancing.md \
doc/naming.md \
doc/server-reflection.md \
diff --git a/tools/doxygen/Doxyfile.core b/tools/doxygen/Doxyfile.core
index a85c328b07..4899eee3ea 100644
--- a/tools/doxygen/Doxyfile.core
+++ b/tools/doxygen/Doxyfile.core
@@ -786,6 +786,7 @@ doc/http-grpc-status-mapping.md \
doc/http2-interop-test-descriptions.md \
doc/internationalization.md \
doc/interop-test-descriptions.md \
+doc/keepalive.md \
doc/load-balancing.md \
doc/naming.md \
doc/server-reflection.md \
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 1a5dfb902b..576950934e 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -786,6 +786,7 @@ doc/http-grpc-status-mapping.md \
doc/http2-interop-test-descriptions.md \
doc/internationalization.md \
doc/interop-test-descriptions.md \
+doc/keepalive.md \
doc/load-balancing.md \
doc/naming.md \
doc/server-reflection.md \
diff --git a/tools/internal_ci/linux/grpc_python_bazel_test.cfg b/tools/internal_ci/linux/grpc_python_bazel_test.cfg
new file mode 100644
index 0000000000..feae924330
--- /dev/null
+++ b/tools/internal_ci/linux/grpc_python_bazel_test.cfg
@@ -0,0 +1,23 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Config file for the internal CI (in protobuf text format)
+
+# Location of the continuous shell script in repository.
+build_file: "grpc/tools/internal_ci/linux/grpc_bazel.sh"
+timeout_mins: 240
+env_vars {
+ key: "BAZEL_SCRIPT"
+ value: "tools/internal_ci/linux/grpc_python_bazel_test_in_docker.sh"
+}
diff --git a/tools/internal_ci/linux/grpc_python_bazel_test_in_docker.sh b/tools/internal_ci/linux/grpc_python_bazel_test_in_docker.sh
new file mode 100755
index 0000000000..4f98d0a93a
--- /dev/null
+++ b/tools/internal_ci/linux/grpc_python_bazel_test_in_docker.sh
@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Test full Bazel
+#
+# NOTE: No empty lines should appear in this file before igncr is set!
+set -ex -o igncr || set -ex
+
+mkdir -p /var/local/git
+git clone /var/local/jenkins/grpc /var/local/git/grpc
+(cd /var/local/jenkins/grpc/ && git submodule foreach 'cd /var/local/git/grpc \
+&& git submodule update --init --reference /var/local/jenkins/grpc/${name} \
+${name}')
+cd /var/local/git/grpc/test
+bazel test --spawn_strategy=standalone --genrule_strategy=standalone //src/python/...