aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--BUILD12
-rw-r--r--CMakeLists.txt12
-rw-r--r--Makefile52
-rw-r--r--README.md16
-rw-r--r--binding.gyp4
-rw-r--r--build.yaml34
-rw-r--r--build_config.rb2
-rw-r--r--config.m44
-rw-r--r--config.w324
-rw-r--r--gRPC-Core.podspec12
-rw-r--r--grpc.def1
-rw-r--r--grpc.gemspec8
-rw-r--r--grpc.gyp8
-rw-r--r--include/grpc++/alarm.h36
-rw-r--r--include/grpc++/server_builder.h7
-rw-r--r--include/grpc/grpc.h13
-rw-r--r--package.xml8
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_filter.c (renamed from src/core/ext/filters/load_reporting/load_reporting_filter.c)6
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_filter.h (renamed from src/core/ext/filters/load_reporting/load_reporting_filter.h)11
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_plugin.c (renamed from src/core/ext/filters/load_reporting/load_reporting.c)17
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_plugin.h (renamed from src/core/ext/filters/load_reporting/load_reporting.h)7
-rw-r--r--src/core/lib/debug/stats_data.c76
-rw-r--r--src/core/lib/debug/stats_data.h14
-rw-r--r--src/core/lib/debug/stats_data.yaml47
-rw-r--r--src/core/lib/iomgr/tcp_posix.c2
-rw-r--r--src/core/lib/iomgr/timer.h4
-rw-r--r--src/core/lib/iomgr/timer_generic.c2
-rw-r--r--src/core/lib/iomgr/timer_uv.c2
-rw-r--r--src/core/lib/security/transport/secure_endpoint.c200
-rw-r--r--src/core/lib/security/transport/secure_endpoint.h11
-rw-r--r--src/core/lib/security/transport/security_handshaker.c35
-rw-r--r--src/core/lib/support/string.c3
-rw-r--r--src/core/lib/surface/alarm.c31
-rw-r--r--src/core/lib/surface/version.c2
-rw-r--r--src/core/plugin_registry/grpc_cronet_plugin_registry.c8
-rw-r--r--src/core/plugin_registry/grpc_plugin_registry.c8
-rw-r--r--src/core/plugin_registry/grpc_unsecure_plugin_registry.c8
-rw-r--r--src/core/tsi/fake_transport_security.c134
-rw-r--r--src/core/tsi/fake_transport_security.h5
-rw-r--r--src/core/tsi/transport_security_grpc.c25
-rw-r--r--src/core/tsi/transport_security_grpc.h19
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py4
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h9
-rw-r--r--test/core/end2end/fixtures/h2_load_reporting.c2
-rw-r--r--test/core/end2end/tests/load_reporting_hook.c4
-rw-r--r--test/core/security/secure_endpoint_test.c56
-rw-r--r--test/core/surface/alarm_test.c25
-rw-r--r--test/cpp/common/alarm_cpp_test.cc60
-rw-r--r--test/cpp/end2end/client_lb_end2end_test.cc51
-rw-r--r--test/cpp/end2end/grpclb_end2end_test.cc310
-rw-r--r--test/cpp/microbenchmarks/BUILD25
-rw-r--r--test/cpp/microbenchmarks/bm_call_create.cc4
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc367
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc147
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc87
-rw-r--r--test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h396
-rw-r--r--test/cpp/microbenchmarks/fullstack_streaming_pump.h170
-rw-r--r--test/cpp/microbenchmarks/fullstack_unary_ping_pong.h116
-rw-r--r--test/cpp/qps/client_async.cc15
-rwxr-xr-xtools/codegen/core/gen_stats_data.py31
-rw-r--r--tools/doxygen/Doxyfile.core2
-rw-r--r--tools/doxygen/Doxyfile.core.internal10
-rw-r--r--tools/flakes/detect_flakes.py7
-rw-r--r--tools/internal_ci/linux/grpc_sanity_webhook_test.cfg30
-rwxr-xr-xtools/jenkins/run_performance_profile_daily.sh4
-rw-r--r--tools/run_tests/generated/sources_and_headers.json69
67 files changed, 1702 insertions, 1211 deletions
diff --git a/BUILD b/BUILD
index bc50a37a40..281375fc1d 100644
--- a/BUILD
+++ b/BUILD
@@ -843,7 +843,7 @@ grpc_cc_library(
"grpc_deadline_filter",
"grpc_lb_policy_pick_first",
"grpc_lb_policy_round_robin",
- "grpc_load_reporting",
+ "grpc_server_load_reporting",
"grpc_max_age_filter",
"grpc_message_size_filter",
"grpc_resolver_dns_ares",
@@ -1087,14 +1087,14 @@ grpc_cc_library(
)
grpc_cc_library(
- name = "grpc_load_reporting",
+ name = "grpc_server_load_reporting",
srcs = [
- "src/core/ext/filters/load_reporting/load_reporting.c",
- "src/core/ext/filters/load_reporting/load_reporting_filter.c",
+ "src/core/ext/filters/load_reporting/server_load_reporting_filter.c",
+ "src/core/ext/filters/load_reporting/server_load_reporting_plugin.c",
],
hdrs = [
- "src/core/ext/filters/load_reporting/load_reporting.h",
- "src/core/ext/filters/load_reporting/load_reporting_filter.h",
+ "src/core/ext/filters/load_reporting/server_load_reporting_filter.h",
+ "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h",
],
language = "c",
deps = [
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 428bdc7773..ba2ec3f38f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1196,8 +1196,8 @@ add_library(grpc
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c
src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
- src/core/ext/filters/load_reporting/load_reporting.c
- src/core/ext/filters/load_reporting/load_reporting_filter.c
+ src/core/ext/filters/load_reporting/server_load_reporting_filter.c
+ src/core/ext/filters/load_reporting/server_load_reporting_plugin.c
src/core/ext/census/base_resources.c
src/core/ext/census/context.c
src/core/ext/census/gen/census.pb.c
@@ -1523,8 +1523,8 @@ add_library(grpc_cronet
src/core/tsi/transport_security.c
src/core/tsi/transport_security_adapter.c
src/core/ext/transport/chttp2/client/chttp2_connector.c
- src/core/ext/filters/load_reporting/load_reporting.c
- src/core/ext/filters/load_reporting/load_reporting_filter.c
+ src/core/ext/filters/load_reporting/server_load_reporting_filter.c
+ src/core/ext/filters/load_reporting/server_load_reporting_plugin.c
src/core/plugin_registry/grpc_cronet_plugin_registry.c
)
@@ -2331,8 +2331,8 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
- src/core/ext/filters/load_reporting/load_reporting.c
- src/core/ext/filters/load_reporting/load_reporting_filter.c
+ src/core/ext/filters/load_reporting/server_load_reporting_filter.c
+ src/core/ext/filters/load_reporting/server_load_reporting_plugin.c
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c
diff --git a/Makefile b/Makefile
index 66c99f5dcf..4d77cc5038 100644
--- a/Makefile
+++ b/Makefile
@@ -410,7 +410,7 @@ E = @echo
Q = @
endif
-CORE_VERSION = 4.0.0-dev
+CORE_VERSION = 5.0.0-dev
CPP_VERSION = 1.7.0-dev
CSHARP_VERSION = 1.7.0-dev
@@ -460,7 +460,7 @@ SHARED_EXT_CORE = dll
SHARED_EXT_CPP = dll
SHARED_EXT_CSHARP = dll
SHARED_PREFIX =
-SHARED_VERSION_CORE = -4
+SHARED_VERSION_CORE = -5
SHARED_VERSION_CPP = -1
SHARED_VERSION_CSHARP = -1
else ifeq ($(SYSTEM),Darwin)
@@ -2628,7 +2628,7 @@ install-shared_c: shared_c strip-shared_c install-pkg-config_c
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE)-dll.a $(prefix)/lib/libgpr.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgpr.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgpr.so.5
$(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgpr.so
endif
$(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE)"
@@ -2637,7 +2637,7 @@ endif
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE)-dll.a $(prefix)/lib/libgrpc.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc.so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc.so
endif
$(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE)"
@@ -2646,7 +2646,7 @@ endif
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE)-dll.a $(prefix)/lib/libgrpc_cronet.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc_cronet.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc_cronet.so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc_cronet.so
endif
$(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE)"
@@ -2655,7 +2655,7 @@ endif
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE)-dll.a $(prefix)/lib/libgrpc_unsecure.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc_unsecure.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc_unsecure.so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc_unsecure.so
endif
ifneq ($(SYSTEM),MINGW32)
@@ -2672,7 +2672,7 @@ install-shared_cxx: shared_cxx strip-shared_cxx install-shared_c install-pkg-con
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc++$(SHARED_VERSION_CPP)-dll.a $(prefix)/lib/libgrpc++.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)grpc++$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)grpc++$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++.so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc++$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++.so
endif
$(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc++_cronet$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)"
@@ -2681,7 +2681,7 @@ endif
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc++_cronet$(SHARED_VERSION_CPP)-dll.a $(prefix)/lib/libgrpc++_cronet.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)grpc++_cronet$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_cronet.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)grpc++_cronet$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_cronet.so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc++_cronet$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_cronet.so
endif
$(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc++_error_details$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)"
@@ -2690,7 +2690,7 @@ endif
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc++_error_details$(SHARED_VERSION_CPP)-dll.a $(prefix)/lib/libgrpc++_error_details.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)grpc++_error_details$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_error_details.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)grpc++_error_details$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_error_details.so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc++_error_details$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_error_details.so
endif
$(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc++_reflection$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)"
@@ -2699,7 +2699,7 @@ endif
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc++_reflection$(SHARED_VERSION_CPP)-dll.a $(prefix)/lib/libgrpc++_reflection.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)grpc++_reflection$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_reflection.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)grpc++_reflection$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_reflection.so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc++_reflection$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_reflection.so
endif
$(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc++_unsecure$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)"
@@ -2708,7 +2708,7 @@ endif
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc++_unsecure$(SHARED_VERSION_CPP)-dll.a $(prefix)/lib/libgrpc++_unsecure.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)grpc++_unsecure$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_unsecure.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)grpc++_unsecure$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_unsecure.so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc++_unsecure$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_unsecure.so
endif
ifneq ($(SYSTEM),MINGW32)
@@ -2725,7 +2725,7 @@ install-shared_csharp: shared_csharp strip-shared_csharp
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc_csharp_ext$(SHARED_VERSION_CSHARP)-dll.a $(prefix)/lib/libgrpc_csharp_ext.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)grpc_csharp_ext$(SHARED_VERSION_CSHARP).$(SHARED_EXT_CSHARP) $(prefix)/lib/libgrpc_csharp_ext.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)grpc_csharp_ext$(SHARED_VERSION_CSHARP).$(SHARED_EXT_CSHARP) $(prefix)/lib/libgrpc_csharp_ext.so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc_csharp_ext$(SHARED_VERSION_CSHARP).$(SHARED_EXT_CSHARP) $(prefix)/lib/libgrpc_csharp_ext.so
endif
ifneq ($(SYSTEM),MINGW32)
@@ -2892,8 +2892,8 @@ $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE): $(LIBGPR_OB
ifeq ($(SYSTEM),Darwin)
$(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGPR_OBJS) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
else
- $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgpr.so.4 -o $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGPR_OBJS) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
- $(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).so.4
+ $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgpr.so.5 -o $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGPR_OBJS) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
+ $(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).so.5
$(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).so
endif
endif
@@ -3166,8 +3166,8 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c \
src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c \
- src/core/ext/filters/load_reporting/load_reporting.c \
- src/core/ext/filters/load_reporting/load_reporting_filter.c \
+ src/core/ext/filters/load_reporting/server_load_reporting_filter.c \
+ src/core/ext/filters/load_reporting/server_load_reporting_plugin.c \
src/core/ext/census/base_resources.c \
src/core/ext/census/context.c \
src/core/ext/census/gen/census.pb.c \
@@ -3261,8 +3261,8 @@ $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE): $(LIBGRPC_
ifeq ($(SYSTEM),Darwin)
$(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
else
- $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc.so.4 -o $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
- $(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).so.4
+ $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc.so.5 -o $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
+ $(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).so
endif
endif
@@ -3491,8 +3491,8 @@ LIBGRPC_CRONET_SRC = \
src/core/tsi/transport_security.c \
src/core/tsi/transport_security_adapter.c \
src/core/ext/transport/chttp2/client/chttp2_connector.c \
- src/core/ext/filters/load_reporting/load_reporting.c \
- src/core/ext/filters/load_reporting/load_reporting_filter.c \
+ src/core/ext/filters/load_reporting/server_load_reporting_filter.c \
+ src/core/ext/filters/load_reporting/server_load_reporting_plugin.c \
src/core/plugin_registry/grpc_cronet_plugin_registry.c \
PUBLIC_HEADERS_C += \
@@ -3557,8 +3557,8 @@ $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE): $(L
ifeq ($(SYSTEM),Darwin)
$(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_CRONET_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
else
- $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc_cronet.so.4 -o $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_CRONET_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
- $(Q) ln -sf $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).so.4
+ $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc_cronet.so.5 -o $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_CRONET_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
+ $(Q) ln -sf $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).so
endif
endif
@@ -4261,8 +4261,8 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c \
src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c \
- src/core/ext/filters/load_reporting/load_reporting.c \
- src/core/ext/filters/load_reporting/load_reporting_filter.c \
+ src/core/ext/filters/load_reporting/server_load_reporting_filter.c \
+ src/core/ext/filters/load_reporting/server_load_reporting_plugin.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c \
@@ -4355,8 +4355,8 @@ $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE): $
ifeq ($(SYSTEM),Darwin)
$(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_UNSECURE_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
else
- $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc_unsecure.so.4 -o $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_UNSECURE_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
- $(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).so.4
+ $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc_unsecure.so.5 -o $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_UNSECURE_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
+ $(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).so
endif
endif
diff --git a/README.md b/README.md
index 995f877219..61479f34aa 100644
--- a/README.md
+++ b/README.md
@@ -27,14 +27,14 @@ Libraries in different languages may be in different states of development. We a
| Language | Source | Status |
|-------------------------|-------------------------------------|---------|
-| Shared C [core library] | [src/core](src/core) | 1.0 |
-| C++ | [src/cpp](src/cpp) | 1.0 |
-| Ruby | [src/ruby](src/ruby) | 1.0 |
-| NodeJS | [src/node](src/node) | 1.0 |
-| Python | [src/python](src/python) | 1.0 |
-| PHP | [src/php](src/php) | 1.0 |
-| C# | [src/csharp](src/csharp) | 1.0 |
-| Objective-C | [src/objective-c](src/objective-c) | 1.0 |
+| Shared C [core library] | [src/core](src/core) | 1.6 |
+| C++ | [src/cpp](src/cpp) | 1.6 |
+| Ruby | [src/ruby](src/ruby) | 1.6 |
+| NodeJS | [src/node](src/node) | 1.6 |
+| Python | [src/python](src/python) | 1.6 |
+| PHP | [src/php](src/php) | 1.6 |
+| C# | [src/csharp](src/csharp) | 1.6 |
+| Objective-C | [src/objective-c](src/objective-c) | 1.6 |
Java source code is in the [grpc-java](http://github.com/grpc/grpc-java)
repository. Go source code is in the
diff --git a/binding.gyp b/binding.gyp
index 0110951305..06dc731935 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -893,8 +893,8 @@
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c',
'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c',
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c',
- 'src/core/ext/filters/load_reporting/load_reporting.c',
- 'src/core/ext/filters/load_reporting/load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.c',
'src/core/ext/census/base_resources.c',
'src/core/ext/census/context.c',
'src/core/ext/census/gen/census.pb.c',
diff --git a/build.yaml b/build.yaml
index fa013bac13..c694675e93 100644
--- a/build.yaml
+++ b/build.yaml
@@ -12,7 +12,7 @@ settings:
'#08': Use "-preN" suffixes to identify pre-release versions
'#09': Per-language overrides are possible with (eg) ruby_version tag here
'#10': See the expand_version.py for all the quirks here
- core_version: 4.0.0-dev
+ core_version: 5.0.0-dev
g_stands_for: gambit
version: 1.7.0-dev
filegroups:
@@ -590,16 +590,6 @@ filegroups:
uses:
- grpc_base
- grpc_client_channel
-- name: grpc_load_reporting
- headers:
- - src/core/ext/filters/load_reporting/load_reporting.h
- - src/core/ext/filters/load_reporting/load_reporting_filter.h
- src:
- - src/core/ext/filters/load_reporting/load_reporting.c
- - src/core/ext/filters/load_reporting/load_reporting_filter.c
- plugin: grpc_load_reporting_plugin
- uses:
- - grpc_base
- name: grpc_max_age_filter
headers:
- src/core/ext/filters/max_age/max_age_filter.h
@@ -712,6 +702,16 @@ filegroups:
- src/core/ext/filters/workarounds/workaround_utils.c
uses:
- grpc_base
+- name: grpc_server_load_reporting
+ headers:
+ - src/core/ext/filters/load_reporting/server_load_reporting_filter.h
+ - src/core/ext/filters/load_reporting/server_load_reporting_plugin.h
+ src:
+ - src/core/ext/filters/load_reporting/server_load_reporting_filter.c
+ - src/core/ext/filters/load_reporting/server_load_reporting_plugin.c
+ plugin: grpc_server_load_reporting_plugin
+ uses:
+ - grpc_base
- name: grpc_test_util_base
build: test
headers:
@@ -1164,7 +1164,7 @@ libs:
- grpc_resolver_dns_native
- grpc_resolver_sockaddr
- grpc_resolver_fake
- - grpc_load_reporting
+ - grpc_server_load_reporting
- grpc_secure
- census
- grpc_max_age_filter
@@ -1190,7 +1190,7 @@ libs:
- grpc_base
- grpc_transport_cronet_client_secure
- grpc_transport_chttp2_client_secure
- - grpc_load_reporting
+ - grpc_server_load_reporting
generate_plugin_registry: true
platforms:
- linux
@@ -1264,7 +1264,7 @@ libs:
- grpc_resolver_dns_native
- grpc_resolver_sockaddr
- grpc_resolver_fake
- - grpc_load_reporting
+ - grpc_server_load_reporting
- grpc_lb_policy_grpclb
- grpc_lb_policy_pick_first
- grpc_lb_policy_round_robin
@@ -3621,6 +3621,8 @@ targets:
- name: bm_fullstack_streaming_ping_pong
build: test
language: c++
+ headers:
+ - test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h
src:
- test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
deps:
@@ -3646,6 +3648,8 @@ targets:
- name: bm_fullstack_streaming_pump
build: test
language: c++
+ headers:
+ - test/cpp/microbenchmarks/fullstack_streaming_pump.h
src:
- test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc
deps:
@@ -3697,6 +3701,8 @@ targets:
- name: bm_fullstack_unary_ping_pong
build: test
language: c++
+ headers:
+ - test/cpp/microbenchmarks/fullstack_unary_ping_pong.h
src:
- test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc
deps:
diff --git a/build_config.rb b/build_config.rb
index 7159c6e509..3dc31d4ce3 100644
--- a/build_config.rb
+++ b/build_config.rb
@@ -13,5 +13,5 @@
# limitations under the License.
module GrpcBuildConfig
- CORE_WINDOWS_DLL = '/tmp/libs/opt/grpc-4.dll'
+ CORE_WINDOWS_DLL = '/tmp/libs/opt/grpc-5.dll'
end
diff --git a/config.m4 b/config.m4
index bc9f24b756..d52e37ca28 100644
--- a/config.m4
+++ b/config.m4
@@ -322,8 +322,8 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c \
src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c \
- src/core/ext/filters/load_reporting/load_reporting.c \
- src/core/ext/filters/load_reporting/load_reporting_filter.c \
+ src/core/ext/filters/load_reporting/server_load_reporting_filter.c \
+ src/core/ext/filters/load_reporting/server_load_reporting_plugin.c \
src/core/ext/census/base_resources.c \
src/core/ext/census/context.c \
src/core/ext/census/gen/census.pb.c \
diff --git a/config.w32 b/config.w32
index 1b1a82b1ae..92faad7a8f 100644
--- a/config.w32
+++ b/config.w32
@@ -299,8 +299,8 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\grpc_ares_wrapper_fallback.c " +
"src\\core\\ext\\filters\\client_channel\\resolver\\dns\\native\\dns_resolver.c " +
"src\\core\\ext\\filters\\client_channel\\resolver\\sockaddr\\sockaddr_resolver.c " +
- "src\\core\\ext\\filters\\load_reporting\\load_reporting.c " +
- "src\\core\\ext\\filters\\load_reporting\\load_reporting_filter.c " +
+ "src\\core\\ext\\filters\\load_reporting\\server_load_reporting_filter.c " +
+ "src\\core\\ext\\filters\\load_reporting\\server_load_reporting_plugin.c " +
"src\\core\\ext\\census\\base_resources.c " +
"src\\core\\ext\\census\\context.c " +
"src\\core\\ext\\census\\gen\\census.pb.c " +
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index fdab4b3f81..2f1f415866 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -443,8 +443,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h',
- 'src/core/ext/filters/load_reporting/load_reporting.h',
- 'src/core/ext/filters/load_reporting/load_reporting_filter.h',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_filter.h',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.h',
'src/core/ext/census/aggregation.h',
'src/core/ext/census/base_resources.h',
'src/core/ext/census/census_interface.h',
@@ -701,8 +701,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c',
'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c',
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c',
- 'src/core/ext/filters/load_reporting/load_reporting.c',
- 'src/core/ext/filters/load_reporting/load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.c',
'src/core/ext/census/base_resources.c',
'src/core/ext/census/context.c',
'src/core/ext/census/gen/census.pb.c',
@@ -938,8 +938,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h',
- 'src/core/ext/filters/load_reporting/load_reporting.h',
- 'src/core/ext/filters/load_reporting/load_reporting_filter.h',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_filter.h',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.h',
'src/core/ext/census/aggregation.h',
'src/core/ext/census/base_resources.h',
'src/core/ext/census/census_interface.h',
diff --git a/grpc.def b/grpc.def
index 37cc7e20f0..a7a11601ed 100644
--- a/grpc.def
+++ b/grpc.def
@@ -65,6 +65,7 @@ EXPORTS
grpc_completion_queue_shutdown
grpc_completion_queue_destroy
grpc_alarm_create
+ grpc_alarm_set
grpc_alarm_cancel
grpc_alarm_destroy
grpc_channel_check_connectivity_state
diff --git a/grpc.gemspec b/grpc.gemspec
index c441107643..2d0f9fd450 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -379,8 +379,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h )
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h )
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h )
- s.files += %w( src/core/ext/filters/load_reporting/load_reporting.h )
- s.files += %w( src/core/ext/filters/load_reporting/load_reporting_filter.h )
+ s.files += %w( src/core/ext/filters/load_reporting/server_load_reporting_filter.h )
+ s.files += %w( src/core/ext/filters/load_reporting/server_load_reporting_plugin.h )
s.files += %w( src/core/ext/census/aggregation.h )
s.files += %w( src/core/ext/census/base_resources.h )
s.files += %w( src/core/ext/census/census_interface.h )
@@ -640,8 +640,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c )
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c )
s.files += %w( src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c )
- s.files += %w( src/core/ext/filters/load_reporting/load_reporting.c )
- s.files += %w( src/core/ext/filters/load_reporting/load_reporting_filter.c )
+ s.files += %w( src/core/ext/filters/load_reporting/server_load_reporting_filter.c )
+ s.files += %w( src/core/ext/filters/load_reporting/server_load_reporting_plugin.c )
s.files += %w( src/core/ext/census/base_resources.c )
s.files += %w( src/core/ext/census/context.c )
s.files += %w( src/core/ext/census/gen/census.pb.c )
diff --git a/grpc.gyp b/grpc.gyp
index 0e7ee7e5c9..48fb2ba7cf 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -459,8 +459,8 @@
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c',
'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c',
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c',
- 'src/core/ext/filters/load_reporting/load_reporting.c',
- 'src/core/ext/filters/load_reporting/load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.c',
'src/core/ext/census/base_resources.c',
'src/core/ext/census/context.c',
'src/core/ext/census/gen/census.pb.c',
@@ -1108,8 +1108,8 @@
'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c',
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c',
- 'src/core/ext/filters/load_reporting/load_reporting.c',
- 'src/core/ext/filters/load_reporting/load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c',
diff --git a/include/grpc++/alarm.h b/include/grpc++/alarm.h
index ed8dacbc94..2d88d868e5 100644
--- a/include/grpc++/alarm.h
+++ b/include/grpc++/alarm.h
@@ -37,20 +37,33 @@ class CompletionQueue;
/// A thin wrapper around \a grpc_alarm (see / \a / src/core/surface/alarm.h).
class Alarm : private GrpcLibraryCodegen {
public:
- /// Create a completion queue alarm instance associated to \a cq.
- ///
- /// Once the alarm expires (at \a deadline) or it's cancelled (see \a Cancel),
- /// an event with tag \a tag will be added to \a cq. If the alarm expired, the
- /// event's success bit will be true, false otherwise (ie, upon cancellation).
+ /// Create an unset completion queue alarm
+ Alarm() : tag_(nullptr), alarm_(grpc_alarm_create(nullptr)) {}
+
+ /// DEPRECATED: Create and set a completion queue alarm instance associated to
+ /// \a cq.
+ /// This form is deprecated because it is inherently racy.
/// \internal We rely on the presence of \a cq for grpc initialization. If \a
/// cq were ever to be removed, a reference to a static
/// internal::GrpcLibraryInitializer instance would need to be introduced
/// here. \endinternal.
template <typename T>
Alarm(CompletionQueue* cq, const T& deadline, void* tag)
- : tag_(tag),
- alarm_(grpc_alarm_create(cq->cq(), TimePoint<T>(deadline).raw_time(),
- static_cast<void*>(&tag_))) {}
+ : tag_(tag), alarm_(grpc_alarm_create(nullptr)) {
+ grpc_alarm_set(alarm_, cq->cq(), TimePoint<T>(deadline).raw_time(),
+ static_cast<void*>(&tag_), nullptr);
+ }
+
+ /// Trigger an alarm instance on completion queue \a cq at the specified time.
+ /// Once the alarm expires (at \a deadline) or it's cancelled (see \a Cancel),
+ /// an event with tag \a tag will be added to \a cq. If the alarm expired, the
+ /// event's success bit will be true, false otherwise (ie, upon cancellation).
+ template <typename T>
+ void Set(CompletionQueue* cq, const T& deadline, void* tag) {
+ tag_.Set(tag);
+ grpc_alarm_set(alarm_, cq->cq(), TimePoint<T>(deadline).raw_time(),
+ static_cast<void*>(&tag_), nullptr);
+ }
/// Alarms aren't copyable.
Alarm(const Alarm&) = delete;
@@ -69,17 +82,20 @@ class Alarm : private GrpcLibraryCodegen {
/// Destroy the given completion queue alarm, cancelling it in the process.
~Alarm() {
- if (alarm_ != nullptr) grpc_alarm_destroy(alarm_);
+ if (alarm_ != nullptr) grpc_alarm_destroy(alarm_, nullptr);
}
/// Cancel a completion queue alarm. Calling this function over an alarm that
/// has already fired has no effect.
- void Cancel() { grpc_alarm_cancel(alarm_); }
+ void Cancel() {
+ if (alarm_ != nullptr) grpc_alarm_cancel(alarm_, nullptr);
+ }
private:
class AlarmEntry : public CompletionQueueTag {
public:
AlarmEntry(void* tag) : tag_(tag) {}
+ void Set(void* tag) { tag_ = tag; }
bool FinalizeResult(void** tag, bool* status) override {
*tag = tag_;
return true;
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index eafd63619d..21ae70d13a 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -151,7 +151,8 @@ class ServerBuilder {
/// Add a completion queue for handling asynchronous services.
///
/// Caller is required to shutdown the server prior to shutting down the
- /// returned completion queue. A typical usage scenario:
+ /// returned completion queue. Caller is also required to drain the
+ /// completion queue after shutting it down. A typical usage scenario:
///
/// // While building the server:
/// ServerBuilder builder;
@@ -162,6 +163,10 @@ class ServerBuilder {
/// // While shutting down the server;
/// server_->Shutdown();
/// cq_->Shutdown(); // Always *after* the associated server's Shutdown()!
+ /// // Drain the cq_ that was created
+ /// void* ignored_tag;
+ /// bool ignored_ok;
+ /// while (cq_->Next(&ignored_tag, &ignored_ok)) { }
///
/// \param is_frequently_polled This is an optional parameter to inform gRPC
/// library about whether this completion queue would be frequently polled
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index b562167b5f..fab7d438aa 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -143,21 +143,24 @@ GRPCAPI void grpc_completion_queue_shutdown(grpc_completion_queue *cq);
drained and no threads are executing grpc_completion_queue_next */
GRPCAPI void grpc_completion_queue_destroy(grpc_completion_queue *cq);
-/** Create a completion queue alarm instance associated to \a cq.
+/** Create a completion queue alarm instance */
+GRPCAPI grpc_alarm *grpc_alarm_create(void *reserved);
+
+/** Set a completion queue alarm instance associated to \a cq.
*
* Once the alarm expires (at \a deadline) or it's cancelled (see \a
* grpc_alarm_cancel), an event with tag \a tag will be added to \a cq. If the
* alarm expired, the event's success bit will be true, false otherwise (ie,
* upon cancellation). */
-GRPCAPI grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq,
- gpr_timespec deadline, void *tag);
+GRPCAPI void grpc_alarm_set(grpc_alarm *alarm, grpc_completion_queue *cq,
+ gpr_timespec deadline, void *tag, void *reserved);
/** Cancel a completion queue alarm. Calling this function over an alarm that
* has already fired has no effect. */
-GRPCAPI void grpc_alarm_cancel(grpc_alarm *alarm);
+GRPCAPI void grpc_alarm_cancel(grpc_alarm *alarm, void *reserved);
/** Destroy the given completion queue alarm, cancelling it in the process. */
-GRPCAPI void grpc_alarm_destroy(grpc_alarm *alarm);
+GRPCAPI void grpc_alarm_destroy(grpc_alarm *alarm, void *reserved);
/** Check the connectivity state of a channel. */
GRPCAPI grpc_connectivity_state grpc_channel_check_connectivity_state(
diff --git a/package.xml b/package.xml
index 4b1f42dd02..b7c0e679e5 100644
--- a/package.xml
+++ b/package.xml
@@ -389,8 +389,8 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" role="src" />
- <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/load_reporting.h" role="src" />
- <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/load_reporting_filter.h" role="src" />
+ <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/server_load_reporting_filter.h" role="src" />
+ <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/server_load_reporting_plugin.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/census/aggregation.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/census/base_resources.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/census/census_interface.h" role="src" />
@@ -650,8 +650,8 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c" role="src" />
- <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/load_reporting.c" role="src" />
- <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/load_reporting_filter.c" role="src" />
+ <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/server_load_reporting_filter.c" role="src" />
+ <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/server_load_reporting_plugin.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/census/base_resources.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/census/context.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/census/gen/census.pb.c" role="src" />
diff --git a/src/core/ext/filters/load_reporting/load_reporting_filter.c b/src/core/ext/filters/load_reporting/server_load_reporting_filter.c
index 17e946937f..7b8cf986f7 100644
--- a/src/core/ext/filters/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.c
@@ -24,8 +24,8 @@
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
-#include "src/core/ext/filters/load_reporting/load_reporting.h"
-#include "src/core/ext/filters/load_reporting/load_reporting_filter.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -213,7 +213,7 @@ static void lr_start_transport_stream_op_batch(
GPR_TIMER_END("lr_start_transport_stream_op_batch", 0);
}
-const grpc_channel_filter grpc_load_reporting_filter = {
+const grpc_channel_filter grpc_server_load_reporting_filter = {
lr_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
diff --git a/src/core/ext/filters/load_reporting/load_reporting_filter.h b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h
index 1a5424e43a..9527868c9f 100644
--- a/src/core/ext/filters/load_reporting/load_reporting_filter.h
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h
@@ -16,12 +16,13 @@
*
*/
-#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_FILTER_H
-#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_FILTER_H
+#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_FILTER_H
+#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_FILTER_H
-#include "src/core/ext/filters/load_reporting/load_reporting.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
#include "src/core/lib/channel/channel_stack.h"
-extern const grpc_channel_filter grpc_load_reporting_filter;
+extern const grpc_channel_filter grpc_server_load_reporting_filter;
-#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_FILTER_H */
+#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_FILTER_H \
+ */
diff --git a/src/core/ext/filters/load_reporting/load_reporting.c b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.c
index b42aa99cdb..199cb883b3 100644
--- a/src/core/ext/filters/load_reporting/load_reporting.c
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.c
@@ -25,8 +25,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/sync.h>
-#include "src/core/ext/filters/load_reporting/load_reporting.h"
-#include "src/core/ext/filters/load_reporting/load_reporting_filter.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/call.h"
@@ -37,9 +37,8 @@ static bool is_load_reporting_enabled(const grpc_channel_args *a) {
grpc_channel_args_find(a, GRPC_ARG_ENABLE_LOAD_REPORTING), false);
}
-static bool maybe_add_load_reporting_filter(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack_builder *builder,
- void *arg) {
+static bool maybe_add_server_load_reporting_filter(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) {
const grpc_channel_args *args =
grpc_channel_stack_builder_get_channel_arguments(builder);
const grpc_channel_filter *filter = arg;
@@ -61,10 +60,10 @@ grpc_arg grpc_load_reporting_enable_arg() {
/* Plugin registration */
-void grpc_load_reporting_plugin_init(void) {
+void grpc_server_load_reporting_plugin_init(void) {
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
- maybe_add_load_reporting_filter,
- (void *)&grpc_load_reporting_filter);
+ maybe_add_server_load_reporting_filter,
+ (void *)&grpc_server_load_reporting_filter);
}
-void grpc_load_reporting_plugin_shutdown() {}
+void grpc_server_load_reporting_plugin_shutdown() {}
diff --git a/src/core/ext/filters/load_reporting/load_reporting.h b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.h
index fc04d2826a..65a6d0900e 100644
--- a/src/core/ext/filters/load_reporting/load_reporting.h
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.h
@@ -16,8 +16,8 @@
*
*/
-#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_H
-#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_H
+#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_PLUGIN_H
+#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_PLUGIN_H
#include <grpc/impl/codegen/grpc_types.h>
@@ -55,4 +55,5 @@ typedef struct grpc_load_reporting_call_data {
/** Return a \a grpc_arg enabling load reporting */
grpc_arg grpc_load_reporting_enable_arg();
-#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_H */
+#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_PLUGIN_H \
+ */
diff --git a/src/core/lib/debug/stats_data.c b/src/core/lib/debug/stats_data.c
index a21bb1ef40..5847c96a34 100644
--- a/src/core/lib/debug/stats_data.c
+++ b/src/core/lib/debug/stats_data.c
@@ -56,9 +56,58 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
"executor_queue_drained",
"executor_push_retries",
};
+const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
+ "Number of client side calls created by this process",
+ "Number of server side calls created by this process",
+ "Number of polling syscalls (epoll_wait, poll, etc) made by this process",
+ "Number of sleeping syscalls made by this process",
+ "Number of times histogram increments went through the slow (binary "
+ "search) path",
+ "Number of write syscalls (or equivalent - eg sendmsg) made by this "
+ "process",
+ "Number of read syscalls (or equivalent - eg recvmsg) made by this process",
+ "Number of times a backup poller has been created (this can be expensive)",
+ "Number of polls performed on the backup poller",
+ "Number of batches received by HTTP2 transport",
+ "Number of cancelations received by HTTP2 transport",
+ "Number of batches containing send initial metadata",
+ "Number of batches containing send message",
+ "Number of batches containing send trailing metadata",
+ "Number of batches containing receive initial metadata",
+ "Number of batches containing receive message",
+ "Number of batches containing receive trailing metadata",
+ "Number of HTTP2 pings sent by process", "Number of HTTP2 writes initiated",
+ "Number of HTTP2 writes offloaded to the executor from application threads",
+ "Number of HTTP2 writes that finished seeing more data needed to be "
+ "written",
+ "Number of HTTP2 writes that were made knowing there was still more data "
+ "to be written (we cap maximum write size to syscall_write)",
+ "Number of combiner lock entries by process (first items queued to a "
+ "combiner)",
+ "Number of items scheduled against combiner locks",
+ "Number of final items scheduled against combiner locks",
+ "Number of combiner locks offloaded to different threads",
+ "Number of finite runtime closures scheduled against the executor (gRPC "
+ "thread pool)",
+ "Number of potentially infinite runtime closures scheduled against the "
+ "executor (gRPC thread pool)",
+ "Number of closures scheduled by the executor to the executor",
+ "Number of thread wakeups initiated within the executor",
+ "Number of times an executor queue was drained",
+ "Number of times we raced and were forced to retry pushing a closure to "
+ "the executor",
+};
const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = {
- "tcp_write_size", "tcp_write_iov_size", "tcp_read_size",
- "tcp_read_offer", "tcp_read_iov_size", "http2_send_message_size",
+ "tcp_write_size", "tcp_write_iov_size", "tcp_read_size",
+ "tcp_read_offer", "tcp_read_offer_iov_size", "http2_send_message_size",
+};
+const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT] = {
+ "Number of bytes offered to each syscall_write",
+ "Number of byte segments offered to each syscall_write",
+ "Number of bytes received by each syscall_read",
+ "Number of bytes offered to each syscall_read",
+ "Number of byte segments offered to each syscall_read",
+ "Size of messages received by HTTP2 transport",
};
const int grpc_stats_table_0[65] = {
0, 1, 2, 3, 4, 6, 8, 11,
@@ -189,11 +238,12 @@ void grpc_stats_inc_tcp_read_offer(grpc_exec_ctx *exec_ctx, int value) {
grpc_stats_histo_find_bucket_slow(
(exec_ctx), value, grpc_stats_table_0, 64));
}
-void grpc_stats_inc_tcp_read_iov_size(grpc_exec_ctx *exec_ctx, int value) {
+void grpc_stats_inc_tcp_read_offer_iov_size(grpc_exec_ctx *exec_ctx,
+ int value) {
value = GPR_CLAMP(value, 0, 1024);
if (value < 13) {
- GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE,
- value);
+ GRPC_STATS_INC_HISTOGRAM(
+ (exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE, value);
return;
}
union {
@@ -206,11 +256,12 @@ void grpc_stats_inc_tcp_read_iov_size(grpc_exec_ctx *exec_ctx, int value) {
grpc_stats_table_3[((_val.uint - 4623507967449235456ull) >> 48)] + 13;
_bkt.dbl = grpc_stats_table_2[bucket];
bucket -= (_val.uint < _bkt.uint);
- GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE,
- bucket);
+ GRPC_STATS_INC_HISTOGRAM(
+ (exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE, bucket);
return;
}
- GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE,
+ GRPC_STATS_INC_HISTOGRAM((exec_ctx),
+ GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE,
grpc_stats_histo_find_bucket_slow(
(exec_ctx), value, grpc_stats_table_2, 64));
}
@@ -247,6 +298,9 @@ const int *const grpc_stats_histo_bucket_boundaries[6] = {
grpc_stats_table_0, grpc_stats_table_2, grpc_stats_table_0,
grpc_stats_table_0, grpc_stats_table_2, grpc_stats_table_0};
void (*const grpc_stats_inc_histogram[6])(grpc_exec_ctx *exec_ctx, int x) = {
- grpc_stats_inc_tcp_write_size, grpc_stats_inc_tcp_write_iov_size,
- grpc_stats_inc_tcp_read_size, grpc_stats_inc_tcp_read_offer,
- grpc_stats_inc_tcp_read_iov_size, grpc_stats_inc_http2_send_message_size};
+ grpc_stats_inc_tcp_write_size,
+ grpc_stats_inc_tcp_write_iov_size,
+ grpc_stats_inc_tcp_read_size,
+ grpc_stats_inc_tcp_read_offer,
+ grpc_stats_inc_tcp_read_offer_iov_size,
+ grpc_stats_inc_http2_send_message_size};
diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h
index bbc78fb341..2b8bcf8221 100644
--- a/src/core/lib/debug/stats_data.h
+++ b/src/core/lib/debug/stats_data.h
@@ -60,16 +60,18 @@ typedef enum {
GRPC_STATS_COUNTER_COUNT
} grpc_stats_counters;
extern const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT];
+extern const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT];
typedef enum {
GRPC_STATS_HISTOGRAM_TCP_WRITE_SIZE,
GRPC_STATS_HISTOGRAM_TCP_WRITE_IOV_SIZE,
GRPC_STATS_HISTOGRAM_TCP_READ_SIZE,
GRPC_STATS_HISTOGRAM_TCP_READ_OFFER,
- GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE,
+ GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE,
GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE,
GRPC_STATS_HISTOGRAM_COUNT
} grpc_stats_histograms;
extern const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT];
+extern const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT];
typedef enum {
GRPC_STATS_HISTOGRAM_TCP_WRITE_SIZE_FIRST_SLOT = 0,
GRPC_STATS_HISTOGRAM_TCP_WRITE_SIZE_BUCKETS = 64,
@@ -79,8 +81,8 @@ typedef enum {
GRPC_STATS_HISTOGRAM_TCP_READ_SIZE_BUCKETS = 64,
GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_FIRST_SLOT = 192,
GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_BUCKETS = 64,
- GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE_FIRST_SLOT = 256,
- GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE_BUCKETS = 64,
+ GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE_FIRST_SLOT = 256,
+ GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE_BUCKETS = 64,
GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE_FIRST_SLOT = 320,
GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE_BUCKETS = 64,
GRPC_STATS_HISTOGRAM_BUCKETS = 384
@@ -174,9 +176,9 @@ void grpc_stats_inc_tcp_read_size(grpc_exec_ctx *exec_ctx, int x);
#define GRPC_STATS_INC_TCP_READ_OFFER(exec_ctx, value) \
grpc_stats_inc_tcp_read_offer((exec_ctx), (int)(value))
void grpc_stats_inc_tcp_read_offer(grpc_exec_ctx *exec_ctx, int x);
-#define GRPC_STATS_INC_TCP_READ_IOV_SIZE(exec_ctx, value) \
- grpc_stats_inc_tcp_read_iov_size((exec_ctx), (int)(value))
-void grpc_stats_inc_tcp_read_iov_size(grpc_exec_ctx *exec_ctx, int x);
+#define GRPC_STATS_INC_TCP_READ_OFFER_IOV_SIZE(exec_ctx, value) \
+ grpc_stats_inc_tcp_read_offer_iov_size((exec_ctx), (int)(value))
+void grpc_stats_inc_tcp_read_offer_iov_size(grpc_exec_ctx *exec_ctx, int x);
#define GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(exec_ctx, value) \
grpc_stats_inc_http2_send_message_size((exec_ctx), (int)(value))
void grpc_stats_inc_http2_send_message_size(grpc_exec_ctx *exec_ctx, int x);
diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml
index e74fdc331d..ecceb7d493 100644
--- a/src/core/lib/debug/stats_data.yaml
+++ b/src/core/lib/debug/stats_data.yaml
@@ -17,58 +17,103 @@
# overall
- counter: client_calls_created
+ doc: Number of client side calls created by this process
- counter: server_calls_created
+ doc: Number of server side calls created by this process
# polling
- counter: syscall_poll
+ doc: Number of polling syscalls (epoll_wait, poll, etc) made by this process
- counter: syscall_wait
+ doc: Number of sleeping syscalls made by this process
# stats system
- counter: histogram_slow_lookups
+ doc: Number of times histogram increments went through the slow
+ (binary search) path
# tcp
- counter: syscall_write
+ doc: Number of write syscalls (or equivalent - eg sendmsg) made by this process
- counter: syscall_read
+ doc: Number of read syscalls (or equivalent - eg recvmsg) made by this process
- histogram: tcp_write_size
max: 16777216 # 16 meg max write tracked
buckets: 64
+ doc: Number of bytes offered to each syscall_write
- histogram: tcp_write_iov_size
max: 1024
buckets: 64
+ doc: Number of byte segments offered to each syscall_write
- histogram: tcp_read_size
max: 16777216
buckets: 64
+ doc: Number of bytes received by each syscall_read
- histogram: tcp_read_offer
max: 16777216
buckets: 64
-- histogram: tcp_read_iov_size
+ doc: Number of bytes offered to each syscall_read
+- histogram: tcp_read_offer_iov_size
max: 1024
buckets: 64
+ doc: Number of byte segments offered to each syscall_read
- counter: tcp_backup_pollers_created
+ doc: Number of times a backup poller has been created (this can be expensive)
- counter: tcp_backup_poller_polls
+ doc: Number of polls performed on the backup poller
# chttp2
- counter: http2_op_batches
+ doc: Number of batches received by HTTP2 transport
- counter: http2_op_cancel
+ doc: Number of cancelations received by HTTP2 transport
- counter: http2_op_send_initial_metadata
+ doc: Number of batches containing send initial metadata
- counter: http2_op_send_message
+ doc: Number of batches containing send message
- counter: http2_op_send_trailing_metadata
+ doc: Number of batches containing send trailing metadata
- counter: http2_op_recv_initial_metadata
+ doc: Number of batches containing receive initial metadata
- counter: http2_op_recv_message
+ doc: Number of batches containing receive message
- counter: http2_op_recv_trailing_metadata
+ doc: Number of batches containing receive trailing metadata
- histogram: http2_send_message_size
max: 16777216
buckets: 64
+ doc: Size of messages received by HTTP2 transport
- counter: http2_pings_sent
+ doc: Number of HTTP2 pings sent by process
- counter: http2_writes_begun
+ doc: Number of HTTP2 writes initiated
- counter: http2_writes_offloaded
+ doc: Number of HTTP2 writes offloaded to the executor from application threads
- counter: http2_writes_continued
+ doc: Number of HTTP2 writes that finished seeing more data needed to be
+ written
- counter: http2_partial_writes
+ doc: Number of HTTP2 writes that were made knowing there was still more data
+ to be written (we cap maximum write size to syscall_write)
# combiner locks
- counter: combiner_locks_initiated
+ doc: Number of combiner lock entries by process
+ (first items queued to a combiner)
- counter: combiner_locks_scheduled_items
+ doc: Number of items scheduled against combiner locks
- counter: combiner_locks_scheduled_final_items
+ doc: Number of final items scheduled against combiner locks
- counter: combiner_locks_offloaded
+ doc: Number of combiner locks offloaded to different threads
# executor
- counter: executor_scheduled_short_items
+ doc: Number of finite runtime closures scheduled against the executor
+ (gRPC thread pool)
- counter: executor_scheduled_long_items
+ doc: Number of potentially infinite runtime closures scheduled against the
+ executor (gRPC thread pool)
- counter: executor_scheduled_to_self
+ doc: Number of closures scheduled by the executor to the executor
- counter: executor_wakeup_initiated
+ doc: Number of thread wakeups initiated within the executor
- counter: executor_queue_drained
+ doc: Number of times an executor queue was drained
- counter: executor_push_retries
+ doc: Number of times we raced and were forced to retry pushing a closure to
+ the executor
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index fba2bc017b..6c56c36bcd 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -399,7 +399,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
msg.msg_flags = 0;
GRPC_STATS_INC_TCP_READ_OFFER(exec_ctx, tcp->incoming_buffer->length);
- GRPC_STATS_INC_TCP_READ_IOV_SIZE(exec_ctx, tcp->incoming_buffer->count);
+ GRPC_STATS_INC_TCP_READ_OFFER_IOV_SIZE(exec_ctx, tcp->incoming_buffer->count);
GPR_TIMER_BEGIN("recvmsg", 0);
do {
diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h
index b92b8fb8b8..ac392f87fe 100644
--- a/src/core/lib/iomgr/timer.h
+++ b/src/core/lib/iomgr/timer.h
@@ -44,6 +44,10 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_timespec deadline, grpc_closure *closure,
gpr_timespec now);
+/* Initialize *timer without setting it. This can later be passed through
+ the regular init or cancel */
+void grpc_timer_init_unset(grpc_timer *timer);
+
/* Note that there is no timer destroy function. This is because the
timer is a one-time occurrence with a guarantee that the callback will
be called exactly once, either at expiration or cancellation. Thus, all
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index 12efce241f..c08bb525b7 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -234,6 +234,8 @@ static void note_deadline_change(timer_shard *shard) {
}
}
+void grpc_timer_init_unset(grpc_timer *timer) { timer->pending = false; }
+
void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_timespec deadline, grpc_closure *closure,
gpr_timespec now) {
diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c
index 70f49bcbe8..adced41f53 100644
--- a/src/core/lib/iomgr/timer_uv.c
+++ b/src/core/lib/iomgr/timer_uv.c
@@ -77,6 +77,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
uv_unref((uv_handle_t *)uv_timer);
}
+void grpc_timer_init_unset(grpc_timer *timer) { timer->pending = 0; }
+
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
GRPC_UV_ASSERT_SAME_THREAD();
if (timer->pending) {
diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c
index 5e41b94ff8..ae5633b82c 100644
--- a/src/core/lib/security/transport/secure_endpoint.c
+++ b/src/core/lib/security/transport/secure_endpoint.c
@@ -34,7 +34,7 @@
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
-#include "src/core/tsi/transport_security_interface.h"
+#include "src/core/tsi/transport_security_grpc.h"
#define STAGING_BUFFER_SIZE 8192
@@ -42,6 +42,7 @@ typedef struct {
grpc_endpoint base;
grpc_endpoint *wrapped_ep;
struct tsi_frame_protector *protector;
+ struct tsi_zero_copy_grpc_protector *zero_copy_protector;
gpr_mu protector_mu;
/* saved upper level callbacks and user_data. */
grpc_closure *read_cb;
@@ -67,6 +68,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, secure_endpoint *secure_ep) {
secure_endpoint *ep = secure_ep;
grpc_endpoint_destroy(exec_ctx, ep->wrapped_ep);
tsi_frame_protector_destroy(ep->protector);
+ tsi_zero_copy_grpc_protector_destroy(exec_ctx, ep->zero_copy_protector);
grpc_slice_buffer_destroy_internal(exec_ctx, &ep->leftover_bytes);
grpc_slice_unref_internal(exec_ctx, ep->read_staging_buffer);
grpc_slice_unref_internal(exec_ctx, ep->write_staging_buffer);
@@ -159,51 +161,58 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data,
return;
}
- /* TODO(yangg) check error, maybe bail out early */
- for (i = 0; i < ep->source_buffer.count; i++) {
- grpc_slice encrypted = ep->source_buffer.slices[i];
- uint8_t *message_bytes = GRPC_SLICE_START_PTR(encrypted);
- size_t message_size = GRPC_SLICE_LENGTH(encrypted);
-
- while (message_size > 0 || keep_looping) {
- size_t unprotected_buffer_size_written = (size_t)(end - cur);
- size_t processed_message_size = message_size;
- gpr_mu_lock(&ep->protector_mu);
- result = tsi_frame_protector_unprotect(ep->protector, message_bytes,
- &processed_message_size, cur,
- &unprotected_buffer_size_written);
- gpr_mu_unlock(&ep->protector_mu);
- if (result != TSI_OK) {
- gpr_log(GPR_ERROR, "Decryption error: %s",
- tsi_result_to_string(result));
- break;
- }
- message_bytes += processed_message_size;
- message_size -= processed_message_size;
- cur += unprotected_buffer_size_written;
-
- if (cur == end) {
- flush_read_staging_buffer(ep, &cur, &end);
- /* Force to enter the loop again to extract buffered bytes in protector.
- The bytes could be buffered because of running out of staging_buffer.
- If this happens at the end of all slices, doing another unprotect
- avoids leaving data in the protector. */
- keep_looping = 1;
- } else if (unprotected_buffer_size_written > 0) {
- keep_looping = 1;
- } else {
- keep_looping = 0;
+ if (ep->zero_copy_protector != NULL) {
+ // Use zero-copy grpc protector to unprotect.
+ result = tsi_zero_copy_grpc_protector_unprotect(
+ exec_ctx, ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer);
+ } else {
+ // Use frame protector to unprotect.
+ /* TODO(yangg) check error, maybe bail out early */
+ for (i = 0; i < ep->source_buffer.count; i++) {
+ grpc_slice encrypted = ep->source_buffer.slices[i];
+ uint8_t *message_bytes = GRPC_SLICE_START_PTR(encrypted);
+ size_t message_size = GRPC_SLICE_LENGTH(encrypted);
+
+ while (message_size > 0 || keep_looping) {
+ size_t unprotected_buffer_size_written = (size_t)(end - cur);
+ size_t processed_message_size = message_size;
+ gpr_mu_lock(&ep->protector_mu);
+ result = tsi_frame_protector_unprotect(
+ ep->protector, message_bytes, &processed_message_size, cur,
+ &unprotected_buffer_size_written);
+ gpr_mu_unlock(&ep->protector_mu);
+ if (result != TSI_OK) {
+ gpr_log(GPR_ERROR, "Decryption error: %s",
+ tsi_result_to_string(result));
+ break;
+ }
+ message_bytes += processed_message_size;
+ message_size -= processed_message_size;
+ cur += unprotected_buffer_size_written;
+
+ if (cur == end) {
+ flush_read_staging_buffer(ep, &cur, &end);
+ /* Force to enter the loop again to extract buffered bytes in
+ protector. The bytes could be buffered because of running out of
+ staging_buffer. If this happens at the end of all slices, doing
+ another unprotect avoids leaving data in the protector. */
+ keep_looping = 1;
+ } else if (unprotected_buffer_size_written > 0) {
+ keep_looping = 1;
+ } else {
+ keep_looping = 0;
+ }
}
+ if (result != TSI_OK) break;
}
- if (result != TSI_OK) break;
- }
- if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) {
- grpc_slice_buffer_add(
- ep->read_buffer,
- grpc_slice_split_head(
- &ep->read_staging_buffer,
- (size_t)(cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer))));
+ if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) {
+ grpc_slice_buffer_add(
+ ep->read_buffer,
+ grpc_slice_split_head(
+ &ep->read_staging_buffer,
+ (size_t)(cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer))));
+ }
}
/* TODO(yangg) experiment with moving this block after read_cb to see if it
@@ -270,54 +279,62 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
}
}
- for (i = 0; i < slices->count; i++) {
- grpc_slice plain = slices->slices[i];
- uint8_t *message_bytes = GRPC_SLICE_START_PTR(plain);
- size_t message_size = GRPC_SLICE_LENGTH(plain);
- while (message_size > 0) {
- size_t protected_buffer_size_to_send = (size_t)(end - cur);
- size_t processed_message_size = message_size;
- gpr_mu_lock(&ep->protector_mu);
- result = tsi_frame_protector_protect(ep->protector, message_bytes,
- &processed_message_size, cur,
- &protected_buffer_size_to_send);
- gpr_mu_unlock(&ep->protector_mu);
- if (result != TSI_OK) {
- gpr_log(GPR_ERROR, "Encryption error: %s",
- tsi_result_to_string(result));
- break;
- }
- message_bytes += processed_message_size;
- message_size -= processed_message_size;
- cur += protected_buffer_size_to_send;
-
- if (cur == end) {
- flush_write_staging_buffer(ep, &cur, &end);
+ if (ep->zero_copy_protector != NULL) {
+ // Use zero-copy grpc protector to protect.
+ result = tsi_zero_copy_grpc_protector_protect(
+ exec_ctx, ep->zero_copy_protector, slices, &ep->output_buffer);
+ } else {
+ // Use frame protector to protect.
+ for (i = 0; i < slices->count; i++) {
+ grpc_slice plain = slices->slices[i];
+ uint8_t *message_bytes = GRPC_SLICE_START_PTR(plain);
+ size_t message_size = GRPC_SLICE_LENGTH(plain);
+ while (message_size > 0) {
+ size_t protected_buffer_size_to_send = (size_t)(end - cur);
+ size_t processed_message_size = message_size;
+ gpr_mu_lock(&ep->protector_mu);
+ result = tsi_frame_protector_protect(ep->protector, message_bytes,
+ &processed_message_size, cur,
+ &protected_buffer_size_to_send);
+ gpr_mu_unlock(&ep->protector_mu);
+ if (result != TSI_OK) {
+ gpr_log(GPR_ERROR, "Encryption error: %s",
+ tsi_result_to_string(result));
+ break;
+ }
+ message_bytes += processed_message_size;
+ message_size -= processed_message_size;
+ cur += protected_buffer_size_to_send;
+
+ if (cur == end) {
+ flush_write_staging_buffer(ep, &cur, &end);
+ }
}
- }
- if (result != TSI_OK) break;
- }
- if (result == TSI_OK) {
- size_t still_pending_size;
- do {
- size_t protected_buffer_size_to_send = (size_t)(end - cur);
- gpr_mu_lock(&ep->protector_mu);
- result = tsi_frame_protector_protect_flush(ep->protector, cur,
- &protected_buffer_size_to_send,
- &still_pending_size);
- gpr_mu_unlock(&ep->protector_mu);
if (result != TSI_OK) break;
- cur += protected_buffer_size_to_send;
- if (cur == end) {
- flush_write_staging_buffer(ep, &cur, &end);
+ }
+ if (result == TSI_OK) {
+ size_t still_pending_size;
+ do {
+ size_t protected_buffer_size_to_send = (size_t)(end - cur);
+ gpr_mu_lock(&ep->protector_mu);
+ result = tsi_frame_protector_protect_flush(
+ ep->protector, cur, &protected_buffer_size_to_send,
+ &still_pending_size);
+ gpr_mu_unlock(&ep->protector_mu);
+ if (result != TSI_OK) break;
+ cur += protected_buffer_size_to_send;
+ if (cur == end) {
+ flush_write_staging_buffer(ep, &cur, &end);
+ }
+ } while (still_pending_size > 0);
+ if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) {
+ grpc_slice_buffer_add(
+ &ep->output_buffer,
+ grpc_slice_split_head(
+ &ep->write_staging_buffer,
+ (size_t)(cur -
+ GRPC_SLICE_START_PTR(ep->write_staging_buffer))));
}
- } while (still_pending_size > 0);
- if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) {
- grpc_slice_buffer_add(
- &ep->output_buffer,
- grpc_slice_split_head(
- &ep->write_staging_buffer,
- (size_t)(cur - GRPC_SLICE_START_PTR(ep->write_staging_buffer))));
}
}
@@ -389,13 +406,16 @@ static const grpc_endpoint_vtable vtable = {endpoint_read,
endpoint_get_fd};
grpc_endpoint *grpc_secure_endpoint_create(
- struct tsi_frame_protector *protector, grpc_endpoint *transport,
- grpc_slice *leftover_slices, size_t leftover_nslices) {
+ struct tsi_frame_protector *protector,
+ struct tsi_zero_copy_grpc_protector *zero_copy_protector,
+ grpc_endpoint *transport, grpc_slice *leftover_slices,
+ size_t leftover_nslices) {
size_t i;
secure_endpoint *ep = (secure_endpoint *)gpr_malloc(sizeof(secure_endpoint));
ep->base.vtable = &vtable;
ep->wrapped_ep = transport;
ep->protector = protector;
+ ep->zero_copy_protector = zero_copy_protector;
grpc_slice_buffer_init(&ep->leftover_bytes);
for (i = 0; i < leftover_nslices; i++) {
grpc_slice_buffer_add(&ep->leftover_bytes,
diff --git a/src/core/lib/security/transport/secure_endpoint.h b/src/core/lib/security/transport/secure_endpoint.h
index 1c5555f3df..3323a6ff42 100644
--- a/src/core/lib/security/transport/secure_endpoint.h
+++ b/src/core/lib/security/transport/secure_endpoint.h
@@ -23,12 +23,17 @@
#include "src/core/lib/iomgr/endpoint.h"
struct tsi_frame_protector;
+struct tsi_zero_copy_grpc_protector;
extern grpc_tracer_flag grpc_trace_secure_endpoint;
-/* Takes ownership of protector and to_wrap, and refs leftover_slices. */
+/* Takes ownership of protector, zero_copy_protector, and to_wrap, and refs
+ * leftover_slices. If zero_copy_protector is not NULL, protector will never be
+ * used. */
grpc_endpoint *grpc_secure_endpoint_create(
- struct tsi_frame_protector *protector, grpc_endpoint *to_wrap,
- grpc_slice *leftover_slices, size_t leftover_nslices);
+ struct tsi_frame_protector *protector,
+ struct tsi_zero_copy_grpc_protector *zero_copy_protector,
+ grpc_endpoint *to_wrap, grpc_slice *leftover_slices,
+ size_t leftover_nslices);
#endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURE_ENDPOINT_H */
diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c
index bf7a5272cb..1a1313d472 100644
--- a/src/core/lib/security/transport/security_handshaker.c
+++ b/src/core/lib/security/transport/security_handshaker.c
@@ -32,6 +32,7 @@
#include "src/core/lib/security/transport/secure_endpoint.h"
#include "src/core/lib/security/transport/tsi_error.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/tsi/transport_security_grpc.h"
#define GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE 256
@@ -133,17 +134,31 @@ static void on_peer_checked_inner(grpc_exec_ctx *exec_ctx,
security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error));
return;
}
- // Create frame protector.
- tsi_frame_protector *protector;
- tsi_result result = tsi_handshaker_result_create_frame_protector(
- h->handshaker_result, NULL, &protector);
- if (result != TSI_OK) {
+ // Create zero-copy frame protector, if implemented.
+ tsi_zero_copy_grpc_protector *zero_copy_protector = NULL;
+ tsi_result result = tsi_handshaker_result_create_zero_copy_grpc_protector(
+ h->handshaker_result, NULL, &zero_copy_protector);
+ if (result != TSI_OK && result != TSI_UNIMPLEMENTED) {
error = grpc_set_tsi_error_result(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Frame protector creation failed"),
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Zero-copy frame protector creation failed"),
result);
security_handshake_failed_locked(exec_ctx, h, error);
return;
}
+ // Create frame protector if zero-copy frame protector is NULL.
+ tsi_frame_protector *protector = NULL;
+ if (zero_copy_protector == NULL) {
+ result = tsi_handshaker_result_create_frame_protector(h->handshaker_result,
+ NULL, &protector);
+ if (result != TSI_OK) {
+ error = grpc_set_tsi_error_result(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Frame protector creation failed"),
+ result);
+ security_handshake_failed_locked(exec_ctx, h, error);
+ goto done;
+ }
+ }
// Get unused bytes.
const unsigned char *unused_bytes = NULL;
size_t unused_bytes_size = 0;
@@ -153,12 +168,12 @@ static void on_peer_checked_inner(grpc_exec_ctx *exec_ctx,
if (unused_bytes_size > 0) {
grpc_slice slice =
grpc_slice_from_copied_buffer((char *)unused_bytes, unused_bytes_size);
- h->args->endpoint =
- grpc_secure_endpoint_create(protector, h->args->endpoint, &slice, 1);
+ h->args->endpoint = grpc_secure_endpoint_create(
+ protector, zero_copy_protector, h->args->endpoint, &slice, 1);
grpc_slice_unref_internal(exec_ctx, slice);
} else {
- h->args->endpoint =
- grpc_secure_endpoint_create(protector, h->args->endpoint, NULL, 0);
+ h->args->endpoint = grpc_secure_endpoint_create(
+ protector, zero_copy_protector, h->args->endpoint, NULL, 0);
}
tsi_handshaker_result_destroy(h->handshaker_result);
h->handshaker_result = NULL;
diff --git a/src/core/lib/support/string.c b/src/core/lib/support/string.c
index ec93303024..523e43445b 100644
--- a/src/core/lib/support/string.c
+++ b/src/core/lib/support/string.c
@@ -300,11 +300,12 @@ void *gpr_memrchr(const void *s, int c, size_t n) {
}
bool gpr_is_true(const char *s) {
+ size_t i;
if (s == NULL) {
return false;
}
static const char *truthy[] = {"yes", "true", "1"};
- for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
+ for (i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
if (0 == gpr_stricmp(s, truthy[i])) {
return true;
}
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c
index 7d60b1de17..5dbfaa2d43 100644
--- a/src/core/lib/surface/alarm.c
+++ b/src/core/lib/surface/alarm.c
@@ -44,7 +44,9 @@ static void alarm_ref(grpc_alarm *alarm) { gpr_ref(&alarm->refs); }
static void alarm_unref(grpc_alarm *alarm) {
if (gpr_unref(&alarm->refs)) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm");
+ if (alarm->cq != NULL) {
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm");
+ }
grpc_exec_ctx_finish(&exec_ctx);
gpr_free(alarm);
}
@@ -93,12 +95,8 @@ static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
(void *)alarm, &alarm->completion);
}
-grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
- void *tag) {
+grpc_alarm *grpc_alarm_create(void *reserved) {
grpc_alarm *alarm = gpr_malloc(sizeof(grpc_alarm));
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-
- gpr_ref_init(&alarm->refs, 1);
#ifndef NDEBUG
if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) {
@@ -106,27 +104,36 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
}
#endif
+ gpr_ref_init(&alarm->refs, 1);
+ grpc_timer_init_unset(&alarm->alarm);
+ alarm->cq = NULL;
+ GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm,
+ grpc_schedule_on_exec_ctx);
+ return alarm;
+}
+
+void grpc_alarm_set(grpc_alarm *alarm, grpc_completion_queue *cq,
+ gpr_timespec deadline, void *tag, void *reserved) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
GRPC_CQ_INTERNAL_REF(cq, "alarm");
alarm->cq = cq;
alarm->tag = tag;
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
- GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm,
- grpc_schedule_on_exec_ctx);
grpc_timer_init(&exec_ctx, &alarm->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
&alarm->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_exec_ctx_finish(&exec_ctx);
- return alarm;
}
-void grpc_alarm_cancel(grpc_alarm *alarm) {
+void grpc_alarm_cancel(grpc_alarm *alarm, void *reserved) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_timer_cancel(&exec_ctx, &alarm->alarm);
grpc_exec_ctx_finish(&exec_ctx);
}
-void grpc_alarm_destroy(grpc_alarm *alarm) {
- grpc_alarm_cancel(alarm);
+void grpc_alarm_destroy(grpc_alarm *alarm, void *reserved) {
+ grpc_alarm_cancel(alarm, reserved);
GRPC_ALARM_UNREF(alarm, "alarm_destroy");
}
diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c
index 96c16105e7..fd6ea4daa9 100644
--- a/src/core/lib/surface/version.c
+++ b/src/core/lib/surface/version.c
@@ -21,6 +21,6 @@
#include <grpc/grpc.h>
-const char *grpc_version_string(void) { return "4.0.0-dev"; }
+const char *grpc_version_string(void) { return "5.0.0-dev"; }
const char *grpc_g_stands_for(void) { return "gambit"; }
diff --git a/src/core/plugin_registry/grpc_cronet_plugin_registry.c b/src/core/plugin_registry/grpc_cronet_plugin_registry.c
index 322ebea111..1c09f54ad9 100644
--- a/src/core/plugin_registry/grpc_cronet_plugin_registry.c
+++ b/src/core/plugin_registry/grpc_cronet_plugin_registry.c
@@ -28,8 +28,8 @@ extern void grpc_client_channel_init(void);
extern void grpc_client_channel_shutdown(void);
extern void grpc_tsi_gts_init(void);
extern void grpc_tsi_gts_shutdown(void);
-extern void grpc_load_reporting_plugin_init(void);
-extern void grpc_load_reporting_plugin_shutdown(void);
+extern void grpc_server_load_reporting_plugin_init(void);
+extern void grpc_server_load_reporting_plugin_shutdown(void);
void grpc_register_built_in_plugins(void) {
grpc_register_plugin(grpc_http_filters_init,
@@ -42,6 +42,6 @@ void grpc_register_built_in_plugins(void) {
grpc_client_channel_shutdown);
grpc_register_plugin(grpc_tsi_gts_init,
grpc_tsi_gts_shutdown);
- grpc_register_plugin(grpc_load_reporting_plugin_init,
- grpc_load_reporting_plugin_shutdown);
+ grpc_register_plugin(grpc_server_load_reporting_plugin_init,
+ grpc_server_load_reporting_plugin_shutdown);
}
diff --git a/src/core/plugin_registry/grpc_plugin_registry.c b/src/core/plugin_registry/grpc_plugin_registry.c
index fa9974952c..9cacf3d306 100644
--- a/src/core/plugin_registry/grpc_plugin_registry.c
+++ b/src/core/plugin_registry/grpc_plugin_registry.c
@@ -44,8 +44,8 @@ extern void grpc_resolver_dns_native_init(void);
extern void grpc_resolver_dns_native_shutdown(void);
extern void grpc_resolver_sockaddr_init(void);
extern void grpc_resolver_sockaddr_shutdown(void);
-extern void grpc_load_reporting_plugin_init(void);
-extern void grpc_load_reporting_plugin_shutdown(void);
+extern void grpc_server_load_reporting_plugin_init(void);
+extern void grpc_server_load_reporting_plugin_shutdown(void);
extern void census_grpc_plugin_init(void);
extern void census_grpc_plugin_shutdown(void);
extern void grpc_max_age_filter_init(void);
@@ -82,8 +82,8 @@ void grpc_register_built_in_plugins(void) {
grpc_resolver_dns_native_shutdown);
grpc_register_plugin(grpc_resolver_sockaddr_init,
grpc_resolver_sockaddr_shutdown);
- grpc_register_plugin(grpc_load_reporting_plugin_init,
- grpc_load_reporting_plugin_shutdown);
+ grpc_register_plugin(grpc_server_load_reporting_plugin_init,
+ grpc_server_load_reporting_plugin_shutdown);
grpc_register_plugin(census_grpc_plugin_init,
census_grpc_plugin_shutdown);
grpc_register_plugin(grpc_max_age_filter_init,
diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c
index 7eb599d81a..7b90d796d5 100644
--- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c
+++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c
@@ -36,8 +36,8 @@ extern void grpc_resolver_sockaddr_init(void);
extern void grpc_resolver_sockaddr_shutdown(void);
extern void grpc_resolver_fake_init(void);
extern void grpc_resolver_fake_shutdown(void);
-extern void grpc_load_reporting_plugin_init(void);
-extern void grpc_load_reporting_plugin_shutdown(void);
+extern void grpc_server_load_reporting_plugin_init(void);
+extern void grpc_server_load_reporting_plugin_shutdown(void);
extern void grpc_lb_policy_grpclb_init(void);
extern void grpc_lb_policy_grpclb_shutdown(void);
extern void grpc_lb_policy_pick_first_init(void);
@@ -72,8 +72,8 @@ void grpc_register_built_in_plugins(void) {
grpc_resolver_sockaddr_shutdown);
grpc_register_plugin(grpc_resolver_fake_init,
grpc_resolver_fake_shutdown);
- grpc_register_plugin(grpc_load_reporting_plugin_init,
- grpc_load_reporting_plugin_shutdown);
+ grpc_register_plugin(grpc_server_load_reporting_plugin_init,
+ grpc_server_load_reporting_plugin_shutdown);
grpc_register_plugin(grpc_lb_policy_grpclb_init,
grpc_lb_policy_grpclb_shutdown);
grpc_register_plugin(grpc_lb_policy_pick_first_init,
diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c
index 967126ecee..e7b3be3d86 100644
--- a/src/core/tsi/fake_transport_security.c
+++ b/src/core/tsi/fake_transport_security.c
@@ -25,7 +25,8 @@
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/useful.h>
-#include "src/core/tsi/transport_security.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/tsi/transport_security_grpc.h"
/* --- Constants. ---*/
#define TSI_FAKE_FRAME_HEADER_SIZE 4
@@ -74,6 +75,14 @@ typedef struct {
size_t max_frame_size;
} tsi_fake_frame_protector;
+typedef struct {
+ tsi_zero_copy_grpc_protector base;
+ grpc_slice_buffer header_sb;
+ grpc_slice_buffer protected_sb;
+ size_t max_frame_size;
+ size_t parsed_frame_size;
+} tsi_fake_zero_copy_grpc_protector;
+
/* --- Utils. ---*/
static const char *tsi_fake_handshake_message_strings[] = {
@@ -113,6 +122,28 @@ static void store32_little_endian(uint32_t value, unsigned char *buf) {
buf[0] = (unsigned char)((value)&0xFF);
}
+static uint32_t read_frame_size(const grpc_slice_buffer *sb) {
+ GPR_ASSERT(sb != NULL && sb->length >= TSI_FAKE_FRAME_HEADER_SIZE);
+ uint8_t frame_size_buffer[TSI_FAKE_FRAME_HEADER_SIZE];
+ uint8_t *buf = frame_size_buffer;
+ /* Copies the first 4 bytes to a temporary buffer. */
+ size_t remaining = TSI_FAKE_FRAME_HEADER_SIZE;
+ for (size_t i = 0; i < sb->count; i++) {
+ size_t slice_length = GRPC_SLICE_LENGTH(sb->slices[i]);
+ if (remaining <= slice_length) {
+ memcpy(buf, GRPC_SLICE_START_PTR(sb->slices[i]), remaining);
+ remaining = 0;
+ break;
+ } else {
+ memcpy(buf, GRPC_SLICE_START_PTR(sb->slices[i]), slice_length);
+ buf += slice_length;
+ remaining -= slice_length;
+ }
+ }
+ GPR_ASSERT(remaining == 0);
+ return load32_little_endian(frame_size_buffer);
+}
+
static void tsi_fake_frame_reset(tsi_fake_frame *frame, int needs_draining) {
frame->offset = 0;
frame->needs_draining = needs_draining;
@@ -363,6 +394,84 @@ static const tsi_frame_protector_vtable frame_protector_vtable = {
fake_protector_unprotect, fake_protector_destroy,
};
+/* --- tsi_zero_copy_grpc_protector methods implementation. ---*/
+
+static tsi_result fake_zero_copy_grpc_protector_protect(
+ grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self,
+ grpc_slice_buffer *unprotected_slices,
+ grpc_slice_buffer *protected_slices) {
+ if (self == NULL || unprotected_slices == NULL || protected_slices == NULL) {
+ return TSI_INVALID_ARGUMENT;
+ }
+ tsi_fake_zero_copy_grpc_protector *impl =
+ (tsi_fake_zero_copy_grpc_protector *)self;
+ /* Protects each frame. */
+ while (unprotected_slices->length > 0) {
+ size_t frame_length =
+ GPR_MIN(impl->max_frame_size,
+ unprotected_slices->length + TSI_FAKE_FRAME_HEADER_SIZE);
+ grpc_slice slice = GRPC_SLICE_MALLOC(TSI_FAKE_FRAME_HEADER_SIZE);
+ store32_little_endian((uint32_t)frame_length, GRPC_SLICE_START_PTR(slice));
+ grpc_slice_buffer_add(protected_slices, slice);
+ size_t data_length = frame_length - TSI_FAKE_FRAME_HEADER_SIZE;
+ grpc_slice_buffer_move_first(unprotected_slices, data_length,
+ protected_slices);
+ }
+ return TSI_OK;
+}
+
+static tsi_result fake_zero_copy_grpc_protector_unprotect(
+ grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self,
+ grpc_slice_buffer *protected_slices,
+ grpc_slice_buffer *unprotected_slices) {
+ if (self == NULL || unprotected_slices == NULL || protected_slices == NULL) {
+ return TSI_INVALID_ARGUMENT;
+ }
+ tsi_fake_zero_copy_grpc_protector *impl =
+ (tsi_fake_zero_copy_grpc_protector *)self;
+ grpc_slice_buffer_move_into(protected_slices, &impl->protected_sb);
+ /* Unprotect each frame, if we get a full frame. */
+ while (impl->protected_sb.length >= TSI_FAKE_FRAME_HEADER_SIZE) {
+ if (impl->parsed_frame_size == 0) {
+ impl->parsed_frame_size = read_frame_size(&impl->protected_sb);
+ if (impl->parsed_frame_size <= 4) {
+ gpr_log(GPR_ERROR, "Invalid frame size.");
+ return TSI_DATA_CORRUPTED;
+ }
+ }
+ /* If we do not have a full frame, return with OK status. */
+ if (impl->protected_sb.length < impl->parsed_frame_size) break;
+ /* Strips header bytes. */
+ grpc_slice_buffer_move_first(&impl->protected_sb,
+ TSI_FAKE_FRAME_HEADER_SIZE, &impl->header_sb);
+ /* Moves data to unprotected slices. */
+ grpc_slice_buffer_move_first(
+ &impl->protected_sb,
+ impl->parsed_frame_size - TSI_FAKE_FRAME_HEADER_SIZE,
+ unprotected_slices);
+ impl->parsed_frame_size = 0;
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &impl->header_sb);
+ }
+ return TSI_OK;
+}
+
+static void fake_zero_copy_grpc_protector_destroy(
+ grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self) {
+ if (self == NULL) return;
+ tsi_fake_zero_copy_grpc_protector *impl =
+ (tsi_fake_zero_copy_grpc_protector *)self;
+ grpc_slice_buffer_destroy_internal(exec_ctx, &impl->header_sb);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &impl->protected_sb);
+ gpr_free(impl);
+}
+
+static const tsi_zero_copy_grpc_protector_vtable
+ zero_copy_grpc_protector_vtable = {
+ fake_zero_copy_grpc_protector_protect,
+ fake_zero_copy_grpc_protector_unprotect,
+ fake_zero_copy_grpc_protector_destroy,
+};
+
/* --- tsi_handshaker_result methods implementation. ---*/
typedef struct {
@@ -383,6 +492,14 @@ static tsi_result fake_handshaker_result_extract_peer(
return result;
}
+static tsi_result fake_handshaker_result_create_zero_copy_grpc_protector(
+ const tsi_handshaker_result *self, size_t *max_output_protected_frame_size,
+ tsi_zero_copy_grpc_protector **protector) {
+ *protector =
+ tsi_create_fake_zero_copy_grpc_protector(max_output_protected_frame_size);
+ return TSI_OK;
+}
+
static tsi_result fake_handshaker_result_create_frame_protector(
const tsi_handshaker_result *self, size_t *max_output_protected_frame_size,
tsi_frame_protector **protector) {
@@ -407,7 +524,7 @@ static void fake_handshaker_result_destroy(tsi_handshaker_result *self) {
static const tsi_handshaker_result_vtable handshaker_result_vtable = {
fake_handshaker_result_extract_peer,
- NULL, /* create_zero_copy_grpc_protector */
+ fake_handshaker_result_create_zero_copy_grpc_protector,
fake_handshaker_result_create_frame_protector,
fake_handshaker_result_get_unused_bytes,
fake_handshaker_result_destroy,
@@ -631,3 +748,16 @@ tsi_frame_protector *tsi_create_fake_frame_protector(
impl->base.vtable = &frame_protector_vtable;
return &impl->base;
}
+
+tsi_zero_copy_grpc_protector *tsi_create_fake_zero_copy_grpc_protector(
+ size_t *max_protected_frame_size) {
+ tsi_fake_zero_copy_grpc_protector *impl = gpr_zalloc(sizeof(*impl));
+ grpc_slice_buffer_init(&impl->header_sb);
+ grpc_slice_buffer_init(&impl->protected_sb);
+ impl->max_frame_size = (max_protected_frame_size == NULL)
+ ? TSI_FAKE_DEFAULT_FRAME_SIZE
+ : *max_protected_frame_size;
+ impl->parsed_frame_size = 0;
+ impl->base.vtable = &zero_copy_grpc_protector_vtable;
+ return &impl->base;
+}
diff --git a/src/core/tsi/fake_transport_security.h b/src/core/tsi/fake_transport_security.h
index 934b3cbeb2..6159708a84 100644
--- a/src/core/tsi/fake_transport_security.h
+++ b/src/core/tsi/fake_transport_security.h
@@ -39,6 +39,11 @@ tsi_handshaker *tsi_create_fake_handshaker(int is_client);
tsi_frame_protector *tsi_create_fake_frame_protector(
size_t *max_protected_frame_size);
+/* Creates a zero-copy protector directly without going through the handshake
+ * phase. */
+tsi_zero_copy_grpc_protector *tsi_create_fake_zero_copy_grpc_protector(
+ size_t *max_protected_frame_size);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/core/tsi/transport_security_grpc.c b/src/core/tsi/transport_security_grpc.c
index 5bcfdfa61f..773b35e717 100644
--- a/src/core/tsi/transport_security_grpc.c
+++ b/src/core/tsi/transport_security_grpc.c
@@ -37,28 +37,33 @@ tsi_result tsi_handshaker_result_create_zero_copy_grpc_protector(
Calls specific implementation after state/input validation. */
tsi_result tsi_zero_copy_grpc_protector_protect(
- tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *unprotected_slices,
+ grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self,
+ grpc_slice_buffer *unprotected_slices,
grpc_slice_buffer *protected_slices) {
- if (self == NULL || self->vtable == NULL || unprotected_slices == NULL ||
- protected_slices == NULL) {
+ if (exec_ctx == NULL || self == NULL || self->vtable == NULL ||
+ unprotected_slices == NULL || protected_slices == NULL) {
return TSI_INVALID_ARGUMENT;
}
if (self->vtable->protect == NULL) return TSI_UNIMPLEMENTED;
- return self->vtable->protect(self, unprotected_slices, protected_slices);
+ return self->vtable->protect(exec_ctx, self, unprotected_slices,
+ protected_slices);
}
tsi_result tsi_zero_copy_grpc_protector_unprotect(
- tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *protected_slices,
+ grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self,
+ grpc_slice_buffer *protected_slices,
grpc_slice_buffer *unprotected_slices) {
- if (self == NULL || self->vtable == NULL || protected_slices == NULL ||
- unprotected_slices == NULL) {
+ if (exec_ctx == NULL || self == NULL || self->vtable == NULL ||
+ protected_slices == NULL || unprotected_slices == NULL) {
return TSI_INVALID_ARGUMENT;
}
if (self->vtable->unprotect == NULL) return TSI_UNIMPLEMENTED;
- return self->vtable->unprotect(self, protected_slices, unprotected_slices);
+ return self->vtable->unprotect(exec_ctx, self, protected_slices,
+ unprotected_slices);
}
-void tsi_zero_copy_grpc_protector_destroy(tsi_zero_copy_grpc_protector *self) {
+void tsi_zero_copy_grpc_protector_destroy(grpc_exec_ctx *exec_ctx,
+ tsi_zero_copy_grpc_protector *self) {
if (self == NULL) return;
- self->vtable->destroy(self);
+ self->vtable->destroy(exec_ctx, self);
}
diff --git a/src/core/tsi/transport_security_grpc.h b/src/core/tsi/transport_security_grpc.h
index 5ab5297cc4..375a758888 100644
--- a/src/core/tsi/transport_security_grpc.h
+++ b/src/core/tsi/transport_security_grpc.h
@@ -42,8 +42,8 @@ tsi_result tsi_handshaker_result_create_zero_copy_grpc_protector(
- This method returns TSI_OK in case of success or a specific error code in
case of failure. */
tsi_result tsi_zero_copy_grpc_protector_protect(
- tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *unprotected_slices,
- grpc_slice_buffer *protected_slices);
+ grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self,
+ grpc_slice_buffer *unprotected_slices, grpc_slice_buffer *protected_slices);
/* Outputs unprotected bytes.
- protected_slices is the bytes of protected frames.
@@ -52,21 +52,24 @@ tsi_result tsi_zero_copy_grpc_protector_protect(
there is not enough data to output in which case unprotected_slices has 0
bytes. */
tsi_result tsi_zero_copy_grpc_protector_unprotect(
- tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *protected_slices,
- grpc_slice_buffer *unprotected_slices);
+ grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self,
+ grpc_slice_buffer *protected_slices, grpc_slice_buffer *unprotected_slices);
/* Destroys the tsi_zero_copy_grpc_protector object. */
-void tsi_zero_copy_grpc_protector_destroy(tsi_zero_copy_grpc_protector *self);
+void tsi_zero_copy_grpc_protector_destroy(grpc_exec_ctx *exec_ctx,
+ tsi_zero_copy_grpc_protector *self);
/* Base for tsi_zero_copy_grpc_protector implementations. */
typedef struct {
- tsi_result (*protect)(tsi_zero_copy_grpc_protector *self,
+ tsi_result (*protect)(grpc_exec_ctx *exec_ctx,
+ tsi_zero_copy_grpc_protector *self,
grpc_slice_buffer *unprotected_slices,
grpc_slice_buffer *protected_slices);
- tsi_result (*unprotect)(tsi_zero_copy_grpc_protector *self,
+ tsi_result (*unprotect)(grpc_exec_ctx *exec_ctx,
+ tsi_zero_copy_grpc_protector *self,
grpc_slice_buffer *protected_slices,
grpc_slice_buffer *unprotected_slices);
- void (*destroy)(tsi_zero_copy_grpc_protector *self);
+ void (*destroy)(grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self);
} tsi_zero_copy_grpc_protector_vtable;
struct tsi_zero_copy_grpc_protector {
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 2071827b64..ec642b0520 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -298,8 +298,8 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c',
'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c',
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c',
- 'src/core/ext/filters/load_reporting/load_reporting.c',
- 'src/core/ext/filters/load_reporting/load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.c',
'src/core/ext/census/base_resources.c',
'src/core/ext/census/context.c',
'src/core/ext/census/gen/census.pb.c',
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index 0402ce34fb..57b543967e 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -88,6 +88,7 @@ grpc_completion_queue_pluck_type grpc_completion_queue_pluck_import;
grpc_completion_queue_shutdown_type grpc_completion_queue_shutdown_import;
grpc_completion_queue_destroy_type grpc_completion_queue_destroy_import;
grpc_alarm_create_type grpc_alarm_create_import;
+grpc_alarm_set_type grpc_alarm_set_import;
grpc_alarm_cancel_type grpc_alarm_cancel_import;
grpc_alarm_destroy_type grpc_alarm_destroy_import;
grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import;
@@ -395,6 +396,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_completion_queue_shutdown_import = (grpc_completion_queue_shutdown_type) GetProcAddress(library, "grpc_completion_queue_shutdown");
grpc_completion_queue_destroy_import = (grpc_completion_queue_destroy_type) GetProcAddress(library, "grpc_completion_queue_destroy");
grpc_alarm_create_import = (grpc_alarm_create_type) GetProcAddress(library, "grpc_alarm_create");
+ grpc_alarm_set_import = (grpc_alarm_set_type) GetProcAddress(library, "grpc_alarm_set");
grpc_alarm_cancel_import = (grpc_alarm_cancel_type) GetProcAddress(library, "grpc_alarm_cancel");
grpc_alarm_destroy_import = (grpc_alarm_destroy_type) GetProcAddress(library, "grpc_alarm_destroy");
grpc_channel_check_connectivity_state_import = (grpc_channel_check_connectivity_state_type) GetProcAddress(library, "grpc_channel_check_connectivity_state");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index e3704e592b..c5c848ae44 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -242,13 +242,16 @@ extern grpc_completion_queue_shutdown_type grpc_completion_queue_shutdown_import
typedef void(*grpc_completion_queue_destroy_type)(grpc_completion_queue *cq);
extern grpc_completion_queue_destroy_type grpc_completion_queue_destroy_import;
#define grpc_completion_queue_destroy grpc_completion_queue_destroy_import
-typedef grpc_alarm *(*grpc_alarm_create_type)(grpc_completion_queue *cq, gpr_timespec deadline, void *tag);
+typedef grpc_alarm *(*grpc_alarm_create_type)(void *reserved);
extern grpc_alarm_create_type grpc_alarm_create_import;
#define grpc_alarm_create grpc_alarm_create_import
-typedef void(*grpc_alarm_cancel_type)(grpc_alarm *alarm);
+typedef void(*grpc_alarm_set_type)(grpc_alarm *alarm, grpc_completion_queue *cq, gpr_timespec deadline, void *tag, void *reserved);
+extern grpc_alarm_set_type grpc_alarm_set_import;
+#define grpc_alarm_set grpc_alarm_set_import
+typedef void(*grpc_alarm_cancel_type)(grpc_alarm *alarm, void *reserved);
extern grpc_alarm_cancel_type grpc_alarm_cancel_import;
#define grpc_alarm_cancel grpc_alarm_cancel_import
-typedef void(*grpc_alarm_destroy_type)(grpc_alarm *alarm);
+typedef void(*grpc_alarm_destroy_type)(grpc_alarm *alarm, void *reserved);
extern grpc_alarm_destroy_type grpc_alarm_destroy_import;
#define grpc_alarm_destroy grpc_alarm_destroy_import
typedef grpc_connectivity_state(*grpc_channel_check_connectivity_state_type)(grpc_channel *channel, int try_to_connect);
diff --git a/test/core/end2end/fixtures/h2_load_reporting.c b/test/core/end2end/fixtures/h2_load_reporting.c
index 385e2cbf70..8a05bb722a 100644
--- a/test/core/end2end/fixtures/h2_load_reporting.c
+++ b/test/core/end2end/fixtures/h2_load_reporting.c
@@ -28,7 +28,7 @@
#include <grpc/support/useful.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/http/server/http_server_filter.h"
-#include "src/core/ext/filters/load_reporting/load_reporting.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
diff --git a/test/core/end2end/tests/load_reporting_hook.c b/test/core/end2end/tests/load_reporting_hook.c
index 3584d47887..7b503790db 100644
--- a/test/core/end2end/tests/load_reporting_hook.c
+++ b/test/core/end2end/tests/load_reporting_hook.c
@@ -26,8 +26,8 @@
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
-#include "src/core/ext/filters/load_reporting/load_reporting.h"
-#include "src/core/ext/filters/load_reporting/load_reporting_filter.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/transport/static_metadata.h"
diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c
index 09846ba10e..839a05fa9b 100644
--- a/test/core/security/secure_endpoint_test.c
+++ b/test/core/security/secure_endpoint_test.c
@@ -36,12 +36,19 @@ static gpr_mu *g_mu;
static grpc_pollset *g_pollset;
static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair(
- size_t slice_size, grpc_slice *leftover_slices, size_t leftover_nslices) {
+ size_t slice_size, grpc_slice *leftover_slices, size_t leftover_nslices,
+ bool use_zero_copy_protector) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
tsi_frame_protector *fake_read_protector =
tsi_create_fake_frame_protector(NULL);
tsi_frame_protector *fake_write_protector =
tsi_create_fake_frame_protector(NULL);
+ tsi_zero_copy_grpc_protector *fake_read_zero_copy_protector =
+ use_zero_copy_protector ? tsi_create_fake_zero_copy_grpc_protector(NULL)
+ : NULL;
+ tsi_zero_copy_grpc_protector *fake_write_zero_copy_protector =
+ use_zero_copy_protector ? tsi_create_fake_zero_copy_grpc_protector(NULL)
+ : NULL;
grpc_endpoint_test_fixture f;
grpc_endpoint_pair tcp;
@@ -54,8 +61,9 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair(
grpc_endpoint_add_to_pollset(&exec_ctx, tcp.server, g_pollset);
if (leftover_nslices == 0) {
- f.client_ep =
- grpc_secure_endpoint_create(fake_read_protector, tcp.client, NULL, 0);
+ f.client_ep = grpc_secure_endpoint_create(fake_read_protector,
+ fake_read_zero_copy_protector,
+ tcp.client, NULL, 0);
} else {
unsigned i;
tsi_result result;
@@ -96,31 +104,47 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair(
} while (still_pending_size > 0);
encrypted_leftover = grpc_slice_from_copied_buffer(
(const char *)encrypted_buffer, total_buffer_size - buffer_size);
- f.client_ep = grpc_secure_endpoint_create(fake_read_protector, tcp.client,
- &encrypted_leftover, 1);
+ f.client_ep = grpc_secure_endpoint_create(
+ fake_read_protector, fake_read_zero_copy_protector, tcp.client,
+ &encrypted_leftover, 1);
grpc_slice_unref(encrypted_leftover);
gpr_free(encrypted_buffer);
}
- f.server_ep =
- grpc_secure_endpoint_create(fake_write_protector, tcp.server, NULL, 0);
+ f.server_ep = grpc_secure_endpoint_create(fake_write_protector,
+ fake_write_zero_copy_protector,
+ tcp.server, NULL, 0);
grpc_exec_ctx_finish(&exec_ctx);
return f;
}
static grpc_endpoint_test_fixture
secure_endpoint_create_fixture_tcp_socketpair_noleftover(size_t slice_size) {
- return secure_endpoint_create_fixture_tcp_socketpair(slice_size, NULL, 0);
+ return secure_endpoint_create_fixture_tcp_socketpair(slice_size, NULL, 0,
+ false);
+}
+
+static grpc_endpoint_test_fixture
+secure_endpoint_create_fixture_tcp_socketpair_noleftover_zero_copy(
+ size_t slice_size) {
+ return secure_endpoint_create_fixture_tcp_socketpair(slice_size, NULL, 0,
+ true);
}
static grpc_endpoint_test_fixture
secure_endpoint_create_fixture_tcp_socketpair_leftover(size_t slice_size) {
grpc_slice s =
grpc_slice_from_copied_string("hello world 12345678900987654321");
- grpc_endpoint_test_fixture f;
+ return secure_endpoint_create_fixture_tcp_socketpair(slice_size, &s, 1,
+ false);
+}
- f = secure_endpoint_create_fixture_tcp_socketpair(slice_size, &s, 1);
- return f;
+static grpc_endpoint_test_fixture
+secure_endpoint_create_fixture_tcp_socketpair_leftover_zero_copy(
+ size_t slice_size) {
+ grpc_slice s =
+ grpc_slice_from_copied_string("hello world 12345678900987654321");
+ return secure_endpoint_create_fixture_tcp_socketpair(slice_size, &s, 1, true);
}
static void clean_up(void) {}
@@ -128,8 +152,14 @@ static void clean_up(void) {}
static grpc_endpoint_test_config configs[] = {
{"secure_ep/tcp_socketpair",
secure_endpoint_create_fixture_tcp_socketpair_noleftover, clean_up},
+ {"secure_ep/tcp_socketpair_zero_copy",
+ secure_endpoint_create_fixture_tcp_socketpair_noleftover_zero_copy,
+ clean_up},
{"secure_ep/tcp_socketpair_leftover",
secure_endpoint_create_fixture_tcp_socketpair_leftover, clean_up},
+ {"secure_ep/tcp_socketpair_leftover_zero_copy",
+ secure_endpoint_create_fixture_tcp_socketpair_leftover_zero_copy,
+ clean_up},
};
static void inc_call_ctr(grpc_exec_ctx *exec_ctx, void *arg,
@@ -184,7 +214,9 @@ int main(int argc, char **argv) {
g_pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size());
grpc_pollset_init(g_pollset, &g_mu);
grpc_endpoint_tests(configs[0], g_pollset, g_mu);
- test_leftover(configs[1], 1);
+ grpc_endpoint_tests(configs[1], g_pollset, g_mu);
+ test_leftover(configs[2], 1);
+ test_leftover(configs[3], 1);
GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
grpc_schedule_on_exec_ctx);
grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
diff --git a/test/core/surface/alarm_test.c b/test/core/surface/alarm_test.c
index 6971d92074..4fd7cb93c6 100644
--- a/test/core/surface/alarm_test.c
+++ b/test/core/surface/alarm_test.c
@@ -48,45 +48,50 @@ static void test_alarm(void) {
/* regular expiry */
grpc_event ev;
void *tag = create_test_tag();
- grpc_alarm *alarm =
- grpc_alarm_create(cc, grpc_timeout_seconds_to_deadline(1), tag);
+ grpc_alarm *alarm = grpc_alarm_create(NULL);
+ grpc_alarm_set(alarm, cc, grpc_timeout_seconds_to_deadline(1), tag, NULL);
ev = grpc_completion_queue_next(cc, grpc_timeout_seconds_to_deadline(2),
NULL);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
GPR_ASSERT(ev.tag == tag);
GPR_ASSERT(ev.success);
- grpc_alarm_destroy(alarm);
+ grpc_alarm_destroy(alarm, NULL);
}
{
/* cancellation */
grpc_event ev;
void *tag = create_test_tag();
- grpc_alarm *alarm =
- grpc_alarm_create(cc, grpc_timeout_seconds_to_deadline(2), tag);
+ grpc_alarm *alarm = grpc_alarm_create(NULL);
+ grpc_alarm_set(alarm, cc, grpc_timeout_seconds_to_deadline(2), tag, NULL);
- grpc_alarm_cancel(alarm);
+ grpc_alarm_cancel(alarm, NULL);
ev = grpc_completion_queue_next(cc, grpc_timeout_seconds_to_deadline(1),
NULL);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
GPR_ASSERT(ev.tag == tag);
GPR_ASSERT(ev.success == 0);
- grpc_alarm_destroy(alarm);
+ grpc_alarm_destroy(alarm, NULL);
}
{
/* alarm_destroy before cq_next */
grpc_event ev;
void *tag = create_test_tag();
- grpc_alarm *alarm =
- grpc_alarm_create(cc, grpc_timeout_seconds_to_deadline(2), tag);
+ grpc_alarm *alarm = grpc_alarm_create(NULL);
+ grpc_alarm_set(alarm, cc, grpc_timeout_seconds_to_deadline(2), tag, NULL);
- grpc_alarm_destroy(alarm);
+ grpc_alarm_destroy(alarm, NULL);
ev = grpc_completion_queue_next(cc, grpc_timeout_seconds_to_deadline(1),
NULL);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
GPR_ASSERT(ev.tag == tag);
GPR_ASSERT(ev.success == 0);
}
+ {
+ /* alarm_destroy before set */
+ grpc_alarm *alarm = grpc_alarm_create(NULL);
+ grpc_alarm_destroy(alarm, NULL);
+ }
shutdown_and_destroy(cc);
}
diff --git a/test/cpp/common/alarm_cpp_test.cc b/test/cpp/common/alarm_cpp_test.cc
index ce4168843c..212972d25d 100644
--- a/test/cpp/common/alarm_cpp_test.cc
+++ b/test/cpp/common/alarm_cpp_test.cc
@@ -18,6 +18,8 @@
#include <grpc++/alarm.h>
#include <grpc++/completion_queue.h>
+#include <thread>
+
#include <gtest/gtest.h>
#include "test/core/util/test_config.h"
@@ -28,6 +30,46 @@ namespace {
TEST(AlarmTest, RegularExpiry) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
+ Alarm alarm;
+ alarm.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk);
+
+ void* output_tag;
+ bool ok;
+ const CompletionQueue::NextStatus status = cq.AsyncNext(
+ (void**)&output_tag, &ok, grpc_timeout_seconds_to_deadline(2));
+
+ EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
+ EXPECT_TRUE(ok);
+ EXPECT_EQ(junk, output_tag);
+}
+
+TEST(AlarmTest, MultithreadedRegularExpiry) {
+ CompletionQueue cq;
+ void* junk = reinterpret_cast<void*>(1618033);
+ void* output_tag;
+ bool ok;
+ CompletionQueue::NextStatus status;
+ Alarm alarm;
+
+ std::thread t1([&alarm, &cq, &junk] {
+ alarm.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk);
+ });
+
+ std::thread t2([&cq, &ok, &output_tag, &status] {
+ status = cq.AsyncNext((void**)&output_tag, &ok,
+ grpc_timeout_seconds_to_deadline(2));
+ });
+
+ t1.join();
+ t2.join();
+ EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
+ EXPECT_TRUE(ok);
+ EXPECT_EQ(junk, output_tag);
+}
+
+TEST(AlarmTest, DeprecatedRegularExpiry) {
+ CompletionQueue cq;
+ void* junk = reinterpret_cast<void*>(1618033);
Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(1), junk);
void* output_tag;
@@ -43,7 +85,8 @@ TEST(AlarmTest, RegularExpiry) {
TEST(AlarmTest, MoveConstructor) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
- Alarm first(&cq, grpc_timeout_seconds_to_deadline(1), junk);
+ Alarm first;
+ first.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk);
Alarm second(std::move(first));
void* output_tag;
bool ok;
@@ -57,7 +100,8 @@ TEST(AlarmTest, MoveConstructor) {
TEST(AlarmTest, MoveAssignment) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
- Alarm first(&cq, grpc_timeout_seconds_to_deadline(1), junk);
+ Alarm first;
+ first.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk);
Alarm second(std::move(first));
first = std::move(second);
@@ -76,7 +120,8 @@ TEST(AlarmTest, RegularExpiryChrono) {
void* junk = reinterpret_cast<void*>(1618033);
std::chrono::system_clock::time_point one_sec_deadline =
std::chrono::system_clock::now() + std::chrono::seconds(1);
- Alarm alarm(&cq, one_sec_deadline, junk);
+ Alarm alarm;
+ alarm.Set(&cq, one_sec_deadline, junk);
void* output_tag;
bool ok;
@@ -91,7 +136,8 @@ TEST(AlarmTest, RegularExpiryChrono) {
TEST(AlarmTest, ZeroExpiry) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
- Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(0), junk);
+ Alarm alarm;
+ alarm.Set(&cq, grpc_timeout_seconds_to_deadline(0), junk);
void* output_tag;
bool ok;
@@ -106,7 +152,8 @@ TEST(AlarmTest, ZeroExpiry) {
TEST(AlarmTest, NegativeExpiry) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
- Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(-1), junk);
+ Alarm alarm;
+ alarm.Set(&cq, grpc_timeout_seconds_to_deadline(-1), junk);
void* output_tag;
bool ok;
@@ -121,7 +168,8 @@ TEST(AlarmTest, NegativeExpiry) {
TEST(AlarmTest, Cancellation) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
- Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(2), junk);
+ Alarm alarm;
+ alarm.Set(&cq, grpc_timeout_seconds_to_deadline(2), junk);
alarm.Cancel();
void* output_tag;
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index 54408db600..c236f76e89 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -226,6 +226,31 @@ class ClientLbEnd2endTest : public ::testing::Test {
ResetCounters();
}
+ bool SeenAllServers() {
+ for (const auto& server : servers_) {
+ if (server->service_.request_count() == 0) return false;
+ }
+ return true;
+ }
+
+ // Updates \a connection_order by appending to it the index of the newly
+ // connected server. Must be called after every single RPC.
+ void UpdateConnectionOrder(
+ const std::vector<std::unique_ptr<ServerData>>& servers,
+ std::vector<int>* connection_order) {
+ for (size_t i = 0; i < servers.size(); ++i) {
+ if (servers[i]->service_.request_count() == 1) {
+ // Was the server index known? If not, update connection_order.
+ const auto it =
+ std::find(connection_order->begin(), connection_order->end(), i);
+ if (it == connection_order->end()) {
+ connection_order->push_back(i);
+ return;
+ }
+ }
+ }
+ }
+
const grpc::string server_host_;
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
@@ -370,13 +395,23 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) {
ports.emplace_back(server->port_);
}
SetNextResolution(ports);
- for (size_t i = 0; i < servers_.size(); ++i) {
+ // Wait until all backends are ready.
+ do {
CheckRpcSendOk();
- }
- // One request should have gone to each server.
+ } while (!SeenAllServers());
+ ResetCounters();
+ // "Sync" to the end of the list. Next sequence of picks will start at the
+ // first server (index 0).
+ WaitForServer(servers_.size() - 1);
+ std::vector<int> connection_order;
for (size_t i = 0; i < servers_.size(); ++i) {
- EXPECT_EQ(1, servers_[i]->service_.request_count());
+ CheckRpcSendOk();
+ UpdateConnectionOrder(servers_, &connection_order);
}
+ // Backends should be iterated over in the order in which the addresses were
+ // given.
+ const auto expected = std::vector<int>{0, 1, 2};
+ EXPECT_EQ(expected, connection_order);
// Check LB policy name for the channel.
EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
}
@@ -529,13 +564,9 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
StartServers(kNumServers, ports);
ResetStub("round_robin");
SetNextResolution(ports);
- // Send one RPC per backend and make sure they are used in order.
- // Note: This relies on the fact that the subchannels are reported in
- // state READY in the order in which the addresses are specified,
- // which is only true because the backends are all local.
- for (size_t i = 0; i < servers_.size(); ++i) {
+ // Send a number of RPCs, which succeed.
+ for (size_t i = 0; i < 100; ++i) {
CheckRpcSendOk();
- EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
}
// Kill all servers
for (size_t i = 0; i < servers_.size(); ++i) {
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index b5cff664f6..570a3d1067 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -332,7 +332,8 @@ class GrpclbEnd2endTest : public ::testing::Test {
num_backends_(num_backends),
num_balancers_(num_balancers),
client_load_reporting_interval_seconds_(
- client_load_reporting_interval_seconds) {}
+ client_load_reporting_interval_seconds),
+ kRequestMessage_("Live long and prosper.") {}
void SetUp() override {
response_generator_ = grpc_fake_resolver_response_generator_create();
@@ -378,6 +379,10 @@ class GrpclbEnd2endTest : public ::testing::Test {
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
}
+ void ResetBackendCounters() {
+ for (const auto& backend : backends_) backend->ResetCounters();
+ }
+
ClientStats WaitForLoadReports() {
ClientStats client_stats;
for (const auto& balancer : balancers_) {
@@ -386,6 +391,27 @@ class GrpclbEnd2endTest : public ::testing::Test {
return client_stats;
}
+ bool SeenAllBackends() {
+ for (const auto& backend : backends_) {
+ if (backend->request_count() == 0) return false;
+ }
+ return true;
+ }
+
+ void WaitForAllBackends() {
+ while (!SeenAllBackends()) {
+ CheckRpcSendOk();
+ }
+ ResetBackendCounters();
+ }
+
+ void WaitForBackend(size_t backend_idx) {
+ do {
+ CheckRpcSendOk();
+ } while (backends_[backend_idx]->request_count() == 0);
+ ResetBackendCounters();
+ }
+
struct AddressData {
int port;
bool is_balancer;
@@ -429,20 +455,31 @@ class GrpclbEnd2endTest : public ::testing::Test {
balancers_.at(i)->add_response(response, delay_ms);
}
- std::vector<std::pair<Status, EchoResponse>> SendRpc(const string& message,
- int num_rpcs,
- int timeout_ms = 1000) {
- std::vector<std::pair<Status, EchoResponse>> results;
+ Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000) {
+ const bool local_response = (response == nullptr);
+ if (local_response) response = new EchoResponse;
EchoRequest request;
- EchoResponse response;
- request.set_message(message);
- for (int i = 0; i < num_rpcs; i++) {
- ClientContext context;
- context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
- Status status = stub_->Echo(&context, request, &response);
- results.push_back(std::make_pair(status, response));
+ request.set_message(kRequestMessage_);
+ ClientContext context;
+ context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
+ Status status = stub_->Echo(&context, request, response);
+ if (local_response) delete response;
+ return status;
+ }
+
+ void CheckRpcSendOk(const size_t times = 1) {
+ for (size_t i = 0; i < times; ++i) {
+ EchoResponse response;
+ const Status status = SendRpc(&response);
+ EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+ << " message=" << status.error_message();
+ EXPECT_EQ(response.message(), kRequestMessage_);
}
- return results;
+ }
+
+ void CheckRpcSendFailure() {
+ const Status status = SendRpc();
+ EXPECT_FALSE(status.ok());
}
template <typename T>
@@ -499,14 +536,12 @@ class GrpclbEnd2endTest : public ::testing::Test {
const int client_load_reporting_interval_seconds_;
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
-
std::vector<std::unique_ptr<BackendServiceImpl>> backends_;
std::vector<std::unique_ptr<BalancerServiceImpl>> balancers_;
-
std::vector<ServerThread<BackendService>> backend_servers_;
std::vector<ServerThread<BalancerService>> balancer_servers_;
-
grpc_fake_resolver_response_generator* response_generator_;
+ const grpc::string kRequestMessage_;
};
class SingleBalancerTest : public GrpclbEnd2endTest {
@@ -521,17 +556,12 @@ TEST_F(SingleBalancerTest, Vanilla) {
0);
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
- // Send 100 RPCs per server.
- const auto& statuses_and_responses =
- SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_);
-
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
+
+ // We need to wait for all backends to come online.
+ WaitForAllBackends();
+
+ // Send kNumRpcsPerAddress RPCs per server.
+ CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
// Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) {
@@ -561,8 +591,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
const auto t0 = system_clock::now();
// Client will block: LB will initially send empty serverlist.
- const auto& statuses_and_responses =
- SendRpc(kMessage_, num_backends_, kCallDeadlineMs);
+ CheckRpcSendOk(num_backends_);
const auto ellapsed_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
system_clock::now() - t0);
@@ -576,13 +605,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
}
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
@@ -593,70 +615,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
-TEST_F(SingleBalancerTest, RepeatedServerlist) {
- constexpr int kServerlistDelayMs = 100;
-
- // Send a serverlist right away.
- ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
- 0);
- // ... and the same one a bit later.
- ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
- kServerlistDelayMs);
-
- // Send num_backends/2 requests.
- auto statuses_and_responses = SendRpc(kMessage_, num_backends_ / 2);
- // only the first half of the backends will receive them.
- for (size_t i = 0; i < backends_.size(); ++i) {
- if (i < backends_.size() / 2)
- EXPECT_EQ(1U, backend_servers_[i].service_->request_count())
- << "for backend #" << i;
- else
- EXPECT_EQ(0U, backend_servers_[i].service_->request_count())
- << "for backend #" << i;
- }
- EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
-
- // Wait for the (duplicated) serverlist update.
- gpr_sleep_until(gpr_time_add(
- gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_millis(kServerlistDelayMs * 1.1, GPR_TIMESPAN)));
-
- // Verify the LB has sent two responses.
- EXPECT_EQ(2U, balancer_servers_[0].service_->response_count());
-
- // Some more calls to complete the total number of backends.
- statuses_and_responses = SendRpc(
- kMessage_,
- num_backends_ / 2 + (num_backends_ & 0x1) /* extra one if num_bes odd */);
- // Because a duplicated serverlist should have no effect, all backends must
- // have been hit once now.
- for (size_t i = 0; i < backends_.size(); ++i) {
- EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
- }
- EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
- balancers_[0]->NotifyDoneWithServerlists();
- // The balancer got a single request.
- EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
- // Check LB policy name for the channel.
- EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
-}
-
TEST_F(SingleBalancerTest, BackendsRestart) {
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
@@ -664,21 +622,8 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
0);
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
- // Send 100 RPCs per server.
- auto statuses_and_responses =
- SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
- // Each backend should have gotten 100 requests.
- for (size_t i = 0; i < backends_.size(); ++i) {
- EXPECT_EQ(kNumRpcsPerAddress,
- backend_servers_[i].service_->request_count());
- }
+ // Send kNumRpcsPerAddress RPCs per server.
+ CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
@@ -687,11 +632,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
for (size_t i = 0; i < backends_.size(); ++i) {
if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
}
- statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- EXPECT_FALSE(status.ok());
- }
+ CheckRpcSendFailure();
for (size_t i = 0; i < num_backends_; ++i) {
backends_.emplace_back(new BackendServiceImpl());
backend_servers_.emplace_back(ServerThread<BackendService>(
@@ -703,11 +644,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
// TODO(dgq): implement the "backend restart" component as well. We need extra
// machinery to either update the LB responses "on the fly" or instruct
// backends which ports to restart on.
- statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- EXPECT_FALSE(status.ok());
- }
+ CheckRpcSendFailure();
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
@@ -727,13 +664,9 @@ TEST_F(UpdatesTest, UpdateBalancers) {
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
- auto statuses_and_responses = SendRpc(kMessage_, 10);
+ CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
// All 10 requests should have gone to the first backend.
EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
@@ -758,22 +691,12 @@ TEST_F(UpdatesTest, UpdateBalancers) {
// Wait until update has been processed, as signaled by the second backend
// receiving a request.
EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
- do {
- auto statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
- } while (backend_servers_[1].service_->request_count() == 0);
+ WaitForBackend(1);
backend_servers_[1].service_->ResetCounters();
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
- statuses_and_responses = SendRpc(kMessage_, 10);
+ CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
// All 10 requests should have gone to the second backend.
EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
@@ -804,13 +727,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
- auto statuses_and_responses = SendRpc(kMessage_, 10);
+ CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
// All 10 requests should have gone to the first backend.
EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
@@ -837,11 +756,7 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN));
// Send 10 seconds worth of RPCs
do {
- statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
+ CheckRpcSendOk();
} while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
// grpclb continued using the original LB call to the first balancer, which
// doesn't assign the second backend.
@@ -860,11 +775,7 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
gpr_time_from_millis(10000, GPR_TIMESPAN));
// Send 10 seconds worth of RPCs
do {
- statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
+ CheckRpcSendOk();
} while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
// grpclb continued using the original LB call to the first balancer, which
// doesn't assign the second backend.
@@ -886,12 +797,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
- auto statuses_and_responses = SendRpc(kMessage_, 10);
+ CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
// All 10 requests should have gone to the first backend.
EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
@@ -903,12 +810,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
// This is serviced by the existing RR policy
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
- statuses_and_responses = SendRpc(kMessage_, 10);
+ CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
// All 10 requests should again have gone to the first backend.
EXPECT_EQ(20U, backend_servers_[0].service_->request_count());
EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
@@ -935,23 +838,13 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
// receiving a request. In the meantime, the client continues to be serviced
// (by the first backend) without interruption.
EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
- do {
- auto statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
- } while (backend_servers_[1].service_->request_count() == 0);
+ WaitForBackend(1);
// This is serviced by the existing RR policy
backend_servers_[1].service_->ResetCounters();
gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
- statuses_and_responses = SendRpc(kMessage_, 10);
+ CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH ==========");
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
// All 10 requests should have gone to the second backend.
EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
@@ -974,14 +867,11 @@ TEST_F(SingleBalancerTest, Drop) {
0, BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(), {{"rate_limiting", 1}, {"load_balancing", 2}}),
0);
- // Send 100 RPCs for each server and drop address.
- const auto& statuses_and_responses =
- SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3));
-
+ // Send kNumRpcsPerAddress RPCs for each server and drop address.
size_t num_drops = 0;
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
+ for (size_t i = 0; i < kNumRpcsPerAddress * (num_backends_ + 3); ++i) {
+ EchoResponse response;
+ const Status status = SendRpc(&response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
++num_drops;
@@ -1010,12 +900,9 @@ TEST_F(SingleBalancerTest, DropAllFirst) {
0, BalancerServiceImpl::BuildResponseForBackends(
{}, {{"rate_limiting", 1}, {"load_balancing", 1}}),
0);
- const auto& statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- EXPECT_FALSE(status.ok());
- EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
- }
+ const Status status = SendRpc();
+ EXPECT_FALSE(status.ok());
+ EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
}
TEST_F(SingleBalancerTest, DropAll) {
@@ -1028,21 +915,13 @@ TEST_F(SingleBalancerTest, DropAll) {
1000);
// First call succeeds.
- auto statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
+ CheckRpcSendOk();
// But eventually, the update with only dropped servers is processed and calls
// fail.
+ Status status;
do {
- statuses_and_responses = SendRpc(kMessage_, 1);
- ASSERT_EQ(statuses_and_responses.size(), 1UL);
- } while (statuses_and_responses[0].first.ok());
- const Status& status = statuses_and_responses[0].first;
+ status = SendRpc();
+ } while (status.ok());
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
}
@@ -1057,18 +936,8 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
- // Send 100 RPCs per server.
- const auto& statuses_and_responses =
- SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_);
-
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
-
+ // Send kNumRpcsPerAddress RPCs per server.
+ CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
// Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(kNumRpcsPerAddress,
@@ -1096,14 +965,11 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
0, BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(), {{"rate_limiting", 2}, {"load_balancing", 1}}),
0);
- // Send 100 RPCs for each server and drop address.
- const auto& statuses_and_responses =
- SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3));
size_t num_drops = 0;
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
+ for (size_t i = 0; i < kNumRpcsPerAddress * (num_backends_ + 3); ++i) {
+ EchoResponse response;
+ const Status status = SendRpc(&response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
++num_drops;
diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD
index 83b91e2ce9..985a335f1b 100644
--- a/test/cpp/microbenchmarks/BUILD
+++ b/test/cpp/microbenchmarks/BUILD
@@ -35,14 +35,14 @@ grpc_cc_library(
"fullstack_fixtures.h",
"helpers.h",
],
+ external_deps = [
+ "benchmark",
+ ],
deps = [
"//:grpc++_unsecure",
"//src/proto/grpc/testing:echo_proto",
"//test/core/util:grpc_test_util_unsecure",
],
- external_deps = [
- "benchmark",
- ],
)
grpc_cc_binary(
@@ -76,14 +76,20 @@ grpc_cc_binary(
grpc_cc_binary(
name = "bm_fullstack_streaming_ping_pong",
testonly = 1,
- srcs = ["bm_fullstack_streaming_ping_pong.cc"],
+ srcs = [
+ "bm_fullstack_streaming_ping_pong.cc",
+ "fullstack_streaming_ping_pong.h",
+ ],
deps = [":helpers"],
)
grpc_cc_binary(
name = "bm_fullstack_streaming_pump",
testonly = 1,
- srcs = ["bm_fullstack_streaming_pump.cc"],
+ srcs = [
+ "bm_fullstack_streaming_pump.cc",
+ "fullstack_streaming_pump.h",
+ ],
deps = [":helpers"],
)
@@ -92,15 +98,18 @@ grpc_cc_binary(
testonly = 1,
srcs = ["bm_fullstack_trickle.cc"],
deps = [
- ":helpers",
- "//test/cpp/util:test_config",
+ ":helpers",
+ "//test/cpp/util:test_config",
],
)
grpc_cc_binary(
name = "bm_fullstack_unary_ping_pong",
testonly = 1,
- srcs = ["bm_fullstack_unary_ping_pong.cc"],
+ srcs = [
+ "bm_fullstack_unary_ping_pong.cc",
+ "fullstack_unary_ping_pong.h",
+ ],
deps = [":helpers"],
)
diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc
index 518c65ac8d..cadc9b2a11 100644
--- a/test/cpp/microbenchmarks/bm_call_create.cc
+++ b/test/cpp/microbenchmarks/bm_call_create.cc
@@ -35,7 +35,7 @@ extern "C" {
#include "src/core/ext/filters/http/client/http_client_filter.h"
#include "src/core/ext/filters/http/message_compress/message_compress_filter.h"
#include "src/core/ext/filters/http/server/http_server_filter.h"
-#include "src/core/ext/filters/load_reporting/load_reporting_filter.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
#include "src/core/ext/filters/message_size/message_size_filter.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/connected_channel.h"
@@ -620,7 +620,7 @@ BENCHMARK_TEMPLATE(BM_IsolatedFilter, HttpServerFilter, SendEmptyMetadata);
typedef Fixture<&grpc_message_size_filter, CHECKS_NOT_LAST> MessageSizeFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, MessageSizeFilter, NoOp);
BENCHMARK_TEMPLATE(BM_IsolatedFilter, MessageSizeFilter, SendEmptyMetadata);
-typedef Fixture<&grpc_load_reporting_filter, CHECKS_NOT_LAST>
+typedef Fixture<&grpc_server_load_reporting_filter, CHECKS_NOT_LAST>
LoadReportingFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, LoadReportingFilter, NoOp);
BENCHMARK_TEMPLATE(BM_IsolatedFilter, LoadReportingFilter, SendEmptyMetadata);
diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
index 0712a40018..655e032faf 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
@@ -18,13 +18,7 @@
/* Benchmark gRPC end2end in various configurations */
-#include <benchmark/benchmark.h>
-#include <sstream>
-#include "src/core/lib/profiling/timers.h"
-#include "src/cpp/client/create_channel_internal.h"
-#include "src/proto/grpc/testing/echo.grpc.pb.h"
-#include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
-#include "test/cpp/microbenchmarks/fullstack_fixtures.h"
+#include "test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h"
namespace grpc {
namespace testing {
@@ -33,365 +27,6 @@ namespace testing {
auto& force_library_initialization = Library::get();
/*******************************************************************************
- * BENCHMARKING KERNELS
- */
-
-static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
-
-// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
-// messages in each call) in a loop on a single channel
-//
-// First parmeter (i.e state.range(0)): Message size (in bytes) to use
-// Second parameter (i.e state.range(1)): Number of ping pong messages.
-// Note: One ping-pong means two messages (one from client to server and
-// the other from server to client):
-template <class Fixture, class ClientContextMutator, class ServerContextMutator>
-static void BM_StreamingPingPong(benchmark::State& state) {
- const int msg_size = state.range(0);
- const int max_ping_pongs = state.range(1);
-
- EchoTestService::AsyncService service;
- std::unique_ptr<Fixture> fixture(new Fixture(&service));
- {
- EchoResponse send_response;
- EchoResponse recv_response;
- EchoRequest send_request;
- EchoRequest recv_request;
-
- if (msg_size > 0) {
- send_request.set_message(std::string(msg_size, 'a'));
- send_response.set_message(std::string(msg_size, 'b'));
- }
-
- std::unique_ptr<EchoTestService::Stub> stub(
- EchoTestService::NewStub(fixture->channel()));
-
- while (state.KeepRunning()) {
- ServerContext svr_ctx;
- ServerContextMutator svr_ctx_mut(&svr_ctx);
- ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
- service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
- fixture->cq(), tag(0));
-
- ClientContext cli_ctx;
- ClientContextMutator cli_ctx_mut(&cli_ctx);
- auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
-
- // Establish async stream between client side and server side
- void* t;
- bool ok;
- int need_tags = (1 << 0) | (1 << 1);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
-
- // Send 'max_ping_pongs' number of ping pong messages
- int ping_pong_cnt = 0;
- while (ping_pong_cnt < max_ping_pongs) {
- request_rw->Write(send_request, tag(0)); // Start client send
- response_rw.Read(&recv_request, tag(1)); // Start server recv
- request_rw->Read(&recv_response, tag(2)); // Start client recv
-
- need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- int i = (int)(intptr_t)t;
-
- // If server recv is complete, start the server send operation
- if (i == 1) {
- response_rw.Write(send_response, tag(3));
- }
-
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
-
- ping_pong_cnt++;
- }
-
- request_rw->WritesDone(tag(0));
- response_rw.Finish(Status::OK, tag(1));
-
- Status recv_status;
- request_rw->Finish(&recv_status, tag(2));
-
- need_tags = (1 << 0) | (1 << 1) | (1 << 2);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
-
- GPR_ASSERT(recv_status.ok());
- }
- }
-
- fixture->Finish(state);
- fixture.reset();
- state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
-}
-
-// Repeatedly sends ping pong messages in a single streaming Bidi call in a loop
-// First parmeter (i.e state.range(0)): Message size (in bytes) to use
-template <class Fixture, class ClientContextMutator, class ServerContextMutator>
-static void BM_StreamingPingPongMsgs(benchmark::State& state) {
- const int msg_size = state.range(0);
-
- EchoTestService::AsyncService service;
- std::unique_ptr<Fixture> fixture(new Fixture(&service));
- {
- EchoResponse send_response;
- EchoResponse recv_response;
- EchoRequest send_request;
- EchoRequest recv_request;
-
- if (msg_size > 0) {
- send_request.set_message(std::string(msg_size, 'a'));
- send_response.set_message(std::string(msg_size, 'b'));
- }
-
- std::unique_ptr<EchoTestService::Stub> stub(
- EchoTestService::NewStub(fixture->channel()));
-
- ServerContext svr_ctx;
- ServerContextMutator svr_ctx_mut(&svr_ctx);
- ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
- service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
- fixture->cq(), tag(0));
-
- ClientContext cli_ctx;
- ClientContextMutator cli_ctx_mut(&cli_ctx);
- auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
-
- // Establish async stream between client side and server side
- void* t;
- bool ok;
- int need_tags = (1 << 0) | (1 << 1);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
-
- while (state.KeepRunning()) {
- GPR_TIMER_SCOPE("BenchmarkCycle", 0);
- request_rw->Write(send_request, tag(0)); // Start client send
- response_rw.Read(&recv_request, tag(1)); // Start server recv
- request_rw->Read(&recv_response, tag(2)); // Start client recv
-
- need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- int i = (int)(intptr_t)t;
-
- // If server recv is complete, start the server send operation
- if (i == 1) {
- response_rw.Write(send_response, tag(3));
- }
-
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
- }
-
- request_rw->WritesDone(tag(0));
- response_rw.Finish(Status::OK, tag(1));
- Status recv_status;
- request_rw->Finish(&recv_status, tag(2));
-
- need_tags = (1 << 0) | (1 << 1) | (1 << 2);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
-
- GPR_ASSERT(recv_status.ok());
- }
-
- fixture->Finish(state);
- fixture.reset();
- state.SetBytesProcessed(msg_size * state.iterations() * 2);
-}
-
-// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
-// messages in each call) in a loop on a single channel. Different from
-// BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast,
-// WriteAndFinish, set_initial_metadata_corked. These apis aim at saving
-// sendmsg syscalls for streaming by coalescing 1. initial metadata with first
-// message; 2. final streaming message with trailing metadata.
-//
-// First parmeter (i.e state.range(0)): Message size (in bytes) to use
-// Second parameter (i.e state.range(1)): Number of ping pong messages.
-// Note: One ping-pong means two messages (one from client to server and
-// the other from server to client):
-// Third parameter (i.e state.range(2)): Switch between using WriteAndFinish
-// API and WriteLast API for server.
-template <class Fixture, class ClientContextMutator, class ServerContextMutator>
-static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) {
- const int msg_size = state.range(0);
- const int max_ping_pongs = state.range(1);
- // This options is used to test out server API: WriteLast and WriteAndFinish
- // respectively, since we can not use both of them on server side at the same
- // time. Value 1 means we are testing out the WriteAndFinish API, and
- // otherwise we are testing out the WriteLast API.
- const int write_and_finish = state.range(2);
-
- EchoTestService::AsyncService service;
- std::unique_ptr<Fixture> fixture(new Fixture(&service));
- {
- EchoResponse send_response;
- EchoResponse recv_response;
- EchoRequest send_request;
- EchoRequest recv_request;
-
- if (msg_size > 0) {
- send_request.set_message(std::string(msg_size, 'a'));
- send_response.set_message(std::string(msg_size, 'b'));
- }
-
- std::unique_ptr<EchoTestService::Stub> stub(
- EchoTestService::NewStub(fixture->channel()));
-
- while (state.KeepRunning()) {
- ServerContext svr_ctx;
- ServerContextMutator svr_ctx_mut(&svr_ctx);
- ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
- service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
- fixture->cq(), tag(0));
-
- ClientContext cli_ctx;
- ClientContextMutator cli_ctx_mut(&cli_ctx);
- cli_ctx.set_initial_metadata_corked(true);
- // tag:1 here will never comes up, since we are not performing any op due
- // to initial metadata coalescing.
- auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
-
- void* t;
- bool ok;
- int need_tags;
-
- // Send 'max_ping_pongs' number of ping pong messages
- int ping_pong_cnt = 0;
- while (ping_pong_cnt < max_ping_pongs) {
- if (ping_pong_cnt == max_ping_pongs - 1) {
- request_rw->WriteLast(send_request, WriteOptions(), tag(2));
- } else {
- request_rw->Write(send_request, tag(2)); // Start client send
- }
-
- need_tags = (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5);
-
- if (ping_pong_cnt == 0) {
- // wait for the server call structure (call_hook, etc.) to be
- // initialized (async stream between client side and server side
- // established). It is necessary when client init metadata is
- // coalesced
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- while ((int)(intptr_t)t != 0) {
- // In some cases tag:2 comes before tag:0 (write tag comes out
- // first), this while loop is to make sure get tag:0.
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- }
- }
-
- response_rw.Read(&recv_request, tag(3)); // Start server recv
- request_rw->Read(&recv_response, tag(4)); // Start client recv
-
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- int i = (int)(intptr_t)t;
-
- // If server recv is complete, start the server send operation
- if (i == 3) {
- if (ping_pong_cnt == max_ping_pongs - 1) {
- if (write_and_finish == 1) {
- response_rw.WriteAndFinish(send_response, WriteOptions(),
- Status::OK, tag(5));
- } else {
- response_rw.WriteLast(send_response, WriteOptions(), tag(5));
- // WriteLast buffers the write, so neither server write op nor
- // client read op will finish inside the while loop.
- need_tags &= ~(1 << 4);
- need_tags &= ~(1 << 5);
- }
- } else {
- response_rw.Write(send_response, tag(5));
- }
- }
-
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
-
- ping_pong_cnt++;
- }
-
- if (max_ping_pongs == 0) {
- need_tags = (1 << 6) | (1 << 7) | (1 << 8);
- } else {
- if (write_and_finish == 1) {
- need_tags = (1 << 8);
- } else {
- // server's buffered write and the client's read of the buffered write
- // tags should come up.
- need_tags = (1 << 4) | (1 << 5) | (1 << 7) | (1 << 8);
- }
- }
-
- // No message write or initial metadata write happened yet.
- if (max_ping_pongs == 0) {
- request_rw->WritesDone(tag(6));
- // wait for server call data structure(call_hook, etc.) to be
- // initialized, since initial metadata is corked.
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- while ((int)(intptr_t)t != 0) {
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- }
- response_rw.Finish(Status::OK, tag(7));
- } else {
- if (write_and_finish != 1) {
- response_rw.Finish(Status::OK, tag(7));
- }
- }
-
- Status recv_status;
- request_rw->Finish(&recv_status, tag(8));
-
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
-
- GPR_ASSERT(recv_status.ok());
- }
- }
-
- fixture->Finish(state);
- fixture.reset();
- state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
-}
-
-/*******************************************************************************
* CONFIGURATIONS
*/
diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc
index 6fbf9da0ad..c7ceacd320 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc
@@ -18,157 +18,18 @@
/* Benchmark gRPC end2end in various configurations */
-#include <benchmark/benchmark.h>
-#include <sstream>
-#include "src/core/lib/profiling/timers.h"
-#include "src/cpp/client/create_channel_internal.h"
-#include "src/proto/grpc/testing/echo.grpc.pb.h"
-#include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
-#include "test/cpp/microbenchmarks/fullstack_fixtures.h"
+#include "test/cpp/microbenchmarks/fullstack_streaming_pump.h"
namespace grpc {
namespace testing {
-// force library initialization
-auto& force_library_initialization = Library::get();
-
-/*******************************************************************************
- * BENCHMARKING KERNELS
- */
-
-static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
-
-template <class Fixture>
-static void BM_PumpStreamClientToServer(benchmark::State& state) {
- EchoTestService::AsyncService service;
- std::unique_ptr<Fixture> fixture(new Fixture(&service));
- {
- EchoRequest send_request;
- EchoRequest recv_request;
- if (state.range(0) > 0) {
- send_request.set_message(std::string(state.range(0), 'a'));
- }
- Status recv_status;
- ServerContext svr_ctx;
- ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
- service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
- fixture->cq(), tag(0));
- std::unique_ptr<EchoTestService::Stub> stub(
- EchoTestService::NewStub(fixture->channel()));
- ClientContext cli_ctx;
- auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
- int need_tags = (1 << 0) | (1 << 1);
- void* t;
- bool ok;
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
- response_rw.Read(&recv_request, tag(0));
- while (state.KeepRunning()) {
- GPR_TIMER_SCOPE("BenchmarkCycle", 0);
- request_rw->Write(send_request, tag(1));
- while (true) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- if (t == tag(0)) {
- response_rw.Read(&recv_request, tag(0));
- } else if (t == tag(1)) {
- break;
- } else {
- GPR_ASSERT(false);
- }
- }
- }
- request_rw->WritesDone(tag(1));
- need_tags = (1 << 0) | (1 << 1);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
- response_rw.Finish(Status::OK, tag(0));
- Status final_status;
- request_rw->Finish(&final_status, tag(1));
- need_tags = (1 << 0) | (1 << 1);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
- GPR_ASSERT(final_status.ok());
- }
- fixture->Finish(state);
- fixture.reset();
- state.SetBytesProcessed(state.range(0) * state.iterations());
-}
-
-template <class Fixture>
-static void BM_PumpStreamServerToClient(benchmark::State& state) {
- EchoTestService::AsyncService service;
- std::unique_ptr<Fixture> fixture(new Fixture(&service));
- {
- EchoResponse send_response;
- EchoResponse recv_response;
- if (state.range(0) > 0) {
- send_response.set_message(std::string(state.range(0), 'a'));
- }
- Status recv_status;
- ServerContext svr_ctx;
- ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
- service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
- fixture->cq(), tag(0));
- std::unique_ptr<EchoTestService::Stub> stub(
- EchoTestService::NewStub(fixture->channel()));
- ClientContext cli_ctx;
- auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
- int need_tags = (1 << 0) | (1 << 1);
- void* t;
- bool ok;
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
- request_rw->Read(&recv_response, tag(0));
- while (state.KeepRunning()) {
- GPR_TIMER_SCOPE("BenchmarkCycle", 0);
- response_rw.Write(send_response, tag(1));
- while (true) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- if (t == tag(0)) {
- request_rw->Read(&recv_response, tag(0));
- } else if (t == tag(1)) {
- break;
- } else {
- GPR_ASSERT(false);
- }
- }
- }
- response_rw.Finish(Status::OK, tag(1));
- need_tags = (1 << 0) | (1 << 1);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
- }
- fixture->Finish(state);
- fixture.reset();
- state.SetBytesProcessed(state.range(0) * state.iterations());
-}
-
/*******************************************************************************
* CONFIGURATIONS
*/
+// force library initialization
+auto& force_library_initialization = Library::get();
+
BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, TCP)
->Range(0, 128 * 1024 * 1024);
BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, UDS)
diff --git a/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc
index 9af751245f..fa41d114c0 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc
@@ -18,13 +18,7 @@
/* Benchmark gRPC end2end in various configurations */
-#include <benchmark/benchmark.h>
-#include <sstream>
-#include "src/core/lib/profiling/timers.h"
-#include "src/cpp/client/create_channel_internal.h"
-#include "src/proto/grpc/testing/echo.grpc.pb.h"
-#include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
-#include "test/cpp/microbenchmarks/fullstack_fixtures.h"
+#include "test/cpp/microbenchmarks/fullstack_unary_ping_pong.h"
namespace grpc {
namespace testing {
@@ -33,85 +27,6 @@ namespace testing {
auto& force_library_initialization = Library::get();
/*******************************************************************************
- * BENCHMARKING KERNELS
- */
-
-static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
-
-template <class Fixture, class ClientContextMutator, class ServerContextMutator>
-static void BM_UnaryPingPong(benchmark::State& state) {
- EchoTestService::AsyncService service;
- std::unique_ptr<Fixture> fixture(new Fixture(&service));
- EchoRequest send_request;
- EchoResponse send_response;
- EchoResponse recv_response;
- if (state.range(0) > 0) {
- send_request.set_message(std::string(state.range(0), 'a'));
- }
- if (state.range(1) > 0) {
- send_response.set_message(std::string(state.range(1), 'a'));
- }
- Status recv_status;
- struct ServerEnv {
- ServerContext ctx;
- EchoRequest recv_request;
- grpc::ServerAsyncResponseWriter<EchoResponse> response_writer;
- ServerEnv() : response_writer(&ctx) {}
- };
- uint8_t server_env_buffer[2 * sizeof(ServerEnv)];
- ServerEnv* server_env[2] = {
- reinterpret_cast<ServerEnv*>(server_env_buffer),
- reinterpret_cast<ServerEnv*>(server_env_buffer + sizeof(ServerEnv))};
- new (server_env[0]) ServerEnv;
- new (server_env[1]) ServerEnv;
- service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request,
- &server_env[0]->response_writer, fixture->cq(),
- fixture->cq(), tag(0));
- service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request,
- &server_env[1]->response_writer, fixture->cq(),
- fixture->cq(), tag(1));
- std::unique_ptr<EchoTestService::Stub> stub(
- EchoTestService::NewStub(fixture->channel()));
- while (state.KeepRunning()) {
- GPR_TIMER_SCOPE("BenchmarkCycle", 0);
- recv_response.Clear();
- ClientContext cli_ctx;
- ClientContextMutator cli_ctx_mut(&cli_ctx);
- std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
- stub->AsyncEcho(&cli_ctx, send_request, fixture->cq()));
- void* t;
- bool ok;
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- GPR_ASSERT(t == tag(0) || t == tag(1));
- intptr_t slot = reinterpret_cast<intptr_t>(t);
- ServerEnv* senv = server_env[slot];
- ServerContextMutator svr_ctx_mut(&senv->ctx);
- senv->response_writer.Finish(send_response, Status::OK, tag(3));
- response_reader->Finish(&recv_response, &recv_status, tag(4));
- for (int i = (1 << 3) | (1 << 4); i != 0;) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- int tagnum = (int)reinterpret_cast<intptr_t>(t);
- GPR_ASSERT(i & (1 << tagnum));
- i -= 1 << tagnum;
- }
- GPR_ASSERT(recv_status.ok());
-
- senv->~ServerEnv();
- senv = new (senv) ServerEnv();
- service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer,
- fixture->cq(), fixture->cq(), tag(slot));
- }
- fixture->Finish(state);
- fixture.reset();
- server_env[0]->~ServerEnv();
- server_env[1]->~ServerEnv();
- state.SetBytesProcessed(state.range(0) * state.iterations() +
- state.range(1) * state.iterations());
-}
-
-/*******************************************************************************
* CONFIGURATIONS
*/
diff --git a/test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h b/test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h
new file mode 100644
index 0000000000..ff1f966753
--- /dev/null
+++ b/test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h
@@ -0,0 +1,396 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/* Benchmark gRPC end2end in various configurations */
+
+#ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
+#define TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
+
+#include <benchmark/benchmark.h>
+#include <sstream>
+#include "src/core/lib/profiling/timers.h"
+#include "src/cpp/client/create_channel_internal.h"
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
+#include "test/cpp/microbenchmarks/fullstack_fixtures.h"
+
+namespace grpc {
+namespace testing {
+
+/*******************************************************************************
+ * BENCHMARKING KERNELS
+ */
+
+static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
+
+// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
+// messages in each call) in a loop on a single channel
+//
+// First parmeter (i.e state.range(0)): Message size (in bytes) to use
+// Second parameter (i.e state.range(1)): Number of ping pong messages.
+// Note: One ping-pong means two messages (one from client to server and
+// the other from server to client):
+template <class Fixture, class ClientContextMutator, class ServerContextMutator>
+static void BM_StreamingPingPong(benchmark::State& state) {
+ const int msg_size = state.range(0);
+ const int max_ping_pongs = state.range(1);
+
+ EchoTestService::AsyncService service;
+ std::unique_ptr<Fixture> fixture(new Fixture(&service));
+ {
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ EchoRequest send_request;
+ EchoRequest recv_request;
+
+ if (msg_size > 0) {
+ send_request.set_message(std::string(msg_size, 'a'));
+ send_response.set_message(std::string(msg_size, 'b'));
+ }
+
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+
+ while (state.KeepRunning()) {
+ ServerContext svr_ctx;
+ ServerContextMutator svr_ctx_mut(&svr_ctx);
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
+ service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
+ fixture->cq(), tag(0));
+
+ ClientContext cli_ctx;
+ ClientContextMutator cli_ctx_mut(&cli_ctx);
+ auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
+
+ // Establish async stream between client side and server side
+ void* t;
+ bool ok;
+ int need_tags = (1 << 0) | (1 << 1);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ // Send 'max_ping_pongs' number of ping pong messages
+ int ping_pong_cnt = 0;
+ while (ping_pong_cnt < max_ping_pongs) {
+ request_rw->Write(send_request, tag(0)); // Start client send
+ response_rw.Read(&recv_request, tag(1)); // Start server recv
+ request_rw->Read(&recv_response, tag(2)); // Start client recv
+
+ need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+
+ // If server recv is complete, start the server send operation
+ if (i == 1) {
+ response_rw.Write(send_response, tag(3));
+ }
+
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ ping_pong_cnt++;
+ }
+
+ request_rw->WritesDone(tag(0));
+ response_rw.Finish(Status::OK, tag(1));
+
+ Status recv_status;
+ request_rw->Finish(&recv_status, tag(2));
+
+ need_tags = (1 << 0) | (1 << 1) | (1 << 2);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ GPR_ASSERT(recv_status.ok());
+ }
+ }
+
+ fixture->Finish(state);
+ fixture.reset();
+ state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
+}
+
+// Repeatedly sends ping pong messages in a single streaming Bidi call in a loop
+// First parmeter (i.e state.range(0)): Message size (in bytes) to use
+template <class Fixture, class ClientContextMutator, class ServerContextMutator>
+static void BM_StreamingPingPongMsgs(benchmark::State& state) {
+ const int msg_size = state.range(0);
+
+ EchoTestService::AsyncService service;
+ std::unique_ptr<Fixture> fixture(new Fixture(&service));
+ {
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ EchoRequest send_request;
+ EchoRequest recv_request;
+
+ if (msg_size > 0) {
+ send_request.set_message(std::string(msg_size, 'a'));
+ send_response.set_message(std::string(msg_size, 'b'));
+ }
+
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+
+ ServerContext svr_ctx;
+ ServerContextMutator svr_ctx_mut(&svr_ctx);
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
+ service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
+ fixture->cq(), tag(0));
+
+ ClientContext cli_ctx;
+ ClientContextMutator cli_ctx_mut(&cli_ctx);
+ auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
+
+ // Establish async stream between client side and server side
+ void* t;
+ bool ok;
+ int need_tags = (1 << 0) | (1 << 1);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ while (state.KeepRunning()) {
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
+ request_rw->Write(send_request, tag(0)); // Start client send
+ response_rw.Read(&recv_request, tag(1)); // Start server recv
+ request_rw->Read(&recv_response, tag(2)); // Start client recv
+
+ need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+
+ // If server recv is complete, start the server send operation
+ if (i == 1) {
+ response_rw.Write(send_response, tag(3));
+ }
+
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ }
+
+ request_rw->WritesDone(tag(0));
+ response_rw.Finish(Status::OK, tag(1));
+ Status recv_status;
+ request_rw->Finish(&recv_status, tag(2));
+
+ need_tags = (1 << 0) | (1 << 1) | (1 << 2);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ GPR_ASSERT(recv_status.ok());
+ }
+
+ fixture->Finish(state);
+ fixture.reset();
+ state.SetBytesProcessed(msg_size * state.iterations() * 2);
+}
+
+// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
+// messages in each call) in a loop on a single channel. Different from
+// BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast,
+// WriteAndFinish, set_initial_metadata_corked. These apis aim at saving
+// sendmsg syscalls for streaming by coalescing 1. initial metadata with first
+// message; 2. final streaming message with trailing metadata.
+//
+// First parmeter (i.e state.range(0)): Message size (in bytes) to use
+// Second parameter (i.e state.range(1)): Number of ping pong messages.
+// Note: One ping-pong means two messages (one from client to server and
+// the other from server to client):
+// Third parameter (i.e state.range(2)): Switch between using WriteAndFinish
+// API and WriteLast API for server.
+template <class Fixture, class ClientContextMutator, class ServerContextMutator>
+static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) {
+ const int msg_size = state.range(0);
+ const int max_ping_pongs = state.range(1);
+ // This options is used to test out server API: WriteLast and WriteAndFinish
+ // respectively, since we can not use both of them on server side at the same
+ // time. Value 1 means we are testing out the WriteAndFinish API, and
+ // otherwise we are testing out the WriteLast API.
+ const int write_and_finish = state.range(2);
+
+ EchoTestService::AsyncService service;
+ std::unique_ptr<Fixture> fixture(new Fixture(&service));
+ {
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ EchoRequest send_request;
+ EchoRequest recv_request;
+
+ if (msg_size > 0) {
+ send_request.set_message(std::string(msg_size, 'a'));
+ send_response.set_message(std::string(msg_size, 'b'));
+ }
+
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+
+ while (state.KeepRunning()) {
+ ServerContext svr_ctx;
+ ServerContextMutator svr_ctx_mut(&svr_ctx);
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
+ service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
+ fixture->cq(), tag(0));
+
+ ClientContext cli_ctx;
+ ClientContextMutator cli_ctx_mut(&cli_ctx);
+ cli_ctx.set_initial_metadata_corked(true);
+ // tag:1 here will never comes up, since we are not performing any op due
+ // to initial metadata coalescing.
+ auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
+
+ void* t;
+ bool ok;
+ int need_tags;
+
+ // Send 'max_ping_pongs' number of ping pong messages
+ int ping_pong_cnt = 0;
+ while (ping_pong_cnt < max_ping_pongs) {
+ if (ping_pong_cnt == max_ping_pongs - 1) {
+ request_rw->WriteLast(send_request, WriteOptions(), tag(2));
+ } else {
+ request_rw->Write(send_request, tag(2)); // Start client send
+ }
+
+ need_tags = (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5);
+
+ if (ping_pong_cnt == 0) {
+ // wait for the server call structure (call_hook, etc.) to be
+ // initialized (async stream between client side and server side
+ // established). It is necessary when client init metadata is
+ // coalesced
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ while ((int)(intptr_t)t != 0) {
+ // In some cases tag:2 comes before tag:0 (write tag comes out
+ // first), this while loop is to make sure get tag:0.
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ }
+ }
+
+ response_rw.Read(&recv_request, tag(3)); // Start server recv
+ request_rw->Read(&recv_response, tag(4)); // Start client recv
+
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+
+ // If server recv is complete, start the server send operation
+ if (i == 3) {
+ if (ping_pong_cnt == max_ping_pongs - 1) {
+ if (write_and_finish == 1) {
+ response_rw.WriteAndFinish(send_response, WriteOptions(),
+ Status::OK, tag(5));
+ } else {
+ response_rw.WriteLast(send_response, WriteOptions(), tag(5));
+ // WriteLast buffers the write, so neither server write op nor
+ // client read op will finish inside the while loop.
+ need_tags &= ~(1 << 4);
+ need_tags &= ~(1 << 5);
+ }
+ } else {
+ response_rw.Write(send_response, tag(5));
+ }
+ }
+
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ ping_pong_cnt++;
+ }
+
+ if (max_ping_pongs == 0) {
+ need_tags = (1 << 6) | (1 << 7) | (1 << 8);
+ } else {
+ if (write_and_finish == 1) {
+ need_tags = (1 << 8);
+ } else {
+ // server's buffered write and the client's read of the buffered write
+ // tags should come up.
+ need_tags = (1 << 4) | (1 << 5) | (1 << 7) | (1 << 8);
+ }
+ }
+
+ // No message write or initial metadata write happened yet.
+ if (max_ping_pongs == 0) {
+ request_rw->WritesDone(tag(6));
+ // wait for server call data structure(call_hook, etc.) to be
+ // initialized, since initial metadata is corked.
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ while ((int)(intptr_t)t != 0) {
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ }
+ response_rw.Finish(Status::OK, tag(7));
+ } else {
+ if (write_and_finish != 1) {
+ response_rw.Finish(Status::OK, tag(7));
+ }
+ }
+
+ Status recv_status;
+ request_rw->Finish(&recv_status, tag(8));
+
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ GPR_ASSERT(recv_status.ok());
+ }
+ }
+
+ fixture->Finish(state);
+ fixture.reset();
+ state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
+}
+} // namespace testing
+} // namespace grpc
+
+#endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
diff --git a/test/cpp/microbenchmarks/fullstack_streaming_pump.h b/test/cpp/microbenchmarks/fullstack_streaming_pump.h
new file mode 100644
index 0000000000..f9db212b02
--- /dev/null
+++ b/test/cpp/microbenchmarks/fullstack_streaming_pump.h
@@ -0,0 +1,170 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/* Benchmark gRPC end2end in various configurations */
+
+#ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
+#define TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
+
+#include <benchmark/benchmark.h>
+#include <sstream>
+#include "src/core/lib/profiling/timers.h"
+#include "src/cpp/client/create_channel_internal.h"
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
+#include "test/cpp/microbenchmarks/fullstack_fixtures.h"
+
+namespace grpc {
+namespace testing {
+
+/*******************************************************************************
+ * BENCHMARKING KERNELS
+ */
+
+static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
+
+template <class Fixture>
+static void BM_PumpStreamClientToServer(benchmark::State& state) {
+ EchoTestService::AsyncService service;
+ std::unique_ptr<Fixture> fixture(new Fixture(&service));
+ {
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ if (state.range(0) > 0) {
+ send_request.set_message(std::string(state.range(0), 'a'));
+ }
+ Status recv_status;
+ ServerContext svr_ctx;
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
+ service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
+ fixture->cq(), tag(0));
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+ ClientContext cli_ctx;
+ auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
+ int need_tags = (1 << 0) | (1 << 1);
+ void* t;
+ bool ok;
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ response_rw.Read(&recv_request, tag(0));
+ while (state.KeepRunning()) {
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
+ request_rw->Write(send_request, tag(1));
+ while (true) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ if (t == tag(0)) {
+ response_rw.Read(&recv_request, tag(0));
+ } else if (t == tag(1)) {
+ break;
+ } else {
+ GPR_ASSERT(false);
+ }
+ }
+ }
+ request_rw->WritesDone(tag(1));
+ need_tags = (1 << 0) | (1 << 1);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ response_rw.Finish(Status::OK, tag(0));
+ Status final_status;
+ request_rw->Finish(&final_status, tag(1));
+ need_tags = (1 << 0) | (1 << 1);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ GPR_ASSERT(final_status.ok());
+ }
+ fixture->Finish(state);
+ fixture.reset();
+ state.SetBytesProcessed(state.range(0) * state.iterations());
+}
+
+template <class Fixture>
+static void BM_PumpStreamServerToClient(benchmark::State& state) {
+ EchoTestService::AsyncService service;
+ std::unique_ptr<Fixture> fixture(new Fixture(&service));
+ {
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ if (state.range(0) > 0) {
+ send_response.set_message(std::string(state.range(0), 'a'));
+ }
+ Status recv_status;
+ ServerContext svr_ctx;
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
+ service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
+ fixture->cq(), tag(0));
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+ ClientContext cli_ctx;
+ auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
+ int need_tags = (1 << 0) | (1 << 1);
+ void* t;
+ bool ok;
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ request_rw->Read(&recv_response, tag(0));
+ while (state.KeepRunning()) {
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
+ response_rw.Write(send_response, tag(1));
+ while (true) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ if (t == tag(0)) {
+ request_rw->Read(&recv_response, tag(0));
+ } else if (t == tag(1)) {
+ break;
+ } else {
+ GPR_ASSERT(false);
+ }
+ }
+ }
+ response_rw.Finish(Status::OK, tag(1));
+ need_tags = (1 << 0) | (1 << 1);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ }
+ fixture->Finish(state);
+ fixture.reset();
+ state.SetBytesProcessed(state.range(0) * state.iterations());
+}
+} // namespace testing
+} // namespace grpc
+
+#endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H
diff --git a/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h b/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h
new file mode 100644
index 0000000000..76d278b2a0
--- /dev/null
+++ b/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h
@@ -0,0 +1,116 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/* Benchmark gRPC end2end in various configurations */
+
+#ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_UNARY_PING_PONG_H
+#define TEST_CPP_MICROBENCHMARKS_FULLSTACK_UNARY_PING_PONG_H
+
+#include <benchmark/benchmark.h>
+#include <sstream>
+#include "src/core/lib/profiling/timers.h"
+#include "src/cpp/client/create_channel_internal.h"
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
+#include "test/cpp/microbenchmarks/fullstack_fixtures.h"
+
+namespace grpc {
+namespace testing {
+
+/*******************************************************************************
+ * BENCHMARKING KERNELS
+ */
+
+static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
+
+template <class Fixture, class ClientContextMutator, class ServerContextMutator>
+static void BM_UnaryPingPong(benchmark::State& state) {
+ EchoTestService::AsyncService service;
+ std::unique_ptr<Fixture> fixture(new Fixture(&service));
+ EchoRequest send_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ if (state.range(0) > 0) {
+ send_request.set_message(std::string(state.range(0), 'a'));
+ }
+ if (state.range(1) > 0) {
+ send_response.set_message(std::string(state.range(1), 'a'));
+ }
+ Status recv_status;
+ struct ServerEnv {
+ ServerContext ctx;
+ EchoRequest recv_request;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer;
+ ServerEnv() : response_writer(&ctx) {}
+ };
+ uint8_t server_env_buffer[2 * sizeof(ServerEnv)];
+ ServerEnv* server_env[2] = {
+ reinterpret_cast<ServerEnv*>(server_env_buffer),
+ reinterpret_cast<ServerEnv*>(server_env_buffer + sizeof(ServerEnv))};
+ new (server_env[0]) ServerEnv;
+ new (server_env[1]) ServerEnv;
+ service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request,
+ &server_env[0]->response_writer, fixture->cq(),
+ fixture->cq(), tag(0));
+ service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request,
+ &server_env[1]->response_writer, fixture->cq(),
+ fixture->cq(), tag(1));
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+ while (state.KeepRunning()) {
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
+ recv_response.Clear();
+ ClientContext cli_ctx;
+ ClientContextMutator cli_ctx_mut(&cli_ctx);
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
+ stub->AsyncEcho(&cli_ctx, send_request, fixture->cq()));
+ void* t;
+ bool ok;
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ GPR_ASSERT(t == tag(0) || t == tag(1));
+ intptr_t slot = reinterpret_cast<intptr_t>(t);
+ ServerEnv* senv = server_env[slot];
+ ServerContextMutator svr_ctx_mut(&senv->ctx);
+ senv->response_writer.Finish(send_response, Status::OK, tag(3));
+ response_reader->Finish(&recv_response, &recv_status, tag(4));
+ for (int i = (1 << 3) | (1 << 4); i != 0;) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int tagnum = (int)reinterpret_cast<intptr_t>(t);
+ GPR_ASSERT(i & (1 << tagnum));
+ i -= 1 << tagnum;
+ }
+ GPR_ASSERT(recv_status.ok());
+
+ senv->~ServerEnv();
+ senv = new (senv) ServerEnv();
+ service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer,
+ fixture->cq(), fixture->cq(), tag(slot));
+ }
+ fixture->Finish(state);
+ fixture.reset();
+ server_env[0]->~ServerEnv();
+ server_env[1]->~ServerEnv();
+ state.SetBytesProcessed(state.range(0) * state.iterations() +
+ state.range(1) * state.iterations());
+}
+} // namespace testing
+} // namespace grpc
+
+#endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_UNARY_PING_PONG_H
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 97e2aef914..912c871482 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -141,7 +141,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
if (!next_issue_) { // ready to issue
RunNextState(true, nullptr);
} else { // wait for the issue time
- alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+ alarm_.reset(new Alarm);
+ alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
}
}
};
@@ -360,8 +361,8 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
break; // loop around, don't return
case State::WAIT:
next_state_ = State::READY_TO_WRITE;
- alarm_.reset(
- new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+ alarm_.reset(new Alarm);
+ alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
return true;
case State::READY_TO_WRITE:
if (!ok) {
@@ -518,8 +519,8 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
}
break; // loop around, don't return
case State::WAIT:
- alarm_.reset(
- new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+ alarm_.reset(new Alarm);
+ alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
next_state_ = State::READY_TO_WRITE;
return true;
case State::READY_TO_WRITE:
@@ -760,8 +761,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
break; // loop around, don't return
case State::WAIT:
next_state_ = State::READY_TO_WRITE;
- alarm_.reset(
- new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+ alarm_.reset(new Alarm);
+ alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
return true;
case State::READY_TO_WRITE:
if (!ok) {
diff --git a/tools/codegen/core/gen_stats_data.py b/tools/codegen/core/gen_stats_data.py
index df6cd5a6bd..8e4ef594af 100755
--- a/tools/codegen/core/gen_stats_data.py
+++ b/tools/codegen/core/gen_stats_data.py
@@ -19,13 +19,30 @@ import ctypes
import math
import sys
import yaml
+import json
with open('src/core/lib/debug/stats_data.yaml') as f:
attrs = yaml.load(f.read())
+REQUIRED_FIELDS = ['name', 'doc']
+
+def make_type(name, fields):
+ return (collections.namedtuple(name, ' '.join(list(set(REQUIRED_FIELDS + fields)))), [])
+
+def c_str(s, encoding='ascii'):
+ if isinstance(s, unicode):
+ s = s.encode(encoding)
+ result = ''
+ for c in s:
+ if not (32 <= ord(c) < 127) or c in ('\\', '"'):
+ result += '\\%03o' % ord(c)
+ else:
+ result += c
+ return '"' + result + '"'
+
types = (
- (collections.namedtuple('Counter', 'name'), []),
- (collections.namedtuple('Histogram', 'name max buckets'), []),
+ make_type('Counter', []),
+ make_type('Histogram', ['max', 'buckets']),
)
inst_map = dict((t[0].__name__, t[1]) for t in types)
@@ -200,6 +217,8 @@ with open('src/core/lib/debug/stats_data.h', 'w') as H:
print >>H, "} grpc_stats_%ss;" % (typename.lower())
print >>H, "extern const char *grpc_stats_%s_name[GRPC_STATS_%s_COUNT];" % (
typename.lower(), typename.upper())
+ print >>H, "extern const char *grpc_stats_%s_doc[GRPC_STATS_%s_COUNT];" % (
+ typename.lower(), typename.upper())
histo_start = []
histo_buckets = []
@@ -269,8 +288,14 @@ with open('src/core/lib/debug/stats_data.c', 'w') as C:
print >>C, "const char *grpc_stats_%s_name[GRPC_STATS_%s_COUNT] = {" % (
typename.lower(), typename.upper())
for inst in instances:
- print >>C, " \"%s\"," % inst.name
+ print >>C, " %s," % c_str(inst.name)
print >>C, "};"
+ print >>C, "const char *grpc_stats_%s_doc[GRPC_STATS_%s_COUNT] = {" % (
+ typename.lower(), typename.upper())
+ for inst in instances:
+ print >>C, " %s," % c_str(inst.doc)
+ print >>C, "};"
+
for i, tbl in enumerate(static_tables):
print >>C, "const %s grpc_stats_table_%d[%d] = {%s};" % (
tbl[0], i, len(tbl[1]), ','.join('%s' % x for x in tbl[1]))
diff --git a/tools/doxygen/Doxyfile.core b/tools/doxygen/Doxyfile.core
index e4cc1c7461..632735342b 100644
--- a/tools/doxygen/Doxyfile.core
+++ b/tools/doxygen/Doxyfile.core
@@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC Core"
# could be handy for archiving the generated documentation or if some version
# control system is used.
-PROJECT_NUMBER = 4.0.0-dev
+PROJECT_NUMBER = 5.0.0-dev
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index c2daf900fb..e352cb78f1 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC Core"
# could be handy for archiving the generated documentation or if some version
# control system is used.
-PROJECT_NUMBER = 4.0.0-dev
+PROJECT_NUMBER = 5.0.0-dev
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a
@@ -979,10 +979,10 @@ src/core/ext/filters/http/message_compress/message_compress_filter.c \
src/core/ext/filters/http/message_compress/message_compress_filter.h \
src/core/ext/filters/http/server/http_server_filter.c \
src/core/ext/filters/http/server/http_server_filter.h \
-src/core/ext/filters/load_reporting/load_reporting.c \
-src/core/ext/filters/load_reporting/load_reporting.h \
-src/core/ext/filters/load_reporting/load_reporting_filter.c \
-src/core/ext/filters/load_reporting/load_reporting_filter.h \
+src/core/ext/filters/load_reporting/server_load_reporting_filter.c \
+src/core/ext/filters/load_reporting/server_load_reporting_filter.h \
+src/core/ext/filters/load_reporting/server_load_reporting_plugin.c \
+src/core/ext/filters/load_reporting/server_load_reporting_plugin.h \
src/core/ext/filters/max_age/max_age_filter.c \
src/core/ext/filters/max_age/max_age_filter.h \
src/core/ext/filters/message_size/message_size_filter.c \
diff --git a/tools/flakes/detect_flakes.py b/tools/flakes/detect_flakes.py
index 2aff4c0872..c5c7f61771 100644
--- a/tools/flakes/detect_flakes.py
+++ b/tools/flakes/detect_flakes.py
@@ -33,14 +33,17 @@ sys.path.append(gcp_utils_dir)
import big_query_utils
def print_table(table):
- for i, (k, v) in enumerate(table.items()):
+ kokoro_base_url = 'https://kokoro.corp.google.com/job/'
+ for k, v in table.items():
job_name = v[0]
build_id = v[1]
ts = int(float(v[2]))
# TODO(dgq): timezone handling is wrong. We need to determine the timezone
# of the computer running this script.
human_ts = datetime.datetime.utcfromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S PDT')
- print("{}. Test: {}, Timestamp: {}, id: {}@{}\n".format(i, k, human_ts, job_name, build_id))
+ job_path = '{}/{}'.format('/job/'.join(job_name.split('/')), build_id)
+ full_kokoro_url = kokoro_base_url + job_path
+ print("Test: {}, Timestamp: {}, url: {}\n".format(k, human_ts, full_kokoro_url))
def get_flaky_tests(days_lower_bound, days_upper_bound, limit=None):
diff --git a/tools/internal_ci/linux/grpc_sanity_webhook_test.cfg b/tools/internal_ci/linux/grpc_sanity_webhook_test.cfg
new file mode 100644
index 0000000000..24e7984f3a
--- /dev/null
+++ b/tools/internal_ci/linux/grpc_sanity_webhook_test.cfg
@@ -0,0 +1,30 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Config file for the internal CI (in protobuf text format)
+
+# Location of the continuous shell script in repository.
+build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh"
+timeout_mins: 20
+action {
+ define_artifacts {
+ regex: "**/*sponge_log.xml"
+ regex: "github/grpc/reports/**"
+ }
+}
+
+env_vars {
+ key: "RUN_TESTS_FLAGS"
+ value: "-f basictests linux sanity opt --inner_jobs 16 -j 1 --internal_ci"
+}
diff --git a/tools/jenkins/run_performance_profile_daily.sh b/tools/jenkins/run_performance_profile_daily.sh
index 26ee87d240..04a2464aee 100755
--- a/tools/jenkins/run_performance_profile_daily.sh
+++ b/tools/jenkins/run_performance_profile_daily.sh
@@ -27,4 +27,6 @@ else
PYTHON=python2.7
fi
-$PYTHON tools/run_tests/run_microbenchmark.py --collect summary perf latency
+BENCHMARKS_TO_RUN="bm_fullstack_unary_ping_pong bm_fullstack_streaming_ping_pong bm_fullstack_streaming_pump bm_closure bm_cq bm_call_create bm_error bm_chttp2_hpack bm_chttp2_transport bm_pollset bm_metadata"
+
+$PYTHON tools/run_tests/run_microbenchmark.py --collect summary perf latency -b $BENCHMARKS_TO_RUN
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index dd9a171268..2f6e34bfb3 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -2790,12 +2790,15 @@
"grpc_test_util_unsecure",
"grpc_unsecure"
],
- "headers": [],
+ "headers": [
+ "test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h"
+ ],
"is_filegroup": false,
"language": "c++",
"name": "bm_fullstack_streaming_ping_pong",
"src": [
- "test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc"
+ "test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc",
+ "test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h"
],
"third_party": false,
"type": "target"
@@ -2811,12 +2814,15 @@
"grpc_test_util_unsecure",
"grpc_unsecure"
],
- "headers": [],
+ "headers": [
+ "test/cpp/microbenchmarks/fullstack_streaming_pump.h"
+ ],
"is_filegroup": false,
"language": "c++",
"name": "bm_fullstack_streaming_pump",
"src": [
- "test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc"
+ "test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc",
+ "test/cpp/microbenchmarks/fullstack_streaming_pump.h"
],
"third_party": false,
"type": "target"
@@ -2854,12 +2860,15 @@
"grpc_test_util_unsecure",
"grpc_unsecure"
],
- "headers": [],
+ "headers": [
+ "test/cpp/microbenchmarks/fullstack_unary_ping_pong.h"
+ ],
"is_filegroup": false,
"language": "c++",
"name": "bm_fullstack_unary_ping_pong",
"src": [
- "test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc"
+ "test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc",
+ "test/cpp/microbenchmarks/fullstack_unary_ping_pong.h"
],
"third_party": false,
"type": "target"
@@ -5890,7 +5899,6 @@
"grpc_lb_policy_grpclb_secure",
"grpc_lb_policy_pick_first",
"grpc_lb_policy_round_robin",
- "grpc_load_reporting",
"grpc_max_age_filter",
"grpc_message_size_filter",
"grpc_resolver_dns_ares",
@@ -5899,6 +5907,7 @@
"grpc_resolver_sockaddr",
"grpc_secure",
"grpc_server_backward_compatibility",
+ "grpc_server_load_reporting",
"grpc_transport_chttp2_client_insecure",
"grpc_transport_chttp2_client_secure",
"grpc_transport_chttp2_server_insecure",
@@ -5920,7 +5929,7 @@
"deps": [
"gpr",
"grpc_base",
- "grpc_load_reporting",
+ "grpc_server_load_reporting",
"grpc_transport_chttp2_client_secure",
"grpc_transport_cronet_client_secure"
],
@@ -5997,7 +6006,6 @@
"grpc_lb_policy_grpclb",
"grpc_lb_policy_pick_first",
"grpc_lb_policy_round_robin",
- "grpc_load_reporting",
"grpc_max_age_filter",
"grpc_message_size_filter",
"grpc_resolver_dns_ares",
@@ -6005,6 +6013,7 @@
"grpc_resolver_fake",
"grpc_resolver_sockaddr",
"grpc_server_backward_compatibility",
+ "grpc_server_load_reporting",
"grpc_transport_chttp2_client_insecure",
"grpc_transport_chttp2_server_insecure",
"grpc_transport_inproc",
@@ -8565,27 +8574,6 @@
"grpc_base"
],
"headers": [
- "src/core/ext/filters/load_reporting/load_reporting.h",
- "src/core/ext/filters/load_reporting/load_reporting_filter.h"
- ],
- "is_filegroup": true,
- "language": "c",
- "name": "grpc_load_reporting",
- "src": [
- "src/core/ext/filters/load_reporting/load_reporting.c",
- "src/core/ext/filters/load_reporting/load_reporting.h",
- "src/core/ext/filters/load_reporting/load_reporting_filter.c",
- "src/core/ext/filters/load_reporting/load_reporting_filter.h"
- ],
- "third_party": false,
- "type": "filegroup"
- },
- {
- "deps": [
- "gpr",
- "grpc_base"
- ],
- "headers": [
"src/core/ext/filters/max_age/max_age_filter.h"
],
"is_filegroup": true,
@@ -8793,6 +8781,27 @@
{
"deps": [
"gpr",
+ "grpc_base"
+ ],
+ "headers": [
+ "src/core/ext/filters/load_reporting/server_load_reporting_filter.h",
+ "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
+ ],
+ "is_filegroup": true,
+ "language": "c",
+ "name": "grpc_server_load_reporting",
+ "src": [
+ "src/core/ext/filters/load_reporting/server_load_reporting_filter.c",
+ "src/core/ext/filters/load_reporting/server_load_reporting_filter.h",
+ "src/core/ext/filters/load_reporting/server_load_reporting_plugin.c",
+ "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
+ ],
+ "third_party": false,
+ "type": "filegroup"
+ },
+ {
+ "deps": [
+ "gpr",
"gpr_test_util",
"grpc_base",
"grpc_client_channel",