aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--BUILD12
-rw-r--r--CMakeLists.txt12
-rw-r--r--Makefile52
-rw-r--r--README.md16
-rw-r--r--binding.gyp4
-rw-r--r--build.yaml34
-rw-r--r--build_config.rb2
-rw-r--r--config.m44
-rw-r--r--config.w324
-rw-r--r--gRPC-Core.podspec12
-rw-r--r--grpc.def1
-rw-r--r--grpc.gemspec8
-rw-r--r--grpc.gyp8
-rw-r--r--include/grpc++/alarm.h36
-rw-r--r--include/grpc++/server_builder.h7
-rw-r--r--include/grpc/grpc.h13
-rw-r--r--package.xml8
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_filter.c (renamed from src/core/ext/filters/load_reporting/load_reporting_filter.c)6
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_filter.h (renamed from src/core/ext/filters/load_reporting/load_reporting_filter.h)11
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_plugin.c (renamed from src/core/ext/filters/load_reporting/load_reporting.c)17
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_plugin.h (renamed from src/core/ext/filters/load_reporting/load_reporting.h)7
-rw-r--r--src/core/lib/debug/stats_data.c81
-rw-r--r--src/core/lib/debug/stats_data.h14
-rw-r--r--src/core/lib/debug/stats_data.yaml47
-rw-r--r--src/core/lib/iomgr/tcp_posix.c2
-rw-r--r--src/core/lib/iomgr/timer.h4
-rw-r--r--src/core/lib/iomgr/timer_generic.c2
-rw-r--r--src/core/lib/iomgr/timer_uv.c2
-rw-r--r--src/core/lib/security/transport/secure_endpoint.c200
-rw-r--r--src/core/lib/security/transport/secure_endpoint.h11
-rw-r--r--src/core/lib/security/transport/security_handshaker.c35
-rw-r--r--src/core/lib/support/string.c3
-rw-r--r--src/core/lib/surface/alarm.c31
-rw-r--r--src/core/lib/surface/version.c2
-rw-r--r--src/core/plugin_registry/grpc_cronet_plugin_registry.c8
-rw-r--r--src/core/plugin_registry/grpc_plugin_registry.c8
-rw-r--r--src/core/plugin_registry/grpc_unsecure_plugin_registry.c8
-rw-r--r--src/core/tsi/fake_transport_security.c134
-rw-r--r--src/core/tsi/fake_transport_security.h5
-rw-r--r--src/core/tsi/transport_security_grpc.c25
-rw-r--r--src/core/tsi/transport_security_grpc.h19
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py4
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h9
-rw-r--r--test/core/channel/channel_stack_builder_test.c6
-rw-r--r--test/core/end2end/fixtures/h2_load_reporting.c2
-rw-r--r--test/core/end2end/tests/load_reporting_hook.c4
-rw-r--r--test/core/security/secure_endpoint_test.c56
-rw-r--r--test/core/surface/alarm_test.c25
-rw-r--r--test/cpp/common/alarm_cpp_test.cc60
-rw-r--r--test/cpp/microbenchmarks/BUILD25
-rw-r--r--test/cpp/microbenchmarks/bm_call_create.cc4
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc367
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc147
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc87
-rw-r--r--test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h396
-rw-r--r--test/cpp/microbenchmarks/fullstack_streaming_pump.h170
-rw-r--r--test/cpp/microbenchmarks/fullstack_unary_ping_pong.h116
-rw-r--r--test/cpp/qps/client_async.cc15
-rwxr-xr-xtools/codegen/core/gen_stats_data.py31
-rw-r--r--tools/doxygen/Doxyfile.core2
-rw-r--r--tools/doxygen/Doxyfile.core.internal10
-rw-r--r--tools/internal_ci/linux/grpc_sanity_webhook_test.cfg30
-rw-r--r--tools/run_tests/generated/sources_and_headers.json69
-rwxr-xr-xtools/run_tests/run_tests.py25
65 files changed, 1586 insertions, 991 deletions
diff --git a/BUILD b/BUILD
index bc50a37a40..281375fc1d 100644
--- a/BUILD
+++ b/BUILD
@@ -843,7 +843,7 @@ grpc_cc_library(
"grpc_deadline_filter",
"grpc_lb_policy_pick_first",
"grpc_lb_policy_round_robin",
- "grpc_load_reporting",
+ "grpc_server_load_reporting",
"grpc_max_age_filter",
"grpc_message_size_filter",
"grpc_resolver_dns_ares",
@@ -1087,14 +1087,14 @@ grpc_cc_library(
)
grpc_cc_library(
- name = "grpc_load_reporting",
+ name = "grpc_server_load_reporting",
srcs = [
- "src/core/ext/filters/load_reporting/load_reporting.c",
- "src/core/ext/filters/load_reporting/load_reporting_filter.c",
+ "src/core/ext/filters/load_reporting/server_load_reporting_filter.c",
+ "src/core/ext/filters/load_reporting/server_load_reporting_plugin.c",
],
hdrs = [
- "src/core/ext/filters/load_reporting/load_reporting.h",
- "src/core/ext/filters/load_reporting/load_reporting_filter.h",
+ "src/core/ext/filters/load_reporting/server_load_reporting_filter.h",
+ "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h",
],
language = "c",
deps = [
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 428bdc7773..ba2ec3f38f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1196,8 +1196,8 @@ add_library(grpc
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c
src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
- src/core/ext/filters/load_reporting/load_reporting.c
- src/core/ext/filters/load_reporting/load_reporting_filter.c
+ src/core/ext/filters/load_reporting/server_load_reporting_filter.c
+ src/core/ext/filters/load_reporting/server_load_reporting_plugin.c
src/core/ext/census/base_resources.c
src/core/ext/census/context.c
src/core/ext/census/gen/census.pb.c
@@ -1523,8 +1523,8 @@ add_library(grpc_cronet
src/core/tsi/transport_security.c
src/core/tsi/transport_security_adapter.c
src/core/ext/transport/chttp2/client/chttp2_connector.c
- src/core/ext/filters/load_reporting/load_reporting.c
- src/core/ext/filters/load_reporting/load_reporting_filter.c
+ src/core/ext/filters/load_reporting/server_load_reporting_filter.c
+ src/core/ext/filters/load_reporting/server_load_reporting_plugin.c
src/core/plugin_registry/grpc_cronet_plugin_registry.c
)
@@ -2331,8 +2331,8 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
- src/core/ext/filters/load_reporting/load_reporting.c
- src/core/ext/filters/load_reporting/load_reporting_filter.c
+ src/core/ext/filters/load_reporting/server_load_reporting_filter.c
+ src/core/ext/filters/load_reporting/server_load_reporting_plugin.c
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c
diff --git a/Makefile b/Makefile
index 66c99f5dcf..4d77cc5038 100644
--- a/Makefile
+++ b/Makefile
@@ -410,7 +410,7 @@ E = @echo
Q = @
endif
-CORE_VERSION = 4.0.0-dev
+CORE_VERSION = 5.0.0-dev
CPP_VERSION = 1.7.0-dev
CSHARP_VERSION = 1.7.0-dev
@@ -460,7 +460,7 @@ SHARED_EXT_CORE = dll
SHARED_EXT_CPP = dll
SHARED_EXT_CSHARP = dll
SHARED_PREFIX =
-SHARED_VERSION_CORE = -4
+SHARED_VERSION_CORE = -5
SHARED_VERSION_CPP = -1
SHARED_VERSION_CSHARP = -1
else ifeq ($(SYSTEM),Darwin)
@@ -2628,7 +2628,7 @@ install-shared_c: shared_c strip-shared_c install-pkg-config_c
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE)-dll.a $(prefix)/lib/libgpr.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgpr.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgpr.so.5
$(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgpr.so
endif
$(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE)"
@@ -2637,7 +2637,7 @@ endif
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE)-dll.a $(prefix)/lib/libgrpc.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc.so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc.so
endif
$(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE)"
@@ -2646,7 +2646,7 @@ endif
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE)-dll.a $(prefix)/lib/libgrpc_cronet.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc_cronet.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc_cronet.so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc_cronet.so
endif
$(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE)"
@@ -2655,7 +2655,7 @@ endif
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE)-dll.a $(prefix)/lib/libgrpc_unsecure.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc_unsecure.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc_unsecure.so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc_unsecure.so
endif
ifneq ($(SYSTEM),MINGW32)
@@ -2672,7 +2672,7 @@ install-shared_cxx: shared_cxx strip-shared_cxx install-shared_c install-pkg-con
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc++$(SHARED_VERSION_CPP)-dll.a $(prefix)/lib/libgrpc++.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)grpc++$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)grpc++$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++.so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc++$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++.so
endif
$(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc++_cronet$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)"
@@ -2681,7 +2681,7 @@ endif
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc++_cronet$(SHARED_VERSION_CPP)-dll.a $(prefix)/lib/libgrpc++_cronet.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)grpc++_cronet$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_cronet.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)grpc++_cronet$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_cronet.so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc++_cronet$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_cronet.so
endif
$(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc++_error_details$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)"
@@ -2690,7 +2690,7 @@ endif
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc++_error_details$(SHARED_VERSION_CPP)-dll.a $(prefix)/lib/libgrpc++_error_details.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)grpc++_error_details$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_error_details.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)grpc++_error_details$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_error_details.so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc++_error_details$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_error_details.so
endif
$(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc++_reflection$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)"
@@ -2699,7 +2699,7 @@ endif
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc++_reflection$(SHARED_VERSION_CPP)-dll.a $(prefix)/lib/libgrpc++_reflection.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)grpc++_reflection$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_reflection.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)grpc++_reflection$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_reflection.so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc++_reflection$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_reflection.so
endif
$(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc++_unsecure$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)"
@@ -2708,7 +2708,7 @@ endif
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc++_unsecure$(SHARED_VERSION_CPP)-dll.a $(prefix)/lib/libgrpc++_unsecure.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)grpc++_unsecure$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_unsecure.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)grpc++_unsecure$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_unsecure.so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc++_unsecure$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_unsecure.so
endif
ifneq ($(SYSTEM),MINGW32)
@@ -2725,7 +2725,7 @@ install-shared_csharp: shared_csharp strip-shared_csharp
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc_csharp_ext$(SHARED_VERSION_CSHARP)-dll.a $(prefix)/lib/libgrpc_csharp_ext.a
else ifneq ($(SYSTEM),Darwin)
- $(Q) ln -sf $(SHARED_PREFIX)grpc_csharp_ext$(SHARED_VERSION_CSHARP).$(SHARED_EXT_CSHARP) $(prefix)/lib/libgrpc_csharp_ext.so.4
+ $(Q) ln -sf $(SHARED_PREFIX)grpc_csharp_ext$(SHARED_VERSION_CSHARP).$(SHARED_EXT_CSHARP) $(prefix)/lib/libgrpc_csharp_ext.so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc_csharp_ext$(SHARED_VERSION_CSHARP).$(SHARED_EXT_CSHARP) $(prefix)/lib/libgrpc_csharp_ext.so
endif
ifneq ($(SYSTEM),MINGW32)
@@ -2892,8 +2892,8 @@ $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE): $(LIBGPR_OB
ifeq ($(SYSTEM),Darwin)
$(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGPR_OBJS) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
else
- $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgpr.so.4 -o $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGPR_OBJS) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
- $(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).so.4
+ $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgpr.so.5 -o $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGPR_OBJS) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
+ $(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).so.5
$(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).so
endif
endif
@@ -3166,8 +3166,8 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c \
src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c \
- src/core/ext/filters/load_reporting/load_reporting.c \
- src/core/ext/filters/load_reporting/load_reporting_filter.c \
+ src/core/ext/filters/load_reporting/server_load_reporting_filter.c \
+ src/core/ext/filters/load_reporting/server_load_reporting_plugin.c \
src/core/ext/census/base_resources.c \
src/core/ext/census/context.c \
src/core/ext/census/gen/census.pb.c \
@@ -3261,8 +3261,8 @@ $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE): $(LIBGRPC_
ifeq ($(SYSTEM),Darwin)
$(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
else
- $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc.so.4 -o $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
- $(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).so.4
+ $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc.so.5 -o $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
+ $(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).so
endif
endif
@@ -3491,8 +3491,8 @@ LIBGRPC_CRONET_SRC = \
src/core/tsi/transport_security.c \
src/core/tsi/transport_security_adapter.c \
src/core/ext/transport/chttp2/client/chttp2_connector.c \
- src/core/ext/filters/load_reporting/load_reporting.c \
- src/core/ext/filters/load_reporting/load_reporting_filter.c \
+ src/core/ext/filters/load_reporting/server_load_reporting_filter.c \
+ src/core/ext/filters/load_reporting/server_load_reporting_plugin.c \
src/core/plugin_registry/grpc_cronet_plugin_registry.c \
PUBLIC_HEADERS_C += \
@@ -3557,8 +3557,8 @@ $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE): $(L
ifeq ($(SYSTEM),Darwin)
$(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_CRONET_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
else
- $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc_cronet.so.4 -o $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_CRONET_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
- $(Q) ln -sf $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).so.4
+ $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc_cronet.so.5 -o $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_CRONET_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
+ $(Q) ln -sf $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).so
endif
endif
@@ -4261,8 +4261,8 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c \
src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c \
- src/core/ext/filters/load_reporting/load_reporting.c \
- src/core/ext/filters/load_reporting/load_reporting_filter.c \
+ src/core/ext/filters/load_reporting/server_load_reporting_filter.c \
+ src/core/ext/filters/load_reporting/server_load_reporting_plugin.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c \
@@ -4355,8 +4355,8 @@ $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE): $
ifeq ($(SYSTEM),Darwin)
$(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_UNSECURE_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
else
- $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc_unsecure.so.4 -o $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_UNSECURE_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
- $(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).so.4
+ $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc_unsecure.so.5 -o $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_UNSECURE_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS)
+ $(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).so.5
$(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).so
endif
endif
diff --git a/README.md b/README.md
index 995f877219..61479f34aa 100644
--- a/README.md
+++ b/README.md
@@ -27,14 +27,14 @@ Libraries in different languages may be in different states of development. We a
| Language | Source | Status |
|-------------------------|-------------------------------------|---------|
-| Shared C [core library] | [src/core](src/core) | 1.0 |
-| C++ | [src/cpp](src/cpp) | 1.0 |
-| Ruby | [src/ruby](src/ruby) | 1.0 |
-| NodeJS | [src/node](src/node) | 1.0 |
-| Python | [src/python](src/python) | 1.0 |
-| PHP | [src/php](src/php) | 1.0 |
-| C# | [src/csharp](src/csharp) | 1.0 |
-| Objective-C | [src/objective-c](src/objective-c) | 1.0 |
+| Shared C [core library] | [src/core](src/core) | 1.6 |
+| C++ | [src/cpp](src/cpp) | 1.6 |
+| Ruby | [src/ruby](src/ruby) | 1.6 |
+| NodeJS | [src/node](src/node) | 1.6 |
+| Python | [src/python](src/python) | 1.6 |
+| PHP | [src/php](src/php) | 1.6 |
+| C# | [src/csharp](src/csharp) | 1.6 |
+| Objective-C | [src/objective-c](src/objective-c) | 1.6 |
Java source code is in the [grpc-java](http://github.com/grpc/grpc-java)
repository. Go source code is in the
diff --git a/binding.gyp b/binding.gyp
index 0110951305..06dc731935 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -893,8 +893,8 @@
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c',
'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c',
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c',
- 'src/core/ext/filters/load_reporting/load_reporting.c',
- 'src/core/ext/filters/load_reporting/load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.c',
'src/core/ext/census/base_resources.c',
'src/core/ext/census/context.c',
'src/core/ext/census/gen/census.pb.c',
diff --git a/build.yaml b/build.yaml
index fa013bac13..c694675e93 100644
--- a/build.yaml
+++ b/build.yaml
@@ -12,7 +12,7 @@ settings:
'#08': Use "-preN" suffixes to identify pre-release versions
'#09': Per-language overrides are possible with (eg) ruby_version tag here
'#10': See the expand_version.py for all the quirks here
- core_version: 4.0.0-dev
+ core_version: 5.0.0-dev
g_stands_for: gambit
version: 1.7.0-dev
filegroups:
@@ -590,16 +590,6 @@ filegroups:
uses:
- grpc_base
- grpc_client_channel
-- name: grpc_load_reporting
- headers:
- - src/core/ext/filters/load_reporting/load_reporting.h
- - src/core/ext/filters/load_reporting/load_reporting_filter.h
- src:
- - src/core/ext/filters/load_reporting/load_reporting.c
- - src/core/ext/filters/load_reporting/load_reporting_filter.c
- plugin: grpc_load_reporting_plugin
- uses:
- - grpc_base
- name: grpc_max_age_filter
headers:
- src/core/ext/filters/max_age/max_age_filter.h
@@ -712,6 +702,16 @@ filegroups:
- src/core/ext/filters/workarounds/workaround_utils.c
uses:
- grpc_base
+- name: grpc_server_load_reporting
+ headers:
+ - src/core/ext/filters/load_reporting/server_load_reporting_filter.h
+ - src/core/ext/filters/load_reporting/server_load_reporting_plugin.h
+ src:
+ - src/core/ext/filters/load_reporting/server_load_reporting_filter.c
+ - src/core/ext/filters/load_reporting/server_load_reporting_plugin.c
+ plugin: grpc_server_load_reporting_plugin
+ uses:
+ - grpc_base
- name: grpc_test_util_base
build: test
headers:
@@ -1164,7 +1164,7 @@ libs:
- grpc_resolver_dns_native
- grpc_resolver_sockaddr
- grpc_resolver_fake
- - grpc_load_reporting
+ - grpc_server_load_reporting
- grpc_secure
- census
- grpc_max_age_filter
@@ -1190,7 +1190,7 @@ libs:
- grpc_base
- grpc_transport_cronet_client_secure
- grpc_transport_chttp2_client_secure
- - grpc_load_reporting
+ - grpc_server_load_reporting
generate_plugin_registry: true
platforms:
- linux
@@ -1264,7 +1264,7 @@ libs:
- grpc_resolver_dns_native
- grpc_resolver_sockaddr
- grpc_resolver_fake
- - grpc_load_reporting
+ - grpc_server_load_reporting
- grpc_lb_policy_grpclb
- grpc_lb_policy_pick_first
- grpc_lb_policy_round_robin
@@ -3621,6 +3621,8 @@ targets:
- name: bm_fullstack_streaming_ping_pong
build: test
language: c++
+ headers:
+ - test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h
src:
- test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
deps:
@@ -3646,6 +3648,8 @@ targets:
- name: bm_fullstack_streaming_pump
build: test
language: c++
+ headers:
+ - test/cpp/microbenchmarks/fullstack_streaming_pump.h
src:
- test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc
deps:
@@ -3697,6 +3701,8 @@ targets:
- name: bm_fullstack_unary_ping_pong
build: test
language: c++
+ headers:
+ - test/cpp/microbenchmarks/fullstack_unary_ping_pong.h
src:
- test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc
deps:
diff --git a/build_config.rb b/build_config.rb
index 7159c6e509..3dc31d4ce3 100644
--- a/build_config.rb
+++ b/build_config.rb
@@ -13,5 +13,5 @@
# limitations under the License.
module GrpcBuildConfig
- CORE_WINDOWS_DLL = '/tmp/libs/opt/grpc-4.dll'
+ CORE_WINDOWS_DLL = '/tmp/libs/opt/grpc-5.dll'
end
diff --git a/config.m4 b/config.m4
index bc9f24b756..d52e37ca28 100644
--- a/config.m4
+++ b/config.m4
@@ -322,8 +322,8 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c \
src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c \
- src/core/ext/filters/load_reporting/load_reporting.c \
- src/core/ext/filters/load_reporting/load_reporting_filter.c \
+ src/core/ext/filters/load_reporting/server_load_reporting_filter.c \
+ src/core/ext/filters/load_reporting/server_load_reporting_plugin.c \
src/core/ext/census/base_resources.c \
src/core/ext/census/context.c \
src/core/ext/census/gen/census.pb.c \
diff --git a/config.w32 b/config.w32
index 1b1a82b1ae..92faad7a8f 100644
--- a/config.w32
+++ b/config.w32
@@ -299,8 +299,8 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\grpc_ares_wrapper_fallback.c " +
"src\\core\\ext\\filters\\client_channel\\resolver\\dns\\native\\dns_resolver.c " +
"src\\core\\ext\\filters\\client_channel\\resolver\\sockaddr\\sockaddr_resolver.c " +
- "src\\core\\ext\\filters\\load_reporting\\load_reporting.c " +
- "src\\core\\ext\\filters\\load_reporting\\load_reporting_filter.c " +
+ "src\\core\\ext\\filters\\load_reporting\\server_load_reporting_filter.c " +
+ "src\\core\\ext\\filters\\load_reporting\\server_load_reporting_plugin.c " +
"src\\core\\ext\\census\\base_resources.c " +
"src\\core\\ext\\census\\context.c " +
"src\\core\\ext\\census\\gen\\census.pb.c " +
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index fdab4b3f81..2f1f415866 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -443,8 +443,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h',
- 'src/core/ext/filters/load_reporting/load_reporting.h',
- 'src/core/ext/filters/load_reporting/load_reporting_filter.h',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_filter.h',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.h',
'src/core/ext/census/aggregation.h',
'src/core/ext/census/base_resources.h',
'src/core/ext/census/census_interface.h',
@@ -701,8 +701,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c',
'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c',
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c',
- 'src/core/ext/filters/load_reporting/load_reporting.c',
- 'src/core/ext/filters/load_reporting/load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.c',
'src/core/ext/census/base_resources.c',
'src/core/ext/census/context.c',
'src/core/ext/census/gen/census.pb.c',
@@ -938,8 +938,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h',
- 'src/core/ext/filters/load_reporting/load_reporting.h',
- 'src/core/ext/filters/load_reporting/load_reporting_filter.h',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_filter.h',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.h',
'src/core/ext/census/aggregation.h',
'src/core/ext/census/base_resources.h',
'src/core/ext/census/census_interface.h',
diff --git a/grpc.def b/grpc.def
index 37cc7e20f0..a7a11601ed 100644
--- a/grpc.def
+++ b/grpc.def
@@ -65,6 +65,7 @@ EXPORTS
grpc_completion_queue_shutdown
grpc_completion_queue_destroy
grpc_alarm_create
+ grpc_alarm_set
grpc_alarm_cancel
grpc_alarm_destroy
grpc_channel_check_connectivity_state
diff --git a/grpc.gemspec b/grpc.gemspec
index c441107643..2d0f9fd450 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -379,8 +379,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h )
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h )
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h )
- s.files += %w( src/core/ext/filters/load_reporting/load_reporting.h )
- s.files += %w( src/core/ext/filters/load_reporting/load_reporting_filter.h )
+ s.files += %w( src/core/ext/filters/load_reporting/server_load_reporting_filter.h )
+ s.files += %w( src/core/ext/filters/load_reporting/server_load_reporting_plugin.h )
s.files += %w( src/core/ext/census/aggregation.h )
s.files += %w( src/core/ext/census/base_resources.h )
s.files += %w( src/core/ext/census/census_interface.h )
@@ -640,8 +640,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c )
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c )
s.files += %w( src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c )
- s.files += %w( src/core/ext/filters/load_reporting/load_reporting.c )
- s.files += %w( src/core/ext/filters/load_reporting/load_reporting_filter.c )
+ s.files += %w( src/core/ext/filters/load_reporting/server_load_reporting_filter.c )
+ s.files += %w( src/core/ext/filters/load_reporting/server_load_reporting_plugin.c )
s.files += %w( src/core/ext/census/base_resources.c )
s.files += %w( src/core/ext/census/context.c )
s.files += %w( src/core/ext/census/gen/census.pb.c )
diff --git a/grpc.gyp b/grpc.gyp
index 0e7ee7e5c9..48fb2ba7cf 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -459,8 +459,8 @@
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c',
'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c',
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c',
- 'src/core/ext/filters/load_reporting/load_reporting.c',
- 'src/core/ext/filters/load_reporting/load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.c',
'src/core/ext/census/base_resources.c',
'src/core/ext/census/context.c',
'src/core/ext/census/gen/census.pb.c',
@@ -1108,8 +1108,8 @@
'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c',
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c',
- 'src/core/ext/filters/load_reporting/load_reporting.c',
- 'src/core/ext/filters/load_reporting/load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c',
diff --git a/include/grpc++/alarm.h b/include/grpc++/alarm.h
index ed8dacbc94..2d88d868e5 100644
--- a/include/grpc++/alarm.h
+++ b/include/grpc++/alarm.h
@@ -37,20 +37,33 @@ class CompletionQueue;
/// A thin wrapper around \a grpc_alarm (see / \a / src/core/surface/alarm.h).
class Alarm : private GrpcLibraryCodegen {
public:
- /// Create a completion queue alarm instance associated to \a cq.
- ///
- /// Once the alarm expires (at \a deadline) or it's cancelled (see \a Cancel),
- /// an event with tag \a tag will be added to \a cq. If the alarm expired, the
- /// event's success bit will be true, false otherwise (ie, upon cancellation).
+ /// Create an unset completion queue alarm
+ Alarm() : tag_(nullptr), alarm_(grpc_alarm_create(nullptr)) {}
+
+ /// DEPRECATED: Create and set a completion queue alarm instance associated to
+ /// \a cq.
+ /// This form is deprecated because it is inherently racy.
/// \internal We rely on the presence of \a cq for grpc initialization. If \a
/// cq were ever to be removed, a reference to a static
/// internal::GrpcLibraryInitializer instance would need to be introduced
/// here. \endinternal.
template <typename T>
Alarm(CompletionQueue* cq, const T& deadline, void* tag)
- : tag_(tag),
- alarm_(grpc_alarm_create(cq->cq(), TimePoint<T>(deadline).raw_time(),
- static_cast<void*>(&tag_))) {}
+ : tag_(tag), alarm_(grpc_alarm_create(nullptr)) {
+ grpc_alarm_set(alarm_, cq->cq(), TimePoint<T>(deadline).raw_time(),
+ static_cast<void*>(&tag_), nullptr);
+ }
+
+ /// Trigger an alarm instance on completion queue \a cq at the specified time.
+ /// Once the alarm expires (at \a deadline) or it's cancelled (see \a Cancel),
+ /// an event with tag \a tag will be added to \a cq. If the alarm expired, the
+ /// event's success bit will be true, false otherwise (ie, upon cancellation).
+ template <typename T>
+ void Set(CompletionQueue* cq, const T& deadline, void* tag) {
+ tag_.Set(tag);
+ grpc_alarm_set(alarm_, cq->cq(), TimePoint<T>(deadline).raw_time(),
+ static_cast<void*>(&tag_), nullptr);
+ }
/// Alarms aren't copyable.
Alarm(const Alarm&) = delete;
@@ -69,17 +82,20 @@ class Alarm : private GrpcLibraryCodegen {
/// Destroy the given completion queue alarm, cancelling it in the process.
~Alarm() {
- if (alarm_ != nullptr) grpc_alarm_destroy(alarm_);
+ if (alarm_ != nullptr) grpc_alarm_destroy(alarm_, nullptr);
}
/// Cancel a completion queue alarm. Calling this function over an alarm that
/// has already fired has no effect.
- void Cancel() { grpc_alarm_cancel(alarm_); }
+ void Cancel() {
+ if (alarm_ != nullptr) grpc_alarm_cancel(alarm_, nullptr);
+ }
private:
class AlarmEntry : public CompletionQueueTag {
public:
AlarmEntry(void* tag) : tag_(tag) {}
+ void Set(void* tag) { tag_ = tag; }
bool FinalizeResult(void** tag, bool* status) override {
*tag = tag_;
return true;
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index eafd63619d..21ae70d13a 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -151,7 +151,8 @@ class ServerBuilder {
/// Add a completion queue for handling asynchronous services.
///
/// Caller is required to shutdown the server prior to shutting down the
- /// returned completion queue. A typical usage scenario:
+ /// returned completion queue. Caller is also required to drain the
+ /// completion queue after shutting it down. A typical usage scenario:
///
/// // While building the server:
/// ServerBuilder builder;
@@ -162,6 +163,10 @@ class ServerBuilder {
/// // While shutting down the server;
/// server_->Shutdown();
/// cq_->Shutdown(); // Always *after* the associated server's Shutdown()!
+ /// // Drain the cq_ that was created
+ /// void* ignored_tag;
+ /// bool ignored_ok;
+ /// while (cq_->Next(&ignored_tag, &ignored_ok)) { }
///
/// \param is_frequently_polled This is an optional parameter to inform gRPC
/// library about whether this completion queue would be frequently polled
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index b562167b5f..fab7d438aa 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -143,21 +143,24 @@ GRPCAPI void grpc_completion_queue_shutdown(grpc_completion_queue *cq);
drained and no threads are executing grpc_completion_queue_next */
GRPCAPI void grpc_completion_queue_destroy(grpc_completion_queue *cq);
-/** Create a completion queue alarm instance associated to \a cq.
+/** Create a completion queue alarm instance */
+GRPCAPI grpc_alarm *grpc_alarm_create(void *reserved);
+
+/** Set a completion queue alarm instance associated to \a cq.
*
* Once the alarm expires (at \a deadline) or it's cancelled (see \a
* grpc_alarm_cancel), an event with tag \a tag will be added to \a cq. If the
* alarm expired, the event's success bit will be true, false otherwise (ie,
* upon cancellation). */
-GRPCAPI grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq,
- gpr_timespec deadline, void *tag);
+GRPCAPI void grpc_alarm_set(grpc_alarm *alarm, grpc_completion_queue *cq,
+ gpr_timespec deadline, void *tag, void *reserved);
/** Cancel a completion queue alarm. Calling this function over an alarm that
* has already fired has no effect. */
-GRPCAPI void grpc_alarm_cancel(grpc_alarm *alarm);
+GRPCAPI void grpc_alarm_cancel(grpc_alarm *alarm, void *reserved);
/** Destroy the given completion queue alarm, cancelling it in the process. */
-GRPCAPI void grpc_alarm_destroy(grpc_alarm *alarm);
+GRPCAPI void grpc_alarm_destroy(grpc_alarm *alarm, void *reserved);
/** Check the connectivity state of a channel. */
GRPCAPI grpc_connectivity_state grpc_channel_check_connectivity_state(
diff --git a/package.xml b/package.xml
index 4b1f42dd02..b7c0e679e5 100644
--- a/package.xml
+++ b/package.xml
@@ -389,8 +389,8 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" role="src" />
- <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/load_reporting.h" role="src" />
- <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/load_reporting_filter.h" role="src" />
+ <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/server_load_reporting_filter.h" role="src" />
+ <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/server_load_reporting_plugin.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/census/aggregation.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/census/base_resources.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/census/census_interface.h" role="src" />
@@ -650,8 +650,8 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c" role="src" />
- <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/load_reporting.c" role="src" />
- <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/load_reporting_filter.c" role="src" />
+ <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/server_load_reporting_filter.c" role="src" />
+ <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/server_load_reporting_plugin.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/census/base_resources.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/census/context.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/census/gen/census.pb.c" role="src" />
diff --git a/src/core/ext/filters/load_reporting/load_reporting_filter.c b/src/core/ext/filters/load_reporting/server_load_reporting_filter.c
index 17e946937f..7b8cf986f7 100644
--- a/src/core/ext/filters/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.c
@@ -24,8 +24,8 @@
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
-#include "src/core/ext/filters/load_reporting/load_reporting.h"
-#include "src/core/ext/filters/load_reporting/load_reporting_filter.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -213,7 +213,7 @@ static void lr_start_transport_stream_op_batch(
GPR_TIMER_END("lr_start_transport_stream_op_batch", 0);
}
-const grpc_channel_filter grpc_load_reporting_filter = {
+const grpc_channel_filter grpc_server_load_reporting_filter = {
lr_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
diff --git a/src/core/ext/filters/load_reporting/load_reporting_filter.h b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h
index 1a5424e43a..9527868c9f 100644
--- a/src/core/ext/filters/load_reporting/load_reporting_filter.h
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h
@@ -16,12 +16,13 @@
*
*/
-#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_FILTER_H
-#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_FILTER_H
+#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_FILTER_H
+#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_FILTER_H
-#include "src/core/ext/filters/load_reporting/load_reporting.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
#include "src/core/lib/channel/channel_stack.h"
-extern const grpc_channel_filter grpc_load_reporting_filter;
+extern const grpc_channel_filter grpc_server_load_reporting_filter;
-#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_FILTER_H */
+#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_FILTER_H \
+ */
diff --git a/src/core/ext/filters/load_reporting/load_reporting.c b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.c
index b42aa99cdb..199cb883b3 100644
--- a/src/core/ext/filters/load_reporting/load_reporting.c
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.c
@@ -25,8 +25,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/sync.h>
-#include "src/core/ext/filters/load_reporting/load_reporting.h"
-#include "src/core/ext/filters/load_reporting/load_reporting_filter.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/call.h"
@@ -37,9 +37,8 @@ static bool is_load_reporting_enabled(const grpc_channel_args *a) {
grpc_channel_args_find(a, GRPC_ARG_ENABLE_LOAD_REPORTING), false);
}
-static bool maybe_add_load_reporting_filter(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack_builder *builder,
- void *arg) {
+static bool maybe_add_server_load_reporting_filter(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) {
const grpc_channel_args *args =
grpc_channel_stack_builder_get_channel_arguments(builder);
const grpc_channel_filter *filter = arg;
@@ -61,10 +60,10 @@ grpc_arg grpc_load_reporting_enable_arg() {
/* Plugin registration */
-void grpc_load_reporting_plugin_init(void) {
+void grpc_server_load_reporting_plugin_init(void) {
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
- maybe_add_load_reporting_filter,
- (void *)&grpc_load_reporting_filter);
+ maybe_add_server_load_reporting_filter,
+ (void *)&grpc_server_load_reporting_filter);
}
-void grpc_load_reporting_plugin_shutdown() {}
+void grpc_server_load_reporting_plugin_shutdown() {}
diff --git a/src/core/ext/filters/load_reporting/load_reporting.h b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.h
index fc04d2826a..65a6d0900e 100644
--- a/src/core/ext/filters/load_reporting/load_reporting.h
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.h
@@ -16,8 +16,8 @@
*
*/
-#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_H
-#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_H
+#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_PLUGIN_H
+#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_PLUGIN_H
#include <grpc/impl/codegen/grpc_types.h>
@@ -55,4 +55,5 @@ typedef struct grpc_load_reporting_call_data {
/** Return a \a grpc_arg enabling load reporting */
grpc_arg grpc_load_reporting_enable_arg();
-#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_H */
+#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_PLUGIN_H \
+ */
diff --git a/src/core/lib/debug/stats_data.c b/src/core/lib/debug/stats_data.c
index 81ef0d28ee..969cac9d89 100644
--- a/src/core/lib/debug/stats_data.c
+++ b/src/core/lib/debug/stats_data.c
@@ -55,10 +55,59 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
"executor_wakeup_initiated",
"executor_queue_drained",
};
+const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
+ "Number of client side calls created by this process",
+ "Number of server side calls created by this process",
+ "Number of polling syscalls (epoll_wait, poll, etc) made by this process",
+ "Number of sleeping syscalls made by this process",
+ "How many polling wakeups were performed by the process",
+ "How many times was a polling wakeup requested without an active poller",
+ "How many times was the same polling worker awoken repeatedly before "
+ "waking up",
+ "How many times was an eventfd used as the wakeup vector for a polling "
+ "wakeup",
+ "How many times was a condition variable used as the wakeup vector for a "
+ "polling wakeup",
+ "How many times could a polling wakeup be satisfied by keeping the waking "
+ "thread awake?",
+ "Number of times histogram increments went through the slow (binary "
+ "search) path",
+ "Number of write syscalls (or equivalent - eg sendmsg) made by this "
+ "process",
+ "Number of read syscalls (or equivalent - eg recvmsg) made by this process",
+ "Number of batches received by HTTP2 transport",
+ "Number of cancelations received by HTTP2 transport",
+ "Number of batches containing send initial metadata",
+ "Number of batches containing send message",
+ "Number of batches containing send trailing metadata",
+ "Number of batches containing receive initial metadata",
+ "Number of batches containing receive message",
+ "Number of batches containing receive trailing metadata",
+ "Number of HTTP2 pings sent by process", "Number of HTTP2 writes initiated",
+ "Number of combiner lock entries by process (first items queued to a "
+ "combiner)",
+ "Number of items scheduled against combiner locks",
+ "Number of final items scheduled against combiner locks",
+ "Number of combiner locks offloaded to different threads",
+ "Number of closures scheduled against the executor (gRPC thread pool)",
+ "Number of closures scheduled by the executor to the executor",
+ "Number of thread wakeups initiated within the executor",
+ "Number of times an executor queue was drained",
+};
const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = {
- "call_initial_size", "poll_events_returned", "tcp_write_size",
- "tcp_write_iov_size", "tcp_read_size", "tcp_read_offer",
- "tcp_read_iov_size", "http2_send_message_size",
+ "call_initial_size", "poll_events_returned", "tcp_write_size",
+ "tcp_write_iov_size", "tcp_read_size", "tcp_read_offer",
+ "tcp_read_offer_iov_size", "http2_send_message_size",
+};
+const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT] = {
+ "Initial size of the grpc_call arena created at call start",
+ "How many events are called for each syscall_poll",
+ "Number of bytes offered to each syscall_write",
+ "Number of byte segments offered to each syscall_write",
+ "Number of bytes received by each syscall_read",
+ "Number of bytes offered to each syscall_read",
+ "Number of byte segments offered to each syscall_read",
+ "Size of messages received by HTTP2 transport",
};
const int grpc_stats_table_0[65] = {
0, 1, 2, 3, 4, 5, 7, 9, 11, 14,
@@ -276,11 +325,12 @@ void grpc_stats_inc_tcp_read_offer(grpc_exec_ctx *exec_ctx, int value) {
grpc_stats_histo_find_bucket_slow(
(exec_ctx), value, grpc_stats_table_4, 64));
}
-void grpc_stats_inc_tcp_read_iov_size(grpc_exec_ctx *exec_ctx, int value) {
+void grpc_stats_inc_tcp_read_offer_iov_size(grpc_exec_ctx *exec_ctx,
+ int value) {
value = GPR_CLAMP(value, 0, 1024);
if (value < 13) {
- GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE,
- value);
+ GRPC_STATS_INC_HISTOGRAM(
+ (exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE, value);
return;
}
union {
@@ -293,11 +343,12 @@ void grpc_stats_inc_tcp_read_iov_size(grpc_exec_ctx *exec_ctx, int value) {
grpc_stats_table_7[((_val.uint - 4623507967449235456ull) >> 48)] + 13;
_bkt.dbl = grpc_stats_table_6[bucket];
bucket -= (_val.uint < _bkt.uint);
- GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE,
- bucket);
+ GRPC_STATS_INC_HISTOGRAM(
+ (exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE, bucket);
return;
}
- GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE,
+ GRPC_STATS_INC_HISTOGRAM((exec_ctx),
+ GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE,
grpc_stats_histo_find_bucket_slow(
(exec_ctx), value, grpc_stats_table_6, 64));
}
@@ -335,7 +386,11 @@ const int *const grpc_stats_histo_bucket_boundaries[8] = {
grpc_stats_table_6, grpc_stats_table_4, grpc_stats_table_4,
grpc_stats_table_6, grpc_stats_table_4};
void (*const grpc_stats_inc_histogram[8])(grpc_exec_ctx *exec_ctx, int x) = {
- grpc_stats_inc_call_initial_size, grpc_stats_inc_poll_events_returned,
- grpc_stats_inc_tcp_write_size, grpc_stats_inc_tcp_write_iov_size,
- grpc_stats_inc_tcp_read_size, grpc_stats_inc_tcp_read_offer,
- grpc_stats_inc_tcp_read_iov_size, grpc_stats_inc_http2_send_message_size};
+ grpc_stats_inc_call_initial_size,
+ grpc_stats_inc_poll_events_returned,
+ grpc_stats_inc_tcp_write_size,
+ grpc_stats_inc_tcp_write_iov_size,
+ grpc_stats_inc_tcp_read_size,
+ grpc_stats_inc_tcp_read_offer,
+ grpc_stats_inc_tcp_read_offer_iov_size,
+ grpc_stats_inc_http2_send_message_size};
diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h
index e6a2256172..27f0aa8ba2 100644
--- a/src/core/lib/debug/stats_data.h
+++ b/src/core/lib/debug/stats_data.h
@@ -59,6 +59,7 @@ typedef enum {
GRPC_STATS_COUNTER_COUNT
} grpc_stats_counters;
extern const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT];
+extern const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT];
typedef enum {
GRPC_STATS_HISTOGRAM_CALL_INITIAL_SIZE,
GRPC_STATS_HISTOGRAM_POLL_EVENTS_RETURNED,
@@ -66,11 +67,12 @@ typedef enum {
GRPC_STATS_HISTOGRAM_TCP_WRITE_IOV_SIZE,
GRPC_STATS_HISTOGRAM_TCP_READ_SIZE,
GRPC_STATS_HISTOGRAM_TCP_READ_OFFER,
- GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE,
+ GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE,
GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE,
GRPC_STATS_HISTOGRAM_COUNT
} grpc_stats_histograms;
extern const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT];
+extern const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT];
typedef enum {
GRPC_STATS_HISTOGRAM_CALL_INITIAL_SIZE_FIRST_SLOT = 0,
GRPC_STATS_HISTOGRAM_CALL_INITIAL_SIZE_BUCKETS = 64,
@@ -84,8 +86,8 @@ typedef enum {
GRPC_STATS_HISTOGRAM_TCP_READ_SIZE_BUCKETS = 64,
GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_FIRST_SLOT = 384,
GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_BUCKETS = 64,
- GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE_FIRST_SLOT = 448,
- GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE_BUCKETS = 64,
+ GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE_FIRST_SLOT = 448,
+ GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE_BUCKETS = 64,
GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE_FIRST_SLOT = 512,
GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE_BUCKETS = 64,
GRPC_STATS_HISTOGRAM_BUCKETS = 576
@@ -182,9 +184,9 @@ void grpc_stats_inc_tcp_read_size(grpc_exec_ctx *exec_ctx, int x);
#define GRPC_STATS_INC_TCP_READ_OFFER(exec_ctx, value) \
grpc_stats_inc_tcp_read_offer((exec_ctx), (int)(value))
void grpc_stats_inc_tcp_read_offer(grpc_exec_ctx *exec_ctx, int x);
-#define GRPC_STATS_INC_TCP_READ_IOV_SIZE(exec_ctx, value) \
- grpc_stats_inc_tcp_read_iov_size((exec_ctx), (int)(value))
-void grpc_stats_inc_tcp_read_iov_size(grpc_exec_ctx *exec_ctx, int x);
+#define GRPC_STATS_INC_TCP_READ_OFFER_IOV_SIZE(exec_ctx, value) \
+ grpc_stats_inc_tcp_read_offer_iov_size((exec_ctx), (int)(value))
+void grpc_stats_inc_tcp_read_offer_iov_size(grpc_exec_ctx *exec_ctx, int x);
#define GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(exec_ctx, value) \
grpc_stats_inc_http2_send_message_size((exec_ctx), (int)(value))
void grpc_stats_inc_http2_send_message_size(grpc_exec_ctx *exec_ctx, int x);
diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml
index 4e0bc5de58..ddf1294a78 100644
--- a/src/core/lib/debug/stats_data.yaml
+++ b/src/core/lib/debug/stats_data.yaml
@@ -17,63 +17,108 @@
# overall
- counter: client_calls_created
+ doc: Number of client side calls created by this process
- counter: server_calls_created
+ doc: Number of server side calls created by this process
- histogram: call_initial_size
max: 262144
buckets: 64
+ doc: Initial size of the grpc_call arena created at call start
# polling
- counter: syscall_poll
+ doc: Number of polling syscalls (epoll_wait, poll, etc) made by this process
- counter: syscall_wait
+ doc: Number of sleeping syscalls made by this process
- histogram: poll_events_returned
max: 1024
buckets: 128
+ doc: How many events are called for each syscall_poll
- counter: pollset_kick
+ doc: How many polling wakeups were performed by the process
- counter: pollset_kicked_without_poller
+ doc: How many times was a polling wakeup requested without an active poller
- counter: pollset_kicked_again
+ doc: How many times was the same polling worker awoken repeatedly before
+ waking up
- counter: pollset_kick_wakeup_fd
+ doc: How many times was an eventfd used as the wakeup vector for a polling
+ wakeup
- counter: pollset_kick_wakeup_cv
+ doc: How many times was a condition variable used as the wakeup vector for a
+ polling wakeup
- counter: pollset_kick_own_thread
+ doc: How many times could a polling wakeup be satisfied by keeping the waking
+ thread awake?
# stats system
- counter: histogram_slow_lookups
+ doc: Number of times histogram increments went through the slow
+ (binary search) path
# tcp
- counter: syscall_write
+ doc: Number of write syscalls (or equivalent - eg sendmsg) made by this process
- counter: syscall_read
+ doc: Number of read syscalls (or equivalent - eg recvmsg) made by this process
- histogram: tcp_write_size
max: 16777216 # 16 meg max write tracked
buckets: 64
+ doc: Number of bytes offered to each syscall_write
- histogram: tcp_write_iov_size
max: 1024
buckets: 64
+ doc: Number of byte segments offered to each syscall_write
- histogram: tcp_read_size
max: 16777216
buckets: 64
+ doc: Number of bytes received by each syscall_read
- histogram: tcp_read_offer
max: 16777216
buckets: 64
-- histogram: tcp_read_iov_size
+ doc: Number of bytes offered to each syscall_read
+- histogram: tcp_read_offer_iov_size
max: 1024
buckets: 64
+ doc: Number of byte segments offered to each syscall_read
# chttp2
- counter: http2_op_batches
+ doc: Number of batches received by HTTP2 transport
- counter: http2_op_cancel
+ doc: Number of cancelations received by HTTP2 transport
- counter: http2_op_send_initial_metadata
+ doc: Number of batches containing send initial metadata
- counter: http2_op_send_message
+ doc: Number of batches containing send message
- counter: http2_op_send_trailing_metadata
+ doc: Number of batches containing send trailing metadata
- counter: http2_op_recv_initial_metadata
+ doc: Number of batches containing receive initial metadata
- counter: http2_op_recv_message
+ doc: Number of batches containing receive message
- counter: http2_op_recv_trailing_metadata
+ doc: Number of batches containing receive trailing metadata
- histogram: http2_send_message_size
max: 16777216
buckets: 64
+ doc: Size of messages received by HTTP2 transport
- counter: http2_pings_sent
+ doc: Number of HTTP2 pings sent by process
- counter: http2_writes_begun
+ doc: Number of HTTP2 writes initiated
# combiner locks
- counter: combiner_locks_initiated
+ doc: Number of combiner lock entries by process
+ (first items queued to a combiner)
- counter: combiner_locks_scheduled_items
+ doc: Number of items scheduled against combiner locks
- counter: combiner_locks_scheduled_final_items
+ doc: Number of final items scheduled against combiner locks
- counter: combiner_locks_offloaded
+ doc: Number of combiner locks offloaded to different threads
# executor
- counter: executor_scheduled_items
+ doc: Number of closures scheduled against the executor (gRPC thread pool)
- counter: executor_scheduled_to_self
+ doc: Number of closures scheduled by the executor to the executor
- counter: executor_wakeup_initiated
+ doc: Number of thread wakeups initiated within the executor
- counter: executor_queue_drained
+ doc: Number of times an executor queue was drained
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 98a2afd78b..3372e14eef 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -256,7 +256,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
msg.msg_flags = 0;
GRPC_STATS_INC_TCP_READ_OFFER(exec_ctx, tcp->incoming_buffer->length);
- GRPC_STATS_INC_TCP_READ_IOV_SIZE(exec_ctx, tcp->incoming_buffer->count);
+ GRPC_STATS_INC_TCP_READ_OFFER_IOV_SIZE(exec_ctx, tcp->incoming_buffer->count);
GPR_TIMER_BEGIN("recvmsg", 0);
do {
diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h
index b92b8fb8b8..ac392f87fe 100644
--- a/src/core/lib/iomgr/timer.h
+++ b/src/core/lib/iomgr/timer.h
@@ -44,6 +44,10 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_timespec deadline, grpc_closure *closure,
gpr_timespec now);
+/* Initialize *timer without setting it. This can later be passed through
+ the regular init or cancel */
+void grpc_timer_init_unset(grpc_timer *timer);
+
/* Note that there is no timer destroy function. This is because the
timer is a one-time occurrence with a guarantee that the callback will
be called exactly once, either at expiration or cancellation. Thus, all
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index 12efce241f..c08bb525b7 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -234,6 +234,8 @@ static void note_deadline_change(timer_shard *shard) {
}
}
+void grpc_timer_init_unset(grpc_timer *timer) { timer->pending = false; }
+
void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_timespec deadline, grpc_closure *closure,
gpr_timespec now) {
diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c
index 70f49bcbe8..adced41f53 100644
--- a/src/core/lib/iomgr/timer_uv.c
+++ b/src/core/lib/iomgr/timer_uv.c
@@ -77,6 +77,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
uv_unref((uv_handle_t *)uv_timer);
}
+void grpc_timer_init_unset(grpc_timer *timer) { timer->pending = 0; }
+
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
GRPC_UV_ASSERT_SAME_THREAD();
if (timer->pending) {
diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c
index 5e41b94ff8..ae5633b82c 100644
--- a/src/core/lib/security/transport/secure_endpoint.c
+++ b/src/core/lib/security/transport/secure_endpoint.c
@@ -34,7 +34,7 @@
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
-#include "src/core/tsi/transport_security_interface.h"
+#include "src/core/tsi/transport_security_grpc.h"
#define STAGING_BUFFER_SIZE 8192
@@ -42,6 +42,7 @@ typedef struct {
grpc_endpoint base;
grpc_endpoint *wrapped_ep;
struct tsi_frame_protector *protector;
+ struct tsi_zero_copy_grpc_protector *zero_copy_protector;
gpr_mu protector_mu;
/* saved upper level callbacks and user_data. */
grpc_closure *read_cb;
@@ -67,6 +68,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, secure_endpoint *secure_ep) {
secure_endpoint *ep = secure_ep;
grpc_endpoint_destroy(exec_ctx, ep->wrapped_ep);
tsi_frame_protector_destroy(ep->protector);
+ tsi_zero_copy_grpc_protector_destroy(exec_ctx, ep->zero_copy_protector);
grpc_slice_buffer_destroy_internal(exec_ctx, &ep->leftover_bytes);
grpc_slice_unref_internal(exec_ctx, ep->read_staging_buffer);
grpc_slice_unref_internal(exec_ctx, ep->write_staging_buffer);
@@ -159,51 +161,58 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data,
return;
}
- /* TODO(yangg) check error, maybe bail out early */
- for (i = 0; i < ep->source_buffer.count; i++) {
- grpc_slice encrypted = ep->source_buffer.slices[i];
- uint8_t *message_bytes = GRPC_SLICE_START_PTR(encrypted);
- size_t message_size = GRPC_SLICE_LENGTH(encrypted);
-
- while (message_size > 0 || keep_looping) {
- size_t unprotected_buffer_size_written = (size_t)(end - cur);
- size_t processed_message_size = message_size;
- gpr_mu_lock(&ep->protector_mu);
- result = tsi_frame_protector_unprotect(ep->protector, message_bytes,
- &processed_message_size, cur,
- &unprotected_buffer_size_written);
- gpr_mu_unlock(&ep->protector_mu);
- if (result != TSI_OK) {
- gpr_log(GPR_ERROR, "Decryption error: %s",
- tsi_result_to_string(result));
- break;
- }
- message_bytes += processed_message_size;
- message_size -= processed_message_size;
- cur += unprotected_buffer_size_written;
-
- if (cur == end) {
- flush_read_staging_buffer(ep, &cur, &end);
- /* Force to enter the loop again to extract buffered bytes in protector.
- The bytes could be buffered because of running out of staging_buffer.
- If this happens at the end of all slices, doing another unprotect
- avoids leaving data in the protector. */
- keep_looping = 1;
- } else if (unprotected_buffer_size_written > 0) {
- keep_looping = 1;
- } else {
- keep_looping = 0;
+ if (ep->zero_copy_protector != NULL) {
+ // Use zero-copy grpc protector to unprotect.
+ result = tsi_zero_copy_grpc_protector_unprotect(
+ exec_ctx, ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer);
+ } else {
+ // Use frame protector to unprotect.
+ /* TODO(yangg) check error, maybe bail out early */
+ for (i = 0; i < ep->source_buffer.count; i++) {
+ grpc_slice encrypted = ep->source_buffer.slices[i];
+ uint8_t *message_bytes = GRPC_SLICE_START_PTR(encrypted);
+ size_t message_size = GRPC_SLICE_LENGTH(encrypted);
+
+ while (message_size > 0 || keep_looping) {
+ size_t unprotected_buffer_size_written = (size_t)(end - cur);
+ size_t processed_message_size = message_size;
+ gpr_mu_lock(&ep->protector_mu);
+ result = tsi_frame_protector_unprotect(
+ ep->protector, message_bytes, &processed_message_size, cur,
+ &unprotected_buffer_size_written);
+ gpr_mu_unlock(&ep->protector_mu);
+ if (result != TSI_OK) {
+ gpr_log(GPR_ERROR, "Decryption error: %s",
+ tsi_result_to_string(result));
+ break;
+ }
+ message_bytes += processed_message_size;
+ message_size -= processed_message_size;
+ cur += unprotected_buffer_size_written;
+
+ if (cur == end) {
+ flush_read_staging_buffer(ep, &cur, &end);
+ /* Force to enter the loop again to extract buffered bytes in
+ protector. The bytes could be buffered because of running out of
+ staging_buffer. If this happens at the end of all slices, doing
+ another unprotect avoids leaving data in the protector. */
+ keep_looping = 1;
+ } else if (unprotected_buffer_size_written > 0) {
+ keep_looping = 1;
+ } else {
+ keep_looping = 0;
+ }
}
+ if (result != TSI_OK) break;
}
- if (result != TSI_OK) break;
- }
- if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) {
- grpc_slice_buffer_add(
- ep->read_buffer,
- grpc_slice_split_head(
- &ep->read_staging_buffer,
- (size_t)(cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer))));
+ if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) {
+ grpc_slice_buffer_add(
+ ep->read_buffer,
+ grpc_slice_split_head(
+ &ep->read_staging_buffer,
+ (size_t)(cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer))));
+ }
}
/* TODO(yangg) experiment with moving this block after read_cb to see if it
@@ -270,54 +279,62 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
}
}
- for (i = 0; i < slices->count; i++) {
- grpc_slice plain = slices->slices[i];
- uint8_t *message_bytes = GRPC_SLICE_START_PTR(plain);
- size_t message_size = GRPC_SLICE_LENGTH(plain);
- while (message_size > 0) {
- size_t protected_buffer_size_to_send = (size_t)(end - cur);
- size_t processed_message_size = message_size;
- gpr_mu_lock(&ep->protector_mu);
- result = tsi_frame_protector_protect(ep->protector, message_bytes,
- &processed_message_size, cur,
- &protected_buffer_size_to_send);
- gpr_mu_unlock(&ep->protector_mu);
- if (result != TSI_OK) {
- gpr_log(GPR_ERROR, "Encryption error: %s",
- tsi_result_to_string(result));
- break;
- }
- message_bytes += processed_message_size;
- message_size -= processed_message_size;
- cur += protected_buffer_size_to_send;
-
- if (cur == end) {
- flush_write_staging_buffer(ep, &cur, &end);
+ if (ep->zero_copy_protector != NULL) {
+ // Use zero-copy grpc protector to protect.
+ result = tsi_zero_copy_grpc_protector_protect(
+ exec_ctx, ep->zero_copy_protector, slices, &ep->output_buffer);
+ } else {
+ // Use frame protector to protect.
+ for (i = 0; i < slices->count; i++) {
+ grpc_slice plain = slices->slices[i];
+ uint8_t *message_bytes = GRPC_SLICE_START_PTR(plain);
+ size_t message_size = GRPC_SLICE_LENGTH(plain);
+ while (message_size > 0) {
+ size_t protected_buffer_size_to_send = (size_t)(end - cur);
+ size_t processed_message_size = message_size;
+ gpr_mu_lock(&ep->protector_mu);
+ result = tsi_frame_protector_protect(ep->protector, message_bytes,
+ &processed_message_size, cur,
+ &protected_buffer_size_to_send);
+ gpr_mu_unlock(&ep->protector_mu);
+ if (result != TSI_OK) {
+ gpr_log(GPR_ERROR, "Encryption error: %s",
+ tsi_result_to_string(result));
+ break;
+ }
+ message_bytes += processed_message_size;
+ message_size -= processed_message_size;
+ cur += protected_buffer_size_to_send;
+
+ if (cur == end) {
+ flush_write_staging_buffer(ep, &cur, &end);
+ }
}
- }
- if (result != TSI_OK) break;
- }
- if (result == TSI_OK) {
- size_t still_pending_size;
- do {
- size_t protected_buffer_size_to_send = (size_t)(end - cur);
- gpr_mu_lock(&ep->protector_mu);
- result = tsi_frame_protector_protect_flush(ep->protector, cur,
- &protected_buffer_size_to_send,
- &still_pending_size);
- gpr_mu_unlock(&ep->protector_mu);
if (result != TSI_OK) break;
- cur += protected_buffer_size_to_send;
- if (cur == end) {
- flush_write_staging_buffer(ep, &cur, &end);
+ }
+ if (result == TSI_OK) {
+ size_t still_pending_size;
+ do {
+ size_t protected_buffer_size_to_send = (size_t)(end - cur);
+ gpr_mu_lock(&ep->protector_mu);
+ result = tsi_frame_protector_protect_flush(
+ ep->protector, cur, &protected_buffer_size_to_send,
+ &still_pending_size);
+ gpr_mu_unlock(&ep->protector_mu);
+ if (result != TSI_OK) break;
+ cur += protected_buffer_size_to_send;
+ if (cur == end) {
+ flush_write_staging_buffer(ep, &cur, &end);
+ }
+ } while (still_pending_size > 0);
+ if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) {
+ grpc_slice_buffer_add(
+ &ep->output_buffer,
+ grpc_slice_split_head(
+ &ep->write_staging_buffer,
+ (size_t)(cur -
+ GRPC_SLICE_START_PTR(ep->write_staging_buffer))));
}
- } while (still_pending_size > 0);
- if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) {
- grpc_slice_buffer_add(
- &ep->output_buffer,
- grpc_slice_split_head(
- &ep->write_staging_buffer,
- (size_t)(cur - GRPC_SLICE_START_PTR(ep->write_staging_buffer))));
}
}
@@ -389,13 +406,16 @@ static const grpc_endpoint_vtable vtable = {endpoint_read,
endpoint_get_fd};
grpc_endpoint *grpc_secure_endpoint_create(
- struct tsi_frame_protector *protector, grpc_endpoint *transport,
- grpc_slice *leftover_slices, size_t leftover_nslices) {
+ struct tsi_frame_protector *protector,
+ struct tsi_zero_copy_grpc_protector *zero_copy_protector,
+ grpc_endpoint *transport, grpc_slice *leftover_slices,
+ size_t leftover_nslices) {
size_t i;
secure_endpoint *ep = (secure_endpoint *)gpr_malloc(sizeof(secure_endpoint));
ep->base.vtable = &vtable;
ep->wrapped_ep = transport;
ep->protector = protector;
+ ep->zero_copy_protector = zero_copy_protector;
grpc_slice_buffer_init(&ep->leftover_bytes);
for (i = 0; i < leftover_nslices; i++) {
grpc_slice_buffer_add(&ep->leftover_bytes,
diff --git a/src/core/lib/security/transport/secure_endpoint.h b/src/core/lib/security/transport/secure_endpoint.h
index 1c5555f3df..3323a6ff42 100644
--- a/src/core/lib/security/transport/secure_endpoint.h
+++ b/src/core/lib/security/transport/secure_endpoint.h
@@ -23,12 +23,17 @@
#include "src/core/lib/iomgr/endpoint.h"
struct tsi_frame_protector;
+struct tsi_zero_copy_grpc_protector;
extern grpc_tracer_flag grpc_trace_secure_endpoint;
-/* Takes ownership of protector and to_wrap, and refs leftover_slices. */
+/* Takes ownership of protector, zero_copy_protector, and to_wrap, and refs
+ * leftover_slices. If zero_copy_protector is not NULL, protector will never be
+ * used. */
grpc_endpoint *grpc_secure_endpoint_create(
- struct tsi_frame_protector *protector, grpc_endpoint *to_wrap,
- grpc_slice *leftover_slices, size_t leftover_nslices);
+ struct tsi_frame_protector *protector,
+ struct tsi_zero_copy_grpc_protector *zero_copy_protector,
+ grpc_endpoint *to_wrap, grpc_slice *leftover_slices,
+ size_t leftover_nslices);
#endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURE_ENDPOINT_H */
diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c
index fc9c9f980f..ea9608f444 100644
--- a/src/core/lib/security/transport/security_handshaker.c
+++ b/src/core/lib/security/transport/security_handshaker.c
@@ -32,6 +32,7 @@
#include "src/core/lib/security/transport/secure_endpoint.h"
#include "src/core/lib/security/transport/tsi_error.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/tsi/transport_security_grpc.h"
#define GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE 256
@@ -135,17 +136,31 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error));
goto done;
}
- // Create frame protector.
- tsi_frame_protector *protector;
- tsi_result result = tsi_handshaker_result_create_frame_protector(
- h->handshaker_result, NULL, &protector);
- if (result != TSI_OK) {
+ // Create zero-copy frame protector, if implemented.
+ tsi_zero_copy_grpc_protector *zero_copy_protector = NULL;
+ tsi_result result = tsi_handshaker_result_create_zero_copy_grpc_protector(
+ h->handshaker_result, NULL, &zero_copy_protector);
+ if (result != TSI_OK && result != TSI_UNIMPLEMENTED) {
error = grpc_set_tsi_error_result(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Frame protector creation failed"),
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Zero-copy frame protector creation failed"),
result);
security_handshake_failed_locked(exec_ctx, h, error);
goto done;
}
+ // Create frame protector if zero-copy frame protector is NULL.
+ tsi_frame_protector *protector = NULL;
+ if (zero_copy_protector == NULL) {
+ result = tsi_handshaker_result_create_frame_protector(h->handshaker_result,
+ NULL, &protector);
+ if (result != TSI_OK) {
+ error = grpc_set_tsi_error_result(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Frame protector creation failed"),
+ result);
+ security_handshake_failed_locked(exec_ctx, h, error);
+ goto done;
+ }
+ }
// Get unused bytes.
const unsigned char *unused_bytes = NULL;
size_t unused_bytes_size = 0;
@@ -155,12 +170,12 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
if (unused_bytes_size > 0) {
grpc_slice slice =
grpc_slice_from_copied_buffer((char *)unused_bytes, unused_bytes_size);
- h->args->endpoint =
- grpc_secure_endpoint_create(protector, h->args->endpoint, &slice, 1);
+ h->args->endpoint = grpc_secure_endpoint_create(
+ protector, zero_copy_protector, h->args->endpoint, &slice, 1);
grpc_slice_unref_internal(exec_ctx, slice);
} else {
- h->args->endpoint =
- grpc_secure_endpoint_create(protector, h->args->endpoint, NULL, 0);
+ h->args->endpoint = grpc_secure_endpoint_create(
+ protector, zero_copy_protector, h->args->endpoint, NULL, 0);
}
tsi_handshaker_result_destroy(h->handshaker_result);
h->handshaker_result = NULL;
diff --git a/src/core/lib/support/string.c b/src/core/lib/support/string.c
index ec93303024..523e43445b 100644
--- a/src/core/lib/support/string.c
+++ b/src/core/lib/support/string.c
@@ -300,11 +300,12 @@ void *gpr_memrchr(const void *s, int c, size_t n) {
}
bool gpr_is_true(const char *s) {
+ size_t i;
if (s == NULL) {
return false;
}
static const char *truthy[] = {"yes", "true", "1"};
- for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
+ for (i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
if (0 == gpr_stricmp(s, truthy[i])) {
return true;
}
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c
index 7d60b1de17..5dbfaa2d43 100644
--- a/src/core/lib/surface/alarm.c
+++ b/src/core/lib/surface/alarm.c
@@ -44,7 +44,9 @@ static void alarm_ref(grpc_alarm *alarm) { gpr_ref(&alarm->refs); }
static void alarm_unref(grpc_alarm *alarm) {
if (gpr_unref(&alarm->refs)) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm");
+ if (alarm->cq != NULL) {
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm");
+ }
grpc_exec_ctx_finish(&exec_ctx);
gpr_free(alarm);
}
@@ -93,12 +95,8 @@ static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
(void *)alarm, &alarm->completion);
}
-grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
- void *tag) {
+grpc_alarm *grpc_alarm_create(void *reserved) {
grpc_alarm *alarm = gpr_malloc(sizeof(grpc_alarm));
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-
- gpr_ref_init(&alarm->refs, 1);
#ifndef NDEBUG
if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) {
@@ -106,27 +104,36 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
}
#endif
+ gpr_ref_init(&alarm->refs, 1);
+ grpc_timer_init_unset(&alarm->alarm);
+ alarm->cq = NULL;
+ GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm,
+ grpc_schedule_on_exec_ctx);
+ return alarm;
+}
+
+void grpc_alarm_set(grpc_alarm *alarm, grpc_completion_queue *cq,
+ gpr_timespec deadline, void *tag, void *reserved) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
GRPC_CQ_INTERNAL_REF(cq, "alarm");
alarm->cq = cq;
alarm->tag = tag;
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
- GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm,
- grpc_schedule_on_exec_ctx);
grpc_timer_init(&exec_ctx, &alarm->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
&alarm->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_exec_ctx_finish(&exec_ctx);
- return alarm;
}
-void grpc_alarm_cancel(grpc_alarm *alarm) {
+void grpc_alarm_cancel(grpc_alarm *alarm, void *reserved) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_timer_cancel(&exec_ctx, &alarm->alarm);
grpc_exec_ctx_finish(&exec_ctx);
}
-void grpc_alarm_destroy(grpc_alarm *alarm) {
- grpc_alarm_cancel(alarm);
+void grpc_alarm_destroy(grpc_alarm *alarm, void *reserved) {
+ grpc_alarm_cancel(alarm, reserved);
GRPC_ALARM_UNREF(alarm, "alarm_destroy");
}
diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c
index 96c16105e7..fd6ea4daa9 100644
--- a/src/core/lib/surface/version.c
+++ b/src/core/lib/surface/version.c
@@ -21,6 +21,6 @@
#include <grpc/grpc.h>
-const char *grpc_version_string(void) { return "4.0.0-dev"; }
+const char *grpc_version_string(void) { return "5.0.0-dev"; }
const char *grpc_g_stands_for(void) { return "gambit"; }
diff --git a/src/core/plugin_registry/grpc_cronet_plugin_registry.c b/src/core/plugin_registry/grpc_cronet_plugin_registry.c
index 322ebea111..1c09f54ad9 100644
--- a/src/core/plugin_registry/grpc_cronet_plugin_registry.c
+++ b/src/core/plugin_registry/grpc_cronet_plugin_registry.c
@@ -28,8 +28,8 @@ extern void grpc_client_channel_init(void);
extern void grpc_client_channel_shutdown(void);
extern void grpc_tsi_gts_init(void);
extern void grpc_tsi_gts_shutdown(void);
-extern void grpc_load_reporting_plugin_init(void);
-extern void grpc_load_reporting_plugin_shutdown(void);
+extern void grpc_server_load_reporting_plugin_init(void);
+extern void grpc_server_load_reporting_plugin_shutdown(void);
void grpc_register_built_in_plugins(void) {
grpc_register_plugin(grpc_http_filters_init,
@@ -42,6 +42,6 @@ void grpc_register_built_in_plugins(void) {
grpc_client_channel_shutdown);
grpc_register_plugin(grpc_tsi_gts_init,
grpc_tsi_gts_shutdown);
- grpc_register_plugin(grpc_load_reporting_plugin_init,
- grpc_load_reporting_plugin_shutdown);
+ grpc_register_plugin(grpc_server_load_reporting_plugin_init,
+ grpc_server_load_reporting_plugin_shutdown);
}
diff --git a/src/core/plugin_registry/grpc_plugin_registry.c b/src/core/plugin_registry/grpc_plugin_registry.c
index fa9974952c..9cacf3d306 100644
--- a/src/core/plugin_registry/grpc_plugin_registry.c
+++ b/src/core/plugin_registry/grpc_plugin_registry.c
@@ -44,8 +44,8 @@ extern void grpc_resolver_dns_native_init(void);
extern void grpc_resolver_dns_native_shutdown(void);
extern void grpc_resolver_sockaddr_init(void);
extern void grpc_resolver_sockaddr_shutdown(void);
-extern void grpc_load_reporting_plugin_init(void);
-extern void grpc_load_reporting_plugin_shutdown(void);
+extern void grpc_server_load_reporting_plugin_init(void);
+extern void grpc_server_load_reporting_plugin_shutdown(void);
extern void census_grpc_plugin_init(void);
extern void census_grpc_plugin_shutdown(void);
extern void grpc_max_age_filter_init(void);
@@ -82,8 +82,8 @@ void grpc_register_built_in_plugins(void) {
grpc_resolver_dns_native_shutdown);
grpc_register_plugin(grpc_resolver_sockaddr_init,
grpc_resolver_sockaddr_shutdown);
- grpc_register_plugin(grpc_load_reporting_plugin_init,
- grpc_load_reporting_plugin_shutdown);
+ grpc_register_plugin(grpc_server_load_reporting_plugin_init,
+ grpc_server_load_reporting_plugin_shutdown);
grpc_register_plugin(census_grpc_plugin_init,
census_grpc_plugin_shutdown);
grpc_register_plugin(grpc_max_age_filter_init,
diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c
index 7eb599d81a..7b90d796d5 100644
--- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c
+++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c
@@ -36,8 +36,8 @@ extern void grpc_resolver_sockaddr_init(void);
extern void grpc_resolver_sockaddr_shutdown(void);
extern void grpc_resolver_fake_init(void);
extern void grpc_resolver_fake_shutdown(void);
-extern void grpc_load_reporting_plugin_init(void);
-extern void grpc_load_reporting_plugin_shutdown(void);
+extern void grpc_server_load_reporting_plugin_init(void);
+extern void grpc_server_load_reporting_plugin_shutdown(void);
extern void grpc_lb_policy_grpclb_init(void);
extern void grpc_lb_policy_grpclb_shutdown(void);
extern void grpc_lb_policy_pick_first_init(void);
@@ -72,8 +72,8 @@ void grpc_register_built_in_plugins(void) {
grpc_resolver_sockaddr_shutdown);
grpc_register_plugin(grpc_resolver_fake_init,
grpc_resolver_fake_shutdown);
- grpc_register_plugin(grpc_load_reporting_plugin_init,
- grpc_load_reporting_plugin_shutdown);
+ grpc_register_plugin(grpc_server_load_reporting_plugin_init,
+ grpc_server_load_reporting_plugin_shutdown);
grpc_register_plugin(grpc_lb_policy_grpclb_init,
grpc_lb_policy_grpclb_shutdown);
grpc_register_plugin(grpc_lb_policy_pick_first_init,
diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c
index 967126ecee..e7b3be3d86 100644
--- a/src/core/tsi/fake_transport_security.c
+++ b/src/core/tsi/fake_transport_security.c
@@ -25,7 +25,8 @@
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/useful.h>
-#include "src/core/tsi/transport_security.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/tsi/transport_security_grpc.h"
/* --- Constants. ---*/
#define TSI_FAKE_FRAME_HEADER_SIZE 4
@@ -74,6 +75,14 @@ typedef struct {
size_t max_frame_size;
} tsi_fake_frame_protector;
+typedef struct {
+ tsi_zero_copy_grpc_protector base;
+ grpc_slice_buffer header_sb;
+ grpc_slice_buffer protected_sb;
+ size_t max_frame_size;
+ size_t parsed_frame_size;
+} tsi_fake_zero_copy_grpc_protector;
+
/* --- Utils. ---*/
static const char *tsi_fake_handshake_message_strings[] = {
@@ -113,6 +122,28 @@ static void store32_little_endian(uint32_t value, unsigned char *buf) {
buf[0] = (unsigned char)((value)&0xFF);
}
+static uint32_t read_frame_size(const grpc_slice_buffer *sb) {
+ GPR_ASSERT(sb != NULL && sb->length >= TSI_FAKE_FRAME_HEADER_SIZE);
+ uint8_t frame_size_buffer[TSI_FAKE_FRAME_HEADER_SIZE];
+ uint8_t *buf = frame_size_buffer;
+ /* Copies the first 4 bytes to a temporary buffer. */
+ size_t remaining = TSI_FAKE_FRAME_HEADER_SIZE;
+ for (size_t i = 0; i < sb->count; i++) {
+ size_t slice_length = GRPC_SLICE_LENGTH(sb->slices[i]);
+ if (remaining <= slice_length) {
+ memcpy(buf, GRPC_SLICE_START_PTR(sb->slices[i]), remaining);
+ remaining = 0;
+ break;
+ } else {
+ memcpy(buf, GRPC_SLICE_START_PTR(sb->slices[i]), slice_length);
+ buf += slice_length;
+ remaining -= slice_length;
+ }
+ }
+ GPR_ASSERT(remaining == 0);
+ return load32_little_endian(frame_size_buffer);
+}
+
static void tsi_fake_frame_reset(tsi_fake_frame *frame, int needs_draining) {
frame->offset = 0;
frame->needs_draining = needs_draining;
@@ -363,6 +394,84 @@ static const tsi_frame_protector_vtable frame_protector_vtable = {
fake_protector_unprotect, fake_protector_destroy,
};
+/* --- tsi_zero_copy_grpc_protector methods implementation. ---*/
+
+static tsi_result fake_zero_copy_grpc_protector_protect(
+ grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self,
+ grpc_slice_buffer *unprotected_slices,
+ grpc_slice_buffer *protected_slices) {
+ if (self == NULL || unprotected_slices == NULL || protected_slices == NULL) {
+ return TSI_INVALID_ARGUMENT;
+ }
+ tsi_fake_zero_copy_grpc_protector *impl =
+ (tsi_fake_zero_copy_grpc_protector *)self;
+ /* Protects each frame. */
+ while (unprotected_slices->length > 0) {
+ size_t frame_length =
+ GPR_MIN(impl->max_frame_size,
+ unprotected_slices->length + TSI_FAKE_FRAME_HEADER_SIZE);
+ grpc_slice slice = GRPC_SLICE_MALLOC(TSI_FAKE_FRAME_HEADER_SIZE);
+ store32_little_endian((uint32_t)frame_length, GRPC_SLICE_START_PTR(slice));
+ grpc_slice_buffer_add(protected_slices, slice);
+ size_t data_length = frame_length - TSI_FAKE_FRAME_HEADER_SIZE;
+ grpc_slice_buffer_move_first(unprotected_slices, data_length,
+ protected_slices);
+ }
+ return TSI_OK;
+}
+
+static tsi_result fake_zero_copy_grpc_protector_unprotect(
+ grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self,
+ grpc_slice_buffer *protected_slices,
+ grpc_slice_buffer *unprotected_slices) {
+ if (self == NULL || unprotected_slices == NULL || protected_slices == NULL) {
+ return TSI_INVALID_ARGUMENT;
+ }
+ tsi_fake_zero_copy_grpc_protector *impl =
+ (tsi_fake_zero_copy_grpc_protector *)self;
+ grpc_slice_buffer_move_into(protected_slices, &impl->protected_sb);
+ /* Unprotect each frame, if we get a full frame. */
+ while (impl->protected_sb.length >= TSI_FAKE_FRAME_HEADER_SIZE) {
+ if (impl->parsed_frame_size == 0) {
+ impl->parsed_frame_size = read_frame_size(&impl->protected_sb);
+ if (impl->parsed_frame_size <= 4) {
+ gpr_log(GPR_ERROR, "Invalid frame size.");
+ return TSI_DATA_CORRUPTED;
+ }
+ }
+ /* If we do not have a full frame, return with OK status. */
+ if (impl->protected_sb.length < impl->parsed_frame_size) break;
+ /* Strips header bytes. */
+ grpc_slice_buffer_move_first(&impl->protected_sb,
+ TSI_FAKE_FRAME_HEADER_SIZE, &impl->header_sb);
+ /* Moves data to unprotected slices. */
+ grpc_slice_buffer_move_first(
+ &impl->protected_sb,
+ impl->parsed_frame_size - TSI_FAKE_FRAME_HEADER_SIZE,
+ unprotected_slices);
+ impl->parsed_frame_size = 0;
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &impl->header_sb);
+ }
+ return TSI_OK;
+}
+
+static void fake_zero_copy_grpc_protector_destroy(
+ grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self) {
+ if (self == NULL) return;
+ tsi_fake_zero_copy_grpc_protector *impl =
+ (tsi_fake_zero_copy_grpc_protector *)self;
+ grpc_slice_buffer_destroy_internal(exec_ctx, &impl->header_sb);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &impl->protected_sb);
+ gpr_free(impl);
+}
+
+static const tsi_zero_copy_grpc_protector_vtable
+ zero_copy_grpc_protector_vtable = {
+ fake_zero_copy_grpc_protector_protect,
+ fake_zero_copy_grpc_protector_unprotect,
+ fake_zero_copy_grpc_protector_destroy,
+};
+
/* --- tsi_handshaker_result methods implementation. ---*/
typedef struct {
@@ -383,6 +492,14 @@ static tsi_result fake_handshaker_result_extract_peer(
return result;
}
+static tsi_result fake_handshaker_result_create_zero_copy_grpc_protector(
+ const tsi_handshaker_result *self, size_t *max_output_protected_frame_size,
+ tsi_zero_copy_grpc_protector **protector) {
+ *protector =
+ tsi_create_fake_zero_copy_grpc_protector(max_output_protected_frame_size);
+ return TSI_OK;
+}
+
static tsi_result fake_handshaker_result_create_frame_protector(
const tsi_handshaker_result *self, size_t *max_output_protected_frame_size,
tsi_frame_protector **protector) {
@@ -407,7 +524,7 @@ static void fake_handshaker_result_destroy(tsi_handshaker_result *self) {
static const tsi_handshaker_result_vtable handshaker_result_vtable = {
fake_handshaker_result_extract_peer,
- NULL, /* create_zero_copy_grpc_protector */
+ fake_handshaker_result_create_zero_copy_grpc_protector,
fake_handshaker_result_create_frame_protector,
fake_handshaker_result_get_unused_bytes,
fake_handshaker_result_destroy,
@@ -631,3 +748,16 @@ tsi_frame_protector *tsi_create_fake_frame_protector(
impl->base.vtable = &frame_protector_vtable;
return &impl->base;
}
+
+tsi_zero_copy_grpc_protector *tsi_create_fake_zero_copy_grpc_protector(
+ size_t *max_protected_frame_size) {
+ tsi_fake_zero_copy_grpc_protector *impl = gpr_zalloc(sizeof(*impl));
+ grpc_slice_buffer_init(&impl->header_sb);
+ grpc_slice_buffer_init(&impl->protected_sb);
+ impl->max_frame_size = (max_protected_frame_size == NULL)
+ ? TSI_FAKE_DEFAULT_FRAME_SIZE
+ : *max_protected_frame_size;
+ impl->parsed_frame_size = 0;
+ impl->base.vtable = &zero_copy_grpc_protector_vtable;
+ return &impl->base;
+}
diff --git a/src/core/tsi/fake_transport_security.h b/src/core/tsi/fake_transport_security.h
index 934b3cbeb2..6159708a84 100644
--- a/src/core/tsi/fake_transport_security.h
+++ b/src/core/tsi/fake_transport_security.h
@@ -39,6 +39,11 @@ tsi_handshaker *tsi_create_fake_handshaker(int is_client);
tsi_frame_protector *tsi_create_fake_frame_protector(
size_t *max_protected_frame_size);
+/* Creates a zero-copy protector directly without going through the handshake
+ * phase. */
+tsi_zero_copy_grpc_protector *tsi_create_fake_zero_copy_grpc_protector(
+ size_t *max_protected_frame_size);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/core/tsi/transport_security_grpc.c b/src/core/tsi/transport_security_grpc.c
index 5bcfdfa61f..773b35e717 100644
--- a/src/core/tsi/transport_security_grpc.c
+++ b/src/core/tsi/transport_security_grpc.c
@@ -37,28 +37,33 @@ tsi_result tsi_handshaker_result_create_zero_copy_grpc_protector(
Calls specific implementation after state/input validation. */
tsi_result tsi_zero_copy_grpc_protector_protect(
- tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *unprotected_slices,
+ grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self,
+ grpc_slice_buffer *unprotected_slices,
grpc_slice_buffer *protected_slices) {
- if (self == NULL || self->vtable == NULL || unprotected_slices == NULL ||
- protected_slices == NULL) {
+ if (exec_ctx == NULL || self == NULL || self->vtable == NULL ||
+ unprotected_slices == NULL || protected_slices == NULL) {
return TSI_INVALID_ARGUMENT;
}
if (self->vtable->protect == NULL) return TSI_UNIMPLEMENTED;
- return self->vtable->protect(self, unprotected_slices, protected_slices);
+ return self->vtable->protect(exec_ctx, self, unprotected_slices,
+ protected_slices);
}
tsi_result tsi_zero_copy_grpc_protector_unprotect(
- tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *protected_slices,
+ grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self,
+ grpc_slice_buffer *protected_slices,
grpc_slice_buffer *unprotected_slices) {
- if (self == NULL || self->vtable == NULL || protected_slices == NULL ||
- unprotected_slices == NULL) {
+ if (exec_ctx == NULL || self == NULL || self->vtable == NULL ||
+ protected_slices == NULL || unprotected_slices == NULL) {
return TSI_INVALID_ARGUMENT;
}
if (self->vtable->unprotect == NULL) return TSI_UNIMPLEMENTED;
- return self->vtable->unprotect(self, protected_slices, unprotected_slices);
+ return self->vtable->unprotect(exec_ctx, self, protected_slices,
+ unprotected_slices);
}
-void tsi_zero_copy_grpc_protector_destroy(tsi_zero_copy_grpc_protector *self) {
+void tsi_zero_copy_grpc_protector_destroy(grpc_exec_ctx *exec_ctx,
+ tsi_zero_copy_grpc_protector *self) {
if (self == NULL) return;
- self->vtable->destroy(self);
+ self->vtable->destroy(exec_ctx, self);
}
diff --git a/src/core/tsi/transport_security_grpc.h b/src/core/tsi/transport_security_grpc.h
index 5ab5297cc4..375a758888 100644
--- a/src/core/tsi/transport_security_grpc.h
+++ b/src/core/tsi/transport_security_grpc.h
@@ -42,8 +42,8 @@ tsi_result tsi_handshaker_result_create_zero_copy_grpc_protector(
- This method returns TSI_OK in case of success or a specific error code in
case of failure. */
tsi_result tsi_zero_copy_grpc_protector_protect(
- tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *unprotected_slices,
- grpc_slice_buffer *protected_slices);
+ grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self,
+ grpc_slice_buffer *unprotected_slices, grpc_slice_buffer *protected_slices);
/* Outputs unprotected bytes.
- protected_slices is the bytes of protected frames.
@@ -52,21 +52,24 @@ tsi_result tsi_zero_copy_grpc_protector_protect(
there is not enough data to output in which case unprotected_slices has 0
bytes. */
tsi_result tsi_zero_copy_grpc_protector_unprotect(
- tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *protected_slices,
- grpc_slice_buffer *unprotected_slices);
+ grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self,
+ grpc_slice_buffer *protected_slices, grpc_slice_buffer *unprotected_slices);
/* Destroys the tsi_zero_copy_grpc_protector object. */
-void tsi_zero_copy_grpc_protector_destroy(tsi_zero_copy_grpc_protector *self);
+void tsi_zero_copy_grpc_protector_destroy(grpc_exec_ctx *exec_ctx,
+ tsi_zero_copy_grpc_protector *self);
/* Base for tsi_zero_copy_grpc_protector implementations. */
typedef struct {
- tsi_result (*protect)(tsi_zero_copy_grpc_protector *self,
+ tsi_result (*protect)(grpc_exec_ctx *exec_ctx,
+ tsi_zero_copy_grpc_protector *self,
grpc_slice_buffer *unprotected_slices,
grpc_slice_buffer *protected_slices);
- tsi_result (*unprotect)(tsi_zero_copy_grpc_protector *self,
+ tsi_result (*unprotect)(grpc_exec_ctx *exec_ctx,
+ tsi_zero_copy_grpc_protector *self,
grpc_slice_buffer *protected_slices,
grpc_slice_buffer *unprotected_slices);
- void (*destroy)(tsi_zero_copy_grpc_protector *self);
+ void (*destroy)(grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self);
} tsi_zero_copy_grpc_protector_vtable;
struct tsi_zero_copy_grpc_protector {
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 2071827b64..ec642b0520 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -298,8 +298,8 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c',
'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c',
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c',
- 'src/core/ext/filters/load_reporting/load_reporting.c',
- 'src/core/ext/filters/load_reporting/load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_filter.c',
+ 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.c',
'src/core/ext/census/base_resources.c',
'src/core/ext/census/context.c',
'src/core/ext/census/gen/census.pb.c',
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index 0402ce34fb..57b543967e 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -88,6 +88,7 @@ grpc_completion_queue_pluck_type grpc_completion_queue_pluck_import;
grpc_completion_queue_shutdown_type grpc_completion_queue_shutdown_import;
grpc_completion_queue_destroy_type grpc_completion_queue_destroy_import;
grpc_alarm_create_type grpc_alarm_create_import;
+grpc_alarm_set_type grpc_alarm_set_import;
grpc_alarm_cancel_type grpc_alarm_cancel_import;
grpc_alarm_destroy_type grpc_alarm_destroy_import;
grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import;
@@ -395,6 +396,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_completion_queue_shutdown_import = (grpc_completion_queue_shutdown_type) GetProcAddress(library, "grpc_completion_queue_shutdown");
grpc_completion_queue_destroy_import = (grpc_completion_queue_destroy_type) GetProcAddress(library, "grpc_completion_queue_destroy");
grpc_alarm_create_import = (grpc_alarm_create_type) GetProcAddress(library, "grpc_alarm_create");
+ grpc_alarm_set_import = (grpc_alarm_set_type) GetProcAddress(library, "grpc_alarm_set");
grpc_alarm_cancel_import = (grpc_alarm_cancel_type) GetProcAddress(library, "grpc_alarm_cancel");
grpc_alarm_destroy_import = (grpc_alarm_destroy_type) GetProcAddress(library, "grpc_alarm_destroy");
grpc_channel_check_connectivity_state_import = (grpc_channel_check_connectivity_state_type) GetProcAddress(library, "grpc_channel_check_connectivity_state");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index e3704e592b..c5c848ae44 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -242,13 +242,16 @@ extern grpc_completion_queue_shutdown_type grpc_completion_queue_shutdown_import
typedef void(*grpc_completion_queue_destroy_type)(grpc_completion_queue *cq);
extern grpc_completion_queue_destroy_type grpc_completion_queue_destroy_import;
#define grpc_completion_queue_destroy grpc_completion_queue_destroy_import
-typedef grpc_alarm *(*grpc_alarm_create_type)(grpc_completion_queue *cq, gpr_timespec deadline, void *tag);
+typedef grpc_alarm *(*grpc_alarm_create_type)(void *reserved);
extern grpc_alarm_create_type grpc_alarm_create_import;
#define grpc_alarm_create grpc_alarm_create_import
-typedef void(*grpc_alarm_cancel_type)(grpc_alarm *alarm);
+typedef void(*grpc_alarm_set_type)(grpc_alarm *alarm, grpc_completion_queue *cq, gpr_timespec deadline, void *tag, void *reserved);
+extern grpc_alarm_set_type grpc_alarm_set_import;
+#define grpc_alarm_set grpc_alarm_set_import
+typedef void(*grpc_alarm_cancel_type)(grpc_alarm *alarm, void *reserved);
extern grpc_alarm_cancel_type grpc_alarm_cancel_import;
#define grpc_alarm_cancel grpc_alarm_cancel_import
-typedef void(*grpc_alarm_destroy_type)(grpc_alarm *alarm);
+typedef void(*grpc_alarm_destroy_type)(grpc_alarm *alarm, void *reserved);
extern grpc_alarm_destroy_type grpc_alarm_destroy_import;
#define grpc_alarm_destroy grpc_alarm_destroy_import
typedef grpc_connectivity_state(*grpc_channel_check_connectivity_state_type)(grpc_channel *channel, int try_to_connect);
diff --git a/test/core/channel/channel_stack_builder_test.c b/test/core/channel/channel_stack_builder_test.c
index be6afb7c07..682efd1438 100644
--- a/test/core/channel/channel_stack_builder_test.c
+++ b/test/core/channel/channel_stack_builder_test.c
@@ -59,10 +59,6 @@ static void channel_func(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
}
-static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- return gpr_strdup("peer");
-}
-
bool g_replacement_fn_called = false;
bool g_original_fn_called = false;
void set_arg_once_fn(grpc_channel_stack *channel_stack,
@@ -94,7 +90,6 @@ const grpc_channel_filter replacement_filter = {
0,
channel_init_func,
channel_destroy_func,
- get_peer,
grpc_channel_next_get_info,
"filter_name"};
@@ -108,7 +103,6 @@ const grpc_channel_filter original_filter = {
0,
channel_init_func,
channel_destroy_func,
- get_peer,
grpc_channel_next_get_info,
"filter_name"};
diff --git a/test/core/end2end/fixtures/h2_load_reporting.c b/test/core/end2end/fixtures/h2_load_reporting.c
index 385e2cbf70..8a05bb722a 100644
--- a/test/core/end2end/fixtures/h2_load_reporting.c
+++ b/test/core/end2end/fixtures/h2_load_reporting.c
@@ -28,7 +28,7 @@
#include <grpc/support/useful.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/http/server/http_server_filter.h"
-#include "src/core/ext/filters/load_reporting/load_reporting.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
diff --git a/test/core/end2end/tests/load_reporting_hook.c b/test/core/end2end/tests/load_reporting_hook.c
index 3584d47887..7b503790db 100644
--- a/test/core/end2end/tests/load_reporting_hook.c
+++ b/test/core/end2end/tests/load_reporting_hook.c
@@ -26,8 +26,8 @@
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
-#include "src/core/ext/filters/load_reporting/load_reporting.h"
-#include "src/core/ext/filters/load_reporting/load_reporting_filter.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/transport/static_metadata.h"
diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c
index 7ecd947e1f..e9f2c76738 100644
--- a/test/core/security/secure_endpoint_test.c
+++ b/test/core/security/secure_endpoint_test.c
@@ -36,12 +36,19 @@ static gpr_mu *g_mu;
static grpc_pollset *g_pollset;
static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair(
- size_t slice_size, grpc_slice *leftover_slices, size_t leftover_nslices) {
+ size_t slice_size, grpc_slice *leftover_slices, size_t leftover_nslices,
+ bool use_zero_copy_protector) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
tsi_frame_protector *fake_read_protector =
tsi_create_fake_frame_protector(NULL);
tsi_frame_protector *fake_write_protector =
tsi_create_fake_frame_protector(NULL);
+ tsi_zero_copy_grpc_protector *fake_read_zero_copy_protector =
+ use_zero_copy_protector ? tsi_create_fake_zero_copy_grpc_protector(NULL)
+ : NULL;
+ tsi_zero_copy_grpc_protector *fake_write_zero_copy_protector =
+ use_zero_copy_protector ? tsi_create_fake_zero_copy_grpc_protector(NULL)
+ : NULL;
grpc_endpoint_test_fixture f;
grpc_endpoint_pair tcp;
@@ -54,8 +61,9 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair(
grpc_endpoint_add_to_pollset(&exec_ctx, tcp.server, g_pollset);
if (leftover_nslices == 0) {
- f.client_ep =
- grpc_secure_endpoint_create(fake_read_protector, tcp.client, NULL, 0);
+ f.client_ep = grpc_secure_endpoint_create(fake_read_protector,
+ fake_read_zero_copy_protector,
+ tcp.client, NULL, 0);
} else {
unsigned i;
tsi_result result;
@@ -96,31 +104,47 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair(
} while (still_pending_size > 0);
encrypted_leftover = grpc_slice_from_copied_buffer(
(const char *)encrypted_buffer, total_buffer_size - buffer_size);
- f.client_ep = grpc_secure_endpoint_create(fake_read_protector, tcp.client,
- &encrypted_leftover, 1);
+ f.client_ep = grpc_secure_endpoint_create(
+ fake_read_protector, fake_read_zero_copy_protector, tcp.client,
+ &encrypted_leftover, 1);
grpc_slice_unref(encrypted_leftover);
gpr_free(encrypted_buffer);
}
- f.server_ep =
- grpc_secure_endpoint_create(fake_write_protector, tcp.server, NULL, 0);
+ f.server_ep = grpc_secure_endpoint_create(fake_write_protector,
+ fake_write_zero_copy_protector,
+ tcp.server, NULL, 0);
grpc_exec_ctx_finish(&exec_ctx);
return f;
}
static grpc_endpoint_test_fixture
secure_endpoint_create_fixture_tcp_socketpair_noleftover(size_t slice_size) {
- return secure_endpoint_create_fixture_tcp_socketpair(slice_size, NULL, 0);
+ return secure_endpoint_create_fixture_tcp_socketpair(slice_size, NULL, 0,
+ false);
+}
+
+static grpc_endpoint_test_fixture
+secure_endpoint_create_fixture_tcp_socketpair_noleftover_zero_copy(
+ size_t slice_size) {
+ return secure_endpoint_create_fixture_tcp_socketpair(slice_size, NULL, 0,
+ true);
}
static grpc_endpoint_test_fixture
secure_endpoint_create_fixture_tcp_socketpair_leftover(size_t slice_size) {
grpc_slice s =
grpc_slice_from_copied_string("hello world 12345678900987654321");
- grpc_endpoint_test_fixture f;
+ return secure_endpoint_create_fixture_tcp_socketpair(slice_size, &s, 1,
+ false);
+}
- f = secure_endpoint_create_fixture_tcp_socketpair(slice_size, &s, 1);
- return f;
+static grpc_endpoint_test_fixture
+secure_endpoint_create_fixture_tcp_socketpair_leftover_zero_copy(
+ size_t slice_size) {
+ grpc_slice s =
+ grpc_slice_from_copied_string("hello world 12345678900987654321");
+ return secure_endpoint_create_fixture_tcp_socketpair(slice_size, &s, 1, true);
}
static void clean_up(void) {}
@@ -128,8 +152,14 @@ static void clean_up(void) {}
static grpc_endpoint_test_config configs[] = {
{"secure_ep/tcp_socketpair",
secure_endpoint_create_fixture_tcp_socketpair_noleftover, clean_up},
+ {"secure_ep/tcp_socketpair_zero_copy",
+ secure_endpoint_create_fixture_tcp_socketpair_noleftover_zero_copy,
+ clean_up},
{"secure_ep/tcp_socketpair_leftover",
secure_endpoint_create_fixture_tcp_socketpair_leftover, clean_up},
+ {"secure_ep/tcp_socketpair_leftover_zero_copy",
+ secure_endpoint_create_fixture_tcp_socketpair_leftover_zero_copy,
+ clean_up},
};
static void inc_call_ctr(grpc_exec_ctx *exec_ctx, void *arg,
@@ -184,7 +214,9 @@ int main(int argc, char **argv) {
g_pollset = gpr_zalloc(grpc_pollset_size());
grpc_pollset_init(g_pollset, &g_mu);
grpc_endpoint_tests(configs[0], g_pollset, g_mu);
- test_leftover(configs[1], 1);
+ grpc_endpoint_tests(configs[1], g_pollset, g_mu);
+ test_leftover(configs[2], 1);
+ test_leftover(configs[3], 1);
GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
grpc_schedule_on_exec_ctx);
grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
diff --git a/test/core/surface/alarm_test.c b/test/core/surface/alarm_test.c
index 6971d92074..4fd7cb93c6 100644
--- a/test/core/surface/alarm_test.c
+++ b/test/core/surface/alarm_test.c
@@ -48,45 +48,50 @@ static void test_alarm(void) {
/* regular expiry */
grpc_event ev;
void *tag = create_test_tag();
- grpc_alarm *alarm =
- grpc_alarm_create(cc, grpc_timeout_seconds_to_deadline(1), tag);
+ grpc_alarm *alarm = grpc_alarm_create(NULL);
+ grpc_alarm_set(alarm, cc, grpc_timeout_seconds_to_deadline(1), tag, NULL);
ev = grpc_completion_queue_next(cc, grpc_timeout_seconds_to_deadline(2),
NULL);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
GPR_ASSERT(ev.tag == tag);
GPR_ASSERT(ev.success);
- grpc_alarm_destroy(alarm);
+ grpc_alarm_destroy(alarm, NULL);
}
{
/* cancellation */
grpc_event ev;
void *tag = create_test_tag();
- grpc_alarm *alarm =
- grpc_alarm_create(cc, grpc_timeout_seconds_to_deadline(2), tag);
+ grpc_alarm *alarm = grpc_alarm_create(NULL);
+ grpc_alarm_set(alarm, cc, grpc_timeout_seconds_to_deadline(2), tag, NULL);
- grpc_alarm_cancel(alarm);
+ grpc_alarm_cancel(alarm, NULL);
ev = grpc_completion_queue_next(cc, grpc_timeout_seconds_to_deadline(1),
NULL);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
GPR_ASSERT(ev.tag == tag);
GPR_ASSERT(ev.success == 0);
- grpc_alarm_destroy(alarm);
+ grpc_alarm_destroy(alarm, NULL);
}
{
/* alarm_destroy before cq_next */
grpc_event ev;
void *tag = create_test_tag();
- grpc_alarm *alarm =
- grpc_alarm_create(cc, grpc_timeout_seconds_to_deadline(2), tag);
+ grpc_alarm *alarm = grpc_alarm_create(NULL);
+ grpc_alarm_set(alarm, cc, grpc_timeout_seconds_to_deadline(2), tag, NULL);
- grpc_alarm_destroy(alarm);
+ grpc_alarm_destroy(alarm, NULL);
ev = grpc_completion_queue_next(cc, grpc_timeout_seconds_to_deadline(1),
NULL);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
GPR_ASSERT(ev.tag == tag);
GPR_ASSERT(ev.success == 0);
}
+ {
+ /* alarm_destroy before set */
+ grpc_alarm *alarm = grpc_alarm_create(NULL);
+ grpc_alarm_destroy(alarm, NULL);
+ }
shutdown_and_destroy(cc);
}
diff --git a/test/cpp/common/alarm_cpp_test.cc b/test/cpp/common/alarm_cpp_test.cc
index ce4168843c..212972d25d 100644
--- a/test/cpp/common/alarm_cpp_test.cc
+++ b/test/cpp/common/alarm_cpp_test.cc
@@ -18,6 +18,8 @@
#include <grpc++/alarm.h>
#include <grpc++/completion_queue.h>
+#include <thread>
+
#include <gtest/gtest.h>
#include "test/core/util/test_config.h"
@@ -28,6 +30,46 @@ namespace {
TEST(AlarmTest, RegularExpiry) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
+ Alarm alarm;
+ alarm.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk);
+
+ void* output_tag;
+ bool ok;
+ const CompletionQueue::NextStatus status = cq.AsyncNext(
+ (void**)&output_tag, &ok, grpc_timeout_seconds_to_deadline(2));
+
+ EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
+ EXPECT_TRUE(ok);
+ EXPECT_EQ(junk, output_tag);
+}
+
+TEST(AlarmTest, MultithreadedRegularExpiry) {
+ CompletionQueue cq;
+ void* junk = reinterpret_cast<void*>(1618033);
+ void* output_tag;
+ bool ok;
+ CompletionQueue::NextStatus status;
+ Alarm alarm;
+
+ std::thread t1([&alarm, &cq, &junk] {
+ alarm.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk);
+ });
+
+ std::thread t2([&cq, &ok, &output_tag, &status] {
+ status = cq.AsyncNext((void**)&output_tag, &ok,
+ grpc_timeout_seconds_to_deadline(2));
+ });
+
+ t1.join();
+ t2.join();
+ EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
+ EXPECT_TRUE(ok);
+ EXPECT_EQ(junk, output_tag);
+}
+
+TEST(AlarmTest, DeprecatedRegularExpiry) {
+ CompletionQueue cq;
+ void* junk = reinterpret_cast<void*>(1618033);
Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(1), junk);
void* output_tag;
@@ -43,7 +85,8 @@ TEST(AlarmTest, RegularExpiry) {
TEST(AlarmTest, MoveConstructor) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
- Alarm first(&cq, grpc_timeout_seconds_to_deadline(1), junk);
+ Alarm first;
+ first.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk);
Alarm second(std::move(first));
void* output_tag;
bool ok;
@@ -57,7 +100,8 @@ TEST(AlarmTest, MoveConstructor) {
TEST(AlarmTest, MoveAssignment) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
- Alarm first(&cq, grpc_timeout_seconds_to_deadline(1), junk);
+ Alarm first;
+ first.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk);
Alarm second(std::move(first));
first = std::move(second);
@@ -76,7 +120,8 @@ TEST(AlarmTest, RegularExpiryChrono) {
void* junk = reinterpret_cast<void*>(1618033);
std::chrono::system_clock::time_point one_sec_deadline =
std::chrono::system_clock::now() + std::chrono::seconds(1);
- Alarm alarm(&cq, one_sec_deadline, junk);
+ Alarm alarm;
+ alarm.Set(&cq, one_sec_deadline, junk);
void* output_tag;
bool ok;
@@ -91,7 +136,8 @@ TEST(AlarmTest, RegularExpiryChrono) {
TEST(AlarmTest, ZeroExpiry) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
- Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(0), junk);
+ Alarm alarm;
+ alarm.Set(&cq, grpc_timeout_seconds_to_deadline(0), junk);
void* output_tag;
bool ok;
@@ -106,7 +152,8 @@ TEST(AlarmTest, ZeroExpiry) {
TEST(AlarmTest, NegativeExpiry) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
- Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(-1), junk);
+ Alarm alarm;
+ alarm.Set(&cq, grpc_timeout_seconds_to_deadline(-1), junk);
void* output_tag;
bool ok;
@@ -121,7 +168,8 @@ TEST(AlarmTest, NegativeExpiry) {
TEST(AlarmTest, Cancellation) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
- Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(2), junk);
+ Alarm alarm;
+ alarm.Set(&cq, grpc_timeout_seconds_to_deadline(2), junk);
alarm.Cancel();
void* output_tag;
diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD
index 83b91e2ce9..985a335f1b 100644
--- a/test/cpp/microbenchmarks/BUILD
+++ b/test/cpp/microbenchmarks/BUILD
@@ -35,14 +35,14 @@ grpc_cc_library(
"fullstack_fixtures.h",
"helpers.h",
],
+ external_deps = [
+ "benchmark",
+ ],
deps = [
"//:grpc++_unsecure",
"//src/proto/grpc/testing:echo_proto",
"//test/core/util:grpc_test_util_unsecure",
],
- external_deps = [
- "benchmark",
- ],
)
grpc_cc_binary(
@@ -76,14 +76,20 @@ grpc_cc_binary(
grpc_cc_binary(
name = "bm_fullstack_streaming_ping_pong",
testonly = 1,
- srcs = ["bm_fullstack_streaming_ping_pong.cc"],
+ srcs = [
+ "bm_fullstack_streaming_ping_pong.cc",
+ "fullstack_streaming_ping_pong.h",
+ ],
deps = [":helpers"],
)
grpc_cc_binary(
name = "bm_fullstack_streaming_pump",
testonly = 1,
- srcs = ["bm_fullstack_streaming_pump.cc"],
+ srcs = [
+ "bm_fullstack_streaming_pump.cc",
+ "fullstack_streaming_pump.h",
+ ],
deps = [":helpers"],
)
@@ -92,15 +98,18 @@ grpc_cc_binary(
testonly = 1,
srcs = ["bm_fullstack_trickle.cc"],
deps = [
- ":helpers",
- "//test/cpp/util:test_config",
+ ":helpers",
+ "//test/cpp/util:test_config",
],
)
grpc_cc_binary(
name = "bm_fullstack_unary_ping_pong",
testonly = 1,
- srcs = ["bm_fullstack_unary_ping_pong.cc"],
+ srcs = [
+ "bm_fullstack_unary_ping_pong.cc",
+ "fullstack_unary_ping_pong.h",
+ ],
deps = [":helpers"],
)
diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc
index 518c65ac8d..cadc9b2a11 100644
--- a/test/cpp/microbenchmarks/bm_call_create.cc
+++ b/test/cpp/microbenchmarks/bm_call_create.cc
@@ -35,7 +35,7 @@ extern "C" {
#include "src/core/ext/filters/http/client/http_client_filter.h"
#include "src/core/ext/filters/http/message_compress/message_compress_filter.h"
#include "src/core/ext/filters/http/server/http_server_filter.h"
-#include "src/core/ext/filters/load_reporting/load_reporting_filter.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
#include "src/core/ext/filters/message_size/message_size_filter.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/connected_channel.h"
@@ -620,7 +620,7 @@ BENCHMARK_TEMPLATE(BM_IsolatedFilter, HttpServerFilter, SendEmptyMetadata);
typedef Fixture<&grpc_message_size_filter, CHECKS_NOT_LAST> MessageSizeFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, MessageSizeFilter, NoOp);
BENCHMARK_TEMPLATE(BM_IsolatedFilter, MessageSizeFilter, SendEmptyMetadata);
-typedef Fixture<&grpc_load_reporting_filter, CHECKS_NOT_LAST>
+typedef Fixture<&grpc_server_load_reporting_filter, CHECKS_NOT_LAST>
LoadReportingFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, LoadReportingFilter, NoOp);
BENCHMARK_TEMPLATE(BM_IsolatedFilter, LoadReportingFilter, SendEmptyMetadata);
diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
index 0712a40018..655e032faf 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
@@ -18,13 +18,7 @@
/* Benchmark gRPC end2end in various configurations */
-#include <benchmark/benchmark.h>
-#include <sstream>
-#include "src/core/lib/profiling/timers.h"
-#include "src/cpp/client/create_channel_internal.h"
-#include "src/proto/grpc/testing/echo.grpc.pb.h"
-#include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
-#include "test/cpp/microbenchmarks/fullstack_fixtures.h"
+#include "test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h"
namespace grpc {
namespace testing {
@@ -33,365 +27,6 @@ namespace testing {
auto& force_library_initialization = Library::get();
/*******************************************************************************
- * BENCHMARKING KERNELS
- */
-
-static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
-
-// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
-// messages in each call) in a loop on a single channel
-//
-// First parmeter (i.e state.range(0)): Message size (in bytes) to use
-// Second parameter (i.e state.range(1)): Number of ping pong messages.
-// Note: One ping-pong means two messages (one from client to server and
-// the other from server to client):
-template <class Fixture, class ClientContextMutator, class ServerContextMutator>
-static void BM_StreamingPingPong(benchmark::State& state) {
- const int msg_size = state.range(0);
- const int max_ping_pongs = state.range(1);
-
- EchoTestService::AsyncService service;
- std::unique_ptr<Fixture> fixture(new Fixture(&service));
- {
- EchoResponse send_response;
- EchoResponse recv_response;
- EchoRequest send_request;
- EchoRequest recv_request;
-
- if (msg_size > 0) {
- send_request.set_message(std::string(msg_size, 'a'));
- send_response.set_message(std::string(msg_size, 'b'));
- }
-
- std::unique_ptr<EchoTestService::Stub> stub(
- EchoTestService::NewStub(fixture->channel()));
-
- while (state.KeepRunning()) {
- ServerContext svr_ctx;
- ServerContextMutator svr_ctx_mut(&svr_ctx);
- ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
- service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
- fixture->cq(), tag(0));
-
- ClientContext cli_ctx;
- ClientContextMutator cli_ctx_mut(&cli_ctx);
- auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
-
- // Establish async stream between client side and server side
- void* t;
- bool ok;
- int need_tags = (1 << 0) | (1 << 1);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
-
- // Send 'max_ping_pongs' number of ping pong messages
- int ping_pong_cnt = 0;
- while (ping_pong_cnt < max_ping_pongs) {
- request_rw->Write(send_request, tag(0)); // Start client send
- response_rw.Read(&recv_request, tag(1)); // Start server recv
- request_rw->Read(&recv_response, tag(2)); // Start client recv
-
- need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- int i = (int)(intptr_t)t;
-
- // If server recv is complete, start the server send operation
- if (i == 1) {
- response_rw.Write(send_response, tag(3));
- }
-
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
-
- ping_pong_cnt++;
- }
-
- request_rw->WritesDone(tag(0));
- response_rw.Finish(Status::OK, tag(1));
-
- Status recv_status;
- request_rw->Finish(&recv_status, tag(2));
-
- need_tags = (1 << 0) | (1 << 1) | (1 << 2);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
-
- GPR_ASSERT(recv_status.ok());
- }
- }
-
- fixture->Finish(state);
- fixture.reset();
- state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
-}
-
-// Repeatedly sends ping pong messages in a single streaming Bidi call in a loop
-// First parmeter (i.e state.range(0)): Message size (in bytes) to use
-template <class Fixture, class ClientContextMutator, class ServerContextMutator>
-static void BM_StreamingPingPongMsgs(benchmark::State& state) {
- const int msg_size = state.range(0);
-
- EchoTestService::AsyncService service;
- std::unique_ptr<Fixture> fixture(new Fixture(&service));
- {
- EchoResponse send_response;
- EchoResponse recv_response;
- EchoRequest send_request;
- EchoRequest recv_request;
-
- if (msg_size > 0) {
- send_request.set_message(std::string(msg_size, 'a'));
- send_response.set_message(std::string(msg_size, 'b'));
- }
-
- std::unique_ptr<EchoTestService::Stub> stub(
- EchoTestService::NewStub(fixture->channel()));
-
- ServerContext svr_ctx;
- ServerContextMutator svr_ctx_mut(&svr_ctx);
- ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
- service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
- fixture->cq(), tag(0));
-
- ClientContext cli_ctx;
- ClientContextMutator cli_ctx_mut(&cli_ctx);
- auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
-
- // Establish async stream between client side and server side
- void* t;
- bool ok;
- int need_tags = (1 << 0) | (1 << 1);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
-
- while (state.KeepRunning()) {
- GPR_TIMER_SCOPE("BenchmarkCycle", 0);
- request_rw->Write(send_request, tag(0)); // Start client send
- response_rw.Read(&recv_request, tag(1)); // Start server recv
- request_rw->Read(&recv_response, tag(2)); // Start client recv
-
- need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- int i = (int)(intptr_t)t;
-
- // If server recv is complete, start the server send operation
- if (i == 1) {
- response_rw.Write(send_response, tag(3));
- }
-
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
- }
-
- request_rw->WritesDone(tag(0));
- response_rw.Finish(Status::OK, tag(1));
- Status recv_status;
- request_rw->Finish(&recv_status, tag(2));
-
- need_tags = (1 << 0) | (1 << 1) | (1 << 2);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
-
- GPR_ASSERT(recv_status.ok());
- }
-
- fixture->Finish(state);
- fixture.reset();
- state.SetBytesProcessed(msg_size * state.iterations() * 2);
-}
-
-// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
-// messages in each call) in a loop on a single channel. Different from
-// BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast,
-// WriteAndFinish, set_initial_metadata_corked. These apis aim at saving
-// sendmsg syscalls for streaming by coalescing 1. initial metadata with first
-// message; 2. final streaming message with trailing metadata.
-//
-// First parmeter (i.e state.range(0)): Message size (in bytes) to use
-// Second parameter (i.e state.range(1)): Number of ping pong messages.
-// Note: One ping-pong means two messages (one from client to server and
-// the other from server to client):
-// Third parameter (i.e state.range(2)): Switch between using WriteAndFinish
-// API and WriteLast API for server.
-template <class Fixture, class ClientContextMutator, class ServerContextMutator>
-static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) {
- const int msg_size = state.range(0);
- const int max_ping_pongs = state.range(1);
- // This options is used to test out server API: WriteLast and WriteAndFinish
- // respectively, since we can not use both of them on server side at the same
- // time. Value 1 means we are testing out the WriteAndFinish API, and
- // otherwise we are testing out the WriteLast API.
- const int write_and_finish = state.range(2);
-
- EchoTestService::AsyncService service;
- std::unique_ptr<Fixture> fixture(new Fixture(&service));
- {
- EchoResponse send_response;
- EchoResponse recv_response;
- EchoRequest send_request;
- EchoRequest recv_request;
-
- if (msg_size > 0) {
- send_request.set_message(std::string(msg_size, 'a'));
- send_response.set_message(std::string(msg_size, 'b'));
- }
-
- std::unique_ptr<EchoTestService::Stub> stub(
- EchoTestService::NewStub(fixture->channel()));
-
- while (state.KeepRunning()) {
- ServerContext svr_ctx;
- ServerContextMutator svr_ctx_mut(&svr_ctx);
- ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
- service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
- fixture->cq(), tag(0));
-
- ClientContext cli_ctx;
- ClientContextMutator cli_ctx_mut(&cli_ctx);
- cli_ctx.set_initial_metadata_corked(true);
- // tag:1 here will never comes up, since we are not performing any op due
- // to initial metadata coalescing.
- auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
-
- void* t;
- bool ok;
- int need_tags;
-
- // Send 'max_ping_pongs' number of ping pong messages
- int ping_pong_cnt = 0;
- while (ping_pong_cnt < max_ping_pongs) {
- if (ping_pong_cnt == max_ping_pongs - 1) {
- request_rw->WriteLast(send_request, WriteOptions(), tag(2));
- } else {
- request_rw->Write(send_request, tag(2)); // Start client send
- }
-
- need_tags = (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5);
-
- if (ping_pong_cnt == 0) {
- // wait for the server call structure (call_hook, etc.) to be
- // initialized (async stream between client side and server side
- // established). It is necessary when client init metadata is
- // coalesced
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- while ((int)(intptr_t)t != 0) {
- // In some cases tag:2 comes before tag:0 (write tag comes out
- // first), this while loop is to make sure get tag:0.
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- }
- }
-
- response_rw.Read(&recv_request, tag(3)); // Start server recv
- request_rw->Read(&recv_response, tag(4)); // Start client recv
-
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- int i = (int)(intptr_t)t;
-
- // If server recv is complete, start the server send operation
- if (i == 3) {
- if (ping_pong_cnt == max_ping_pongs - 1) {
- if (write_and_finish == 1) {
- response_rw.WriteAndFinish(send_response, WriteOptions(),
- Status::OK, tag(5));
- } else {
- response_rw.WriteLast(send_response, WriteOptions(), tag(5));
- // WriteLast buffers the write, so neither server write op nor
- // client read op will finish inside the while loop.
- need_tags &= ~(1 << 4);
- need_tags &= ~(1 << 5);
- }
- } else {
- response_rw.Write(send_response, tag(5));
- }
- }
-
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
-
- ping_pong_cnt++;
- }
-
- if (max_ping_pongs == 0) {
- need_tags = (1 << 6) | (1 << 7) | (1 << 8);
- } else {
- if (write_and_finish == 1) {
- need_tags = (1 << 8);
- } else {
- // server's buffered write and the client's read of the buffered write
- // tags should come up.
- need_tags = (1 << 4) | (1 << 5) | (1 << 7) | (1 << 8);
- }
- }
-
- // No message write or initial metadata write happened yet.
- if (max_ping_pongs == 0) {
- request_rw->WritesDone(tag(6));
- // wait for server call data structure(call_hook, etc.) to be
- // initialized, since initial metadata is corked.
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- while ((int)(intptr_t)t != 0) {
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- }
- response_rw.Finish(Status::OK, tag(7));
- } else {
- if (write_and_finish != 1) {
- response_rw.Finish(Status::OK, tag(7));
- }
- }
-
- Status recv_status;
- request_rw->Finish(&recv_status, tag(8));
-
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
-
- GPR_ASSERT(recv_status.ok());
- }
- }
-
- fixture->Finish(state);
- fixture.reset();
- state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
-}
-
-/*******************************************************************************
* CONFIGURATIONS
*/
diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc
index 6fbf9da0ad..c7ceacd320 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc
@@ -18,157 +18,18 @@
/* Benchmark gRPC end2end in various configurations */
-#include <benchmark/benchmark.h>
-#include <sstream>
-#include "src/core/lib/profiling/timers.h"
-#include "src/cpp/client/create_channel_internal.h"
-#include "src/proto/grpc/testing/echo.grpc.pb.h"
-#include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
-#include "test/cpp/microbenchmarks/fullstack_fixtures.h"
+#include "test/cpp/microbenchmarks/fullstack_streaming_pump.h"
namespace grpc {
namespace testing {
-// force library initialization
-auto& force_library_initialization = Library::get();
-
-/*******************************************************************************
- * BENCHMARKING KERNELS
- */
-
-static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
-
-template <class Fixture>
-static void BM_PumpStreamClientToServer(benchmark::State& state) {
- EchoTestService::AsyncService service;
- std::unique_ptr<Fixture> fixture(new Fixture(&service));
- {
- EchoRequest send_request;
- EchoRequest recv_request;
- if (state.range(0) > 0) {
- send_request.set_message(std::string(state.range(0), 'a'));
- }
- Status recv_status;
- ServerContext svr_ctx;
- ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
- service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
- fixture->cq(), tag(0));
- std::unique_ptr<EchoTestService::Stub> stub(
- EchoTestService::NewStub(fixture->channel()));
- ClientContext cli_ctx;
- auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
- int need_tags = (1 << 0) | (1 << 1);
- void* t;
- bool ok;
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
- response_rw.Read(&recv_request, tag(0));
- while (state.KeepRunning()) {
- GPR_TIMER_SCOPE("BenchmarkCycle", 0);
- request_rw->Write(send_request, tag(1));
- while (true) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- if (t == tag(0)) {
- response_rw.Read(&recv_request, tag(0));
- } else if (t == tag(1)) {
- break;
- } else {
- GPR_ASSERT(false);
- }
- }
- }
- request_rw->WritesDone(tag(1));
- need_tags = (1 << 0) | (1 << 1);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
- response_rw.Finish(Status::OK, tag(0));
- Status final_status;
- request_rw->Finish(&final_status, tag(1));
- need_tags = (1 << 0) | (1 << 1);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
- GPR_ASSERT(final_status.ok());
- }
- fixture->Finish(state);
- fixture.reset();
- state.SetBytesProcessed(state.range(0) * state.iterations());
-}
-
-template <class Fixture>
-static void BM_PumpStreamServerToClient(benchmark::State& state) {
- EchoTestService::AsyncService service;
- std::unique_ptr<Fixture> fixture(new Fixture(&service));
- {
- EchoResponse send_response;
- EchoResponse recv_response;
- if (state.range(0) > 0) {
- send_response.set_message(std::string(state.range(0), 'a'));
- }
- Status recv_status;
- ServerContext svr_ctx;
- ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
- service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
- fixture->cq(), tag(0));
- std::unique_ptr<EchoTestService::Stub> stub(
- EchoTestService::NewStub(fixture->channel()));
- ClientContext cli_ctx;
- auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
- int need_tags = (1 << 0) | (1 << 1);
- void* t;
- bool ok;
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
- request_rw->Read(&recv_response, tag(0));
- while (state.KeepRunning()) {
- GPR_TIMER_SCOPE("BenchmarkCycle", 0);
- response_rw.Write(send_response, tag(1));
- while (true) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- if (t == tag(0)) {
- request_rw->Read(&recv_response, tag(0));
- } else if (t == tag(1)) {
- break;
- } else {
- GPR_ASSERT(false);
- }
- }
- }
- response_rw.Finish(Status::OK, tag(1));
- need_tags = (1 << 0) | (1 << 1);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
- }
- fixture->Finish(state);
- fixture.reset();
- state.SetBytesProcessed(state.range(0) * state.iterations());
-}
-
/*******************************************************************************
* CONFIGURATIONS
*/
+// force library initialization
+auto& force_library_initialization = Library::get();
+
BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, TCP)
->Range(0, 128 * 1024 * 1024);
BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, UDS)
diff --git a/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc
index 9af751245f..fa41d114c0 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc
@@ -18,13 +18,7 @@
/* Benchmark gRPC end2end in various configurations */
-#include <benchmark/benchmark.h>
-#include <sstream>
-#include "src/core/lib/profiling/timers.h"
-#include "src/cpp/client/create_channel_internal.h"
-#include "src/proto/grpc/testing/echo.grpc.pb.h"
-#include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
-#include "test/cpp/microbenchmarks/fullstack_fixtures.h"
+#include "test/cpp/microbenchmarks/fullstack_unary_ping_pong.h"
namespace grpc {
namespace testing {
@@ -33,85 +27,6 @@ namespace testing {
auto& force_library_initialization = Library::get();
/*******************************************************************************
- * BENCHMARKING KERNELS
- */
-
-static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
-
-template <class Fixture, class ClientContextMutator, class ServerContextMutator>
-static void BM_UnaryPingPong(benchmark::State& state) {
- EchoTestService::AsyncService service;
- std::unique_ptr<Fixture> fixture(new Fixture(&service));
- EchoRequest send_request;
- EchoResponse send_response;
- EchoResponse recv_response;
- if (state.range(0) > 0) {
- send_request.set_message(std::string(state.range(0), 'a'));
- }
- if (state.range(1) > 0) {
- send_response.set_message(std::string(state.range(1), 'a'));
- }
- Status recv_status;
- struct ServerEnv {
- ServerContext ctx;
- EchoRequest recv_request;
- grpc::ServerAsyncResponseWriter<EchoResponse> response_writer;
- ServerEnv() : response_writer(&ctx) {}
- };
- uint8_t server_env_buffer[2 * sizeof(ServerEnv)];
- ServerEnv* server_env[2] = {
- reinterpret_cast<ServerEnv*>(server_env_buffer),
- reinterpret_cast<ServerEnv*>(server_env_buffer + sizeof(ServerEnv))};
- new (server_env[0]) ServerEnv;
- new (server_env[1]) ServerEnv;
- service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request,
- &server_env[0]->response_writer, fixture->cq(),
- fixture->cq(), tag(0));
- service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request,
- &server_env[1]->response_writer, fixture->cq(),
- fixture->cq(), tag(1));
- std::unique_ptr<EchoTestService::Stub> stub(
- EchoTestService::NewStub(fixture->channel()));
- while (state.KeepRunning()) {
- GPR_TIMER_SCOPE("BenchmarkCycle", 0);
- recv_response.Clear();
- ClientContext cli_ctx;
- ClientContextMutator cli_ctx_mut(&cli_ctx);
- std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
- stub->AsyncEcho(&cli_ctx, send_request, fixture->cq()));
- void* t;
- bool ok;
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- GPR_ASSERT(t == tag(0) || t == tag(1));
- intptr_t slot = reinterpret_cast<intptr_t>(t);
- ServerEnv* senv = server_env[slot];
- ServerContextMutator svr_ctx_mut(&senv->ctx);
- senv->response_writer.Finish(send_response, Status::OK, tag(3));
- response_reader->Finish(&recv_response, &recv_status, tag(4));
- for (int i = (1 << 3) | (1 << 4); i != 0;) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- int tagnum = (int)reinterpret_cast<intptr_t>(t);
- GPR_ASSERT(i & (1 << tagnum));
- i -= 1 << tagnum;
- }
- GPR_ASSERT(recv_status.ok());
-
- senv->~ServerEnv();
- senv = new (senv) ServerEnv();
- service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer,
- fixture->cq(), fixture->cq(), tag(slot));
- }
- fixture->Finish(state);
- fixture.reset();
- server_env[0]->~ServerEnv();
- server_env[1]->~ServerEnv();
- state.SetBytesProcessed(state.range(0) * state.iterations() +
- state.range(1) * state.iterations());
-}
-
-/*******************************************************************************
* CONFIGURATIONS
*/
diff --git a/test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h b/test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h
new file mode 100644
index 0000000000..ff1f966753
--- /dev/null
+++ b/test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h
@@ -0,0 +1,396 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/* Benchmark gRPC end2end in various configurations */
+
+#ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
+#define TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
+
+#include <benchmark/benchmark.h>
+#include <sstream>
+#include "src/core/lib/profiling/timers.h"
+#include "src/cpp/client/create_channel_internal.h"
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
+#include "test/cpp/microbenchmarks/fullstack_fixtures.h"
+
+namespace grpc {
+namespace testing {
+
+/*******************************************************************************
+ * BENCHMARKING KERNELS
+ */
+
+static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
+
+// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
+// messages in each call) in a loop on a single channel
+//
+// First parmeter (i.e state.range(0)): Message size (in bytes) to use
+// Second parameter (i.e state.range(1)): Number of ping pong messages.
+// Note: One ping-pong means two messages (one from client to server and
+// the other from server to client):
+template <class Fixture, class ClientContextMutator, class ServerContextMutator>
+static void BM_StreamingPingPong(benchmark::State& state) {
+ const int msg_size = state.range(0);
+ const int max_ping_pongs = state.range(1);
+
+ EchoTestService::AsyncService service;
+ std::unique_ptr<Fixture> fixture(new Fixture(&service));
+ {
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ EchoRequest send_request;
+ EchoRequest recv_request;
+
+ if (msg_size > 0) {
+ send_request.set_message(std::string(msg_size, 'a'));
+ send_response.set_message(std::string(msg_size, 'b'));
+ }
+
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+
+ while (state.KeepRunning()) {
+ ServerContext svr_ctx;
+ ServerContextMutator svr_ctx_mut(&svr_ctx);
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
+ service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
+ fixture->cq(), tag(0));
+
+ ClientContext cli_ctx;
+ ClientContextMutator cli_ctx_mut(&cli_ctx);
+ auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
+
+ // Establish async stream between client side and server side
+ void* t;
+ bool ok;
+ int need_tags = (1 << 0) | (1 << 1);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ // Send 'max_ping_pongs' number of ping pong messages
+ int ping_pong_cnt = 0;
+ while (ping_pong_cnt < max_ping_pongs) {
+ request_rw->Write(send_request, tag(0)); // Start client send
+ response_rw.Read(&recv_request, tag(1)); // Start server recv
+ request_rw->Read(&recv_response, tag(2)); // Start client recv
+
+ need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+
+ // If server recv is complete, start the server send operation
+ if (i == 1) {
+ response_rw.Write(send_response, tag(3));
+ }
+
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ ping_pong_cnt++;
+ }
+
+ request_rw->WritesDone(tag(0));
+ response_rw.Finish(Status::OK, tag(1));
+
+ Status recv_status;
+ request_rw->Finish(&recv_status, tag(2));
+
+ need_tags = (1 << 0) | (1 << 1) | (1 << 2);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ GPR_ASSERT(recv_status.ok());
+ }
+ }
+
+ fixture->Finish(state);
+ fixture.reset();
+ state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
+}
+
+// Repeatedly sends ping pong messages in a single streaming Bidi call in a loop
+// First parmeter (i.e state.range(0)): Message size (in bytes) to use
+template <class Fixture, class ClientContextMutator, class ServerContextMutator>
+static void BM_StreamingPingPongMsgs(benchmark::State& state) {
+ const int msg_size = state.range(0);
+
+ EchoTestService::AsyncService service;
+ std::unique_ptr<Fixture> fixture(new Fixture(&service));
+ {
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ EchoRequest send_request;
+ EchoRequest recv_request;
+
+ if (msg_size > 0) {
+ send_request.set_message(std::string(msg_size, 'a'));
+ send_response.set_message(std::string(msg_size, 'b'));
+ }
+
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+
+ ServerContext svr_ctx;
+ ServerContextMutator svr_ctx_mut(&svr_ctx);
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
+ service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
+ fixture->cq(), tag(0));
+
+ ClientContext cli_ctx;
+ ClientContextMutator cli_ctx_mut(&cli_ctx);
+ auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
+
+ // Establish async stream between client side and server side
+ void* t;
+ bool ok;
+ int need_tags = (1 << 0) | (1 << 1);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ while (state.KeepRunning()) {
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
+ request_rw->Write(send_request, tag(0)); // Start client send
+ response_rw.Read(&recv_request, tag(1)); // Start server recv
+ request_rw->Read(&recv_response, tag(2)); // Start client recv
+
+ need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+
+ // If server recv is complete, start the server send operation
+ if (i == 1) {
+ response_rw.Write(send_response, tag(3));
+ }
+
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ }
+
+ request_rw->WritesDone(tag(0));
+ response_rw.Finish(Status::OK, tag(1));
+ Status recv_status;
+ request_rw->Finish(&recv_status, tag(2));
+
+ need_tags = (1 << 0) | (1 << 1) | (1 << 2);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ GPR_ASSERT(recv_status.ok());
+ }
+
+ fixture->Finish(state);
+ fixture.reset();
+ state.SetBytesProcessed(msg_size * state.iterations() * 2);
+}
+
+// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
+// messages in each call) in a loop on a single channel. Different from
+// BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast,
+// WriteAndFinish, set_initial_metadata_corked. These apis aim at saving
+// sendmsg syscalls for streaming by coalescing 1. initial metadata with first
+// message; 2. final streaming message with trailing metadata.
+//
+// First parmeter (i.e state.range(0)): Message size (in bytes) to use
+// Second parameter (i.e state.range(1)): Number of ping pong messages.
+// Note: One ping-pong means two messages (one from client to server and
+// the other from server to client):
+// Third parameter (i.e state.range(2)): Switch between using WriteAndFinish
+// API and WriteLast API for server.
+template <class Fixture, class ClientContextMutator, class ServerContextMutator>
+static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) {
+ const int msg_size = state.range(0);
+ const int max_ping_pongs = state.range(1);
+ // This options is used to test out server API: WriteLast and WriteAndFinish
+ // respectively, since we can not use both of them on server side at the same
+ // time. Value 1 means we are testing out the WriteAndFinish API, and
+ // otherwise we are testing out the WriteLast API.
+ const int write_and_finish = state.range(2);
+
+ EchoTestService::AsyncService service;
+ std::unique_ptr<Fixture> fixture(new Fixture(&service));
+ {
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ EchoRequest send_request;
+ EchoRequest recv_request;
+
+ if (msg_size > 0) {
+ send_request.set_message(std::string(msg_size, 'a'));
+ send_response.set_message(std::string(msg_size, 'b'));
+ }
+
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+
+ while (state.KeepRunning()) {
+ ServerContext svr_ctx;
+ ServerContextMutator svr_ctx_mut(&svr_ctx);
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
+ service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
+ fixture->cq(), tag(0));
+
+ ClientContext cli_ctx;
+ ClientContextMutator cli_ctx_mut(&cli_ctx);
+ cli_ctx.set_initial_metadata_corked(true);
+ // tag:1 here will never comes up, since we are not performing any op due
+ // to initial metadata coalescing.
+ auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
+
+ void* t;
+ bool ok;
+ int need_tags;
+
+ // Send 'max_ping_pongs' number of ping pong messages
+ int ping_pong_cnt = 0;
+ while (ping_pong_cnt < max_ping_pongs) {
+ if (ping_pong_cnt == max_ping_pongs - 1) {
+ request_rw->WriteLast(send_request, WriteOptions(), tag(2));
+ } else {
+ request_rw->Write(send_request, tag(2)); // Start client send
+ }
+
+ need_tags = (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5);
+
+ if (ping_pong_cnt == 0) {
+ // wait for the server call structure (call_hook, etc.) to be
+ // initialized (async stream between client side and server side
+ // established). It is necessary when client init metadata is
+ // coalesced
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ while ((int)(intptr_t)t != 0) {
+ // In some cases tag:2 comes before tag:0 (write tag comes out
+ // first), this while loop is to make sure get tag:0.
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ }
+ }
+
+ response_rw.Read(&recv_request, tag(3)); // Start server recv
+ request_rw->Read(&recv_response, tag(4)); // Start client recv
+
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+
+ // If server recv is complete, start the server send operation
+ if (i == 3) {
+ if (ping_pong_cnt == max_ping_pongs - 1) {
+ if (write_and_finish == 1) {
+ response_rw.WriteAndFinish(send_response, WriteOptions(),
+ Status::OK, tag(5));
+ } else {
+ response_rw.WriteLast(send_response, WriteOptions(), tag(5));
+ // WriteLast buffers the write, so neither server write op nor
+ // client read op will finish inside the while loop.
+ need_tags &= ~(1 << 4);
+ need_tags &= ~(1 << 5);
+ }
+ } else {
+ response_rw.Write(send_response, tag(5));
+ }
+ }
+
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ ping_pong_cnt++;
+ }
+
+ if (max_ping_pongs == 0) {
+ need_tags = (1 << 6) | (1 << 7) | (1 << 8);
+ } else {
+ if (write_and_finish == 1) {
+ need_tags = (1 << 8);
+ } else {
+ // server's buffered write and the client's read of the buffered write
+ // tags should come up.
+ need_tags = (1 << 4) | (1 << 5) | (1 << 7) | (1 << 8);
+ }
+ }
+
+ // No message write or initial metadata write happened yet.
+ if (max_ping_pongs == 0) {
+ request_rw->WritesDone(tag(6));
+ // wait for server call data structure(call_hook, etc.) to be
+ // initialized, since initial metadata is corked.
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ while ((int)(intptr_t)t != 0) {
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ }
+ response_rw.Finish(Status::OK, tag(7));
+ } else {
+ if (write_and_finish != 1) {
+ response_rw.Finish(Status::OK, tag(7));
+ }
+ }
+
+ Status recv_status;
+ request_rw->Finish(&recv_status, tag(8));
+
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ GPR_ASSERT(recv_status.ok());
+ }
+ }
+
+ fixture->Finish(state);
+ fixture.reset();
+ state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
+}
+} // namespace testing
+} // namespace grpc
+
+#endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
diff --git a/test/cpp/microbenchmarks/fullstack_streaming_pump.h b/test/cpp/microbenchmarks/fullstack_streaming_pump.h
new file mode 100644
index 0000000000..f9db212b02
--- /dev/null
+++ b/test/cpp/microbenchmarks/fullstack_streaming_pump.h
@@ -0,0 +1,170 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/* Benchmark gRPC end2end in various configurations */
+
+#ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
+#define TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
+
+#include <benchmark/benchmark.h>
+#include <sstream>
+#include "src/core/lib/profiling/timers.h"
+#include "src/cpp/client/create_channel_internal.h"
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
+#include "test/cpp/microbenchmarks/fullstack_fixtures.h"
+
+namespace grpc {
+namespace testing {
+
+/*******************************************************************************
+ * BENCHMARKING KERNELS
+ */
+
+static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
+
+template <class Fixture>
+static void BM_PumpStreamClientToServer(benchmark::State& state) {
+ EchoTestService::AsyncService service;
+ std::unique_ptr<Fixture> fixture(new Fixture(&service));
+ {
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ if (state.range(0) > 0) {
+ send_request.set_message(std::string(state.range(0), 'a'));
+ }
+ Status recv_status;
+ ServerContext svr_ctx;
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
+ service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
+ fixture->cq(), tag(0));
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+ ClientContext cli_ctx;
+ auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
+ int need_tags = (1 << 0) | (1 << 1);
+ void* t;
+ bool ok;
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ response_rw.Read(&recv_request, tag(0));
+ while (state.KeepRunning()) {
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
+ request_rw->Write(send_request, tag(1));
+ while (true) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ if (t == tag(0)) {
+ response_rw.Read(&recv_request, tag(0));
+ } else if (t == tag(1)) {
+ break;
+ } else {
+ GPR_ASSERT(false);
+ }
+ }
+ }
+ request_rw->WritesDone(tag(1));
+ need_tags = (1 << 0) | (1 << 1);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ response_rw.Finish(Status::OK, tag(0));
+ Status final_status;
+ request_rw->Finish(&final_status, tag(1));
+ need_tags = (1 << 0) | (1 << 1);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ GPR_ASSERT(final_status.ok());
+ }
+ fixture->Finish(state);
+ fixture.reset();
+ state.SetBytesProcessed(state.range(0) * state.iterations());
+}
+
+template <class Fixture>
+static void BM_PumpStreamServerToClient(benchmark::State& state) {
+ EchoTestService::AsyncService service;
+ std::unique_ptr<Fixture> fixture(new Fixture(&service));
+ {
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ if (state.range(0) > 0) {
+ send_response.set_message(std::string(state.range(0), 'a'));
+ }
+ Status recv_status;
+ ServerContext svr_ctx;
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
+ service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
+ fixture->cq(), tag(0));
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+ ClientContext cli_ctx;
+ auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
+ int need_tags = (1 << 0) | (1 << 1);
+ void* t;
+ bool ok;
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ request_rw->Read(&recv_response, tag(0));
+ while (state.KeepRunning()) {
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
+ response_rw.Write(send_response, tag(1));
+ while (true) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ if (t == tag(0)) {
+ request_rw->Read(&recv_response, tag(0));
+ } else if (t == tag(1)) {
+ break;
+ } else {
+ GPR_ASSERT(false);
+ }
+ }
+ }
+ response_rw.Finish(Status::OK, tag(1));
+ need_tags = (1 << 0) | (1 << 1);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ }
+ fixture->Finish(state);
+ fixture.reset();
+ state.SetBytesProcessed(state.range(0) * state.iterations());
+}
+} // namespace testing
+} // namespace grpc
+
+#endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H
diff --git a/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h b/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h
new file mode 100644
index 0000000000..76d278b2a0
--- /dev/null
+++ b/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h
@@ -0,0 +1,116 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/* Benchmark gRPC end2end in various configurations */
+
+#ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_UNARY_PING_PONG_H
+#define TEST_CPP_MICROBENCHMARKS_FULLSTACK_UNARY_PING_PONG_H
+
+#include <benchmark/benchmark.h>
+#include <sstream>
+#include "src/core/lib/profiling/timers.h"
+#include "src/cpp/client/create_channel_internal.h"
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
+#include "test/cpp/microbenchmarks/fullstack_fixtures.h"
+
+namespace grpc {
+namespace testing {
+
+/*******************************************************************************
+ * BENCHMARKING KERNELS
+ */
+
+static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
+
+template <class Fixture, class ClientContextMutator, class ServerContextMutator>
+static void BM_UnaryPingPong(benchmark::State& state) {
+ EchoTestService::AsyncService service;
+ std::unique_ptr<Fixture> fixture(new Fixture(&service));
+ EchoRequest send_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ if (state.range(0) > 0) {
+ send_request.set_message(std::string(state.range(0), 'a'));
+ }
+ if (state.range(1) > 0) {
+ send_response.set_message(std::string(state.range(1), 'a'));
+ }
+ Status recv_status;
+ struct ServerEnv {
+ ServerContext ctx;
+ EchoRequest recv_request;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer;
+ ServerEnv() : response_writer(&ctx) {}
+ };
+ uint8_t server_env_buffer[2 * sizeof(ServerEnv)];
+ ServerEnv* server_env[2] = {
+ reinterpret_cast<ServerEnv*>(server_env_buffer),
+ reinterpret_cast<ServerEnv*>(server_env_buffer + sizeof(ServerEnv))};
+ new (server_env[0]) ServerEnv;
+ new (server_env[1]) ServerEnv;
+ service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request,
+ &server_env[0]->response_writer, fixture->cq(),
+ fixture->cq(), tag(0));
+ service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request,
+ &server_env[1]->response_writer, fixture->cq(),
+ fixture->cq(), tag(1));
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+ while (state.KeepRunning()) {
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
+ recv_response.Clear();
+ ClientContext cli_ctx;
+ ClientContextMutator cli_ctx_mut(&cli_ctx);
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
+ stub->AsyncEcho(&cli_ctx, send_request, fixture->cq()));
+ void* t;
+ bool ok;
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ GPR_ASSERT(t == tag(0) || t == tag(1));
+ intptr_t slot = reinterpret_cast<intptr_t>(t);
+ ServerEnv* senv = server_env[slot];
+ ServerContextMutator svr_ctx_mut(&senv->ctx);
+ senv->response_writer.Finish(send_response, Status::OK, tag(3));
+ response_reader->Finish(&recv_response, &recv_status, tag(4));
+ for (int i = (1 << 3) | (1 << 4); i != 0;) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int tagnum = (int)reinterpret_cast<intptr_t>(t);
+ GPR_ASSERT(i & (1 << tagnum));
+ i -= 1 << tagnum;
+ }
+ GPR_ASSERT(recv_status.ok());
+
+ senv->~ServerEnv();
+ senv = new (senv) ServerEnv();
+ service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer,
+ fixture->cq(), fixture->cq(), tag(slot));
+ }
+ fixture->Finish(state);
+ fixture.reset();
+ server_env[0]->~ServerEnv();
+ server_env[1]->~ServerEnv();
+ state.SetBytesProcessed(state.range(0) * state.iterations() +
+ state.range(1) * state.iterations());
+}
+} // namespace testing
+} // namespace grpc
+
+#endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_UNARY_PING_PONG_H
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 97e2aef914..912c871482 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -141,7 +141,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
if (!next_issue_) { // ready to issue
RunNextState(true, nullptr);
} else { // wait for the issue time
- alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+ alarm_.reset(new Alarm);
+ alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
}
}
};
@@ -360,8 +361,8 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
break; // loop around, don't return
case State::WAIT:
next_state_ = State::READY_TO_WRITE;
- alarm_.reset(
- new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+ alarm_.reset(new Alarm);
+ alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
return true;
case State::READY_TO_WRITE:
if (!ok) {
@@ -518,8 +519,8 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
}
break; // loop around, don't return
case State::WAIT:
- alarm_.reset(
- new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+ alarm_.reset(new Alarm);
+ alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
next_state_ = State::READY_TO_WRITE;
return true;
case State::READY_TO_WRITE:
@@ -760,8 +761,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
break; // loop around, don't return
case State::WAIT:
next_state_ = State::READY_TO_WRITE;
- alarm_.reset(
- new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+ alarm_.reset(new Alarm);
+ alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
return true;
case State::READY_TO_WRITE:
if (!ok) {
diff --git a/tools/codegen/core/gen_stats_data.py b/tools/codegen/core/gen_stats_data.py
index df6cd5a6bd..8e4ef594af 100755
--- a/tools/codegen/core/gen_stats_data.py
+++ b/tools/codegen/core/gen_stats_data.py
@@ -19,13 +19,30 @@ import ctypes
import math
import sys
import yaml
+import json
with open('src/core/lib/debug/stats_data.yaml') as f:
attrs = yaml.load(f.read())
+REQUIRED_FIELDS = ['name', 'doc']
+
+def make_type(name, fields):
+ return (collections.namedtuple(name, ' '.join(list(set(REQUIRED_FIELDS + fields)))), [])
+
+def c_str(s, encoding='ascii'):
+ if isinstance(s, unicode):
+ s = s.encode(encoding)
+ result = ''
+ for c in s:
+ if not (32 <= ord(c) < 127) or c in ('\\', '"'):
+ result += '\\%03o' % ord(c)
+ else:
+ result += c
+ return '"' + result + '"'
+
types = (
- (collections.namedtuple('Counter', 'name'), []),
- (collections.namedtuple('Histogram', 'name max buckets'), []),
+ make_type('Counter', []),
+ make_type('Histogram', ['max', 'buckets']),
)
inst_map = dict((t[0].__name__, t[1]) for t in types)
@@ -200,6 +217,8 @@ with open('src/core/lib/debug/stats_data.h', 'w') as H:
print >>H, "} grpc_stats_%ss;" % (typename.lower())
print >>H, "extern const char *grpc_stats_%s_name[GRPC_STATS_%s_COUNT];" % (
typename.lower(), typename.upper())
+ print >>H, "extern const char *grpc_stats_%s_doc[GRPC_STATS_%s_COUNT];" % (
+ typename.lower(), typename.upper())
histo_start = []
histo_buckets = []
@@ -269,8 +288,14 @@ with open('src/core/lib/debug/stats_data.c', 'w') as C:
print >>C, "const char *grpc_stats_%s_name[GRPC_STATS_%s_COUNT] = {" % (
typename.lower(), typename.upper())
for inst in instances:
- print >>C, " \"%s\"," % inst.name
+ print >>C, " %s," % c_str(inst.name)
print >>C, "};"
+ print >>C, "const char *grpc_stats_%s_doc[GRPC_STATS_%s_COUNT] = {" % (
+ typename.lower(), typename.upper())
+ for inst in instances:
+ print >>C, " %s," % c_str(inst.doc)
+ print >>C, "};"
+
for i, tbl in enumerate(static_tables):
print >>C, "const %s grpc_stats_table_%d[%d] = {%s};" % (
tbl[0], i, len(tbl[1]), ','.join('%s' % x for x in tbl[1]))
diff --git a/tools/doxygen/Doxyfile.core b/tools/doxygen/Doxyfile.core
index e4cc1c7461..632735342b 100644
--- a/tools/doxygen/Doxyfile.core
+++ b/tools/doxygen/Doxyfile.core
@@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC Core"
# could be handy for archiving the generated documentation or if some version
# control system is used.
-PROJECT_NUMBER = 4.0.0-dev
+PROJECT_NUMBER = 5.0.0-dev
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index c2daf900fb..e352cb78f1 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC Core"
# could be handy for archiving the generated documentation or if some version
# control system is used.
-PROJECT_NUMBER = 4.0.0-dev
+PROJECT_NUMBER = 5.0.0-dev
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a
@@ -979,10 +979,10 @@ src/core/ext/filters/http/message_compress/message_compress_filter.c \
src/core/ext/filters/http/message_compress/message_compress_filter.h \
src/core/ext/filters/http/server/http_server_filter.c \
src/core/ext/filters/http/server/http_server_filter.h \
-src/core/ext/filters/load_reporting/load_reporting.c \
-src/core/ext/filters/load_reporting/load_reporting.h \
-src/core/ext/filters/load_reporting/load_reporting_filter.c \
-src/core/ext/filters/load_reporting/load_reporting_filter.h \
+src/core/ext/filters/load_reporting/server_load_reporting_filter.c \
+src/core/ext/filters/load_reporting/server_load_reporting_filter.h \
+src/core/ext/filters/load_reporting/server_load_reporting_plugin.c \
+src/core/ext/filters/load_reporting/server_load_reporting_plugin.h \
src/core/ext/filters/max_age/max_age_filter.c \
src/core/ext/filters/max_age/max_age_filter.h \
src/core/ext/filters/message_size/message_size_filter.c \
diff --git a/tools/internal_ci/linux/grpc_sanity_webhook_test.cfg b/tools/internal_ci/linux/grpc_sanity_webhook_test.cfg
new file mode 100644
index 0000000000..24e7984f3a
--- /dev/null
+++ b/tools/internal_ci/linux/grpc_sanity_webhook_test.cfg
@@ -0,0 +1,30 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Config file for the internal CI (in protobuf text format)
+
+# Location of the continuous shell script in repository.
+build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh"
+timeout_mins: 20
+action {
+ define_artifacts {
+ regex: "**/*sponge_log.xml"
+ regex: "github/grpc/reports/**"
+ }
+}
+
+env_vars {
+ key: "RUN_TESTS_FLAGS"
+ value: "-f basictests linux sanity opt --inner_jobs 16 -j 1 --internal_ci"
+}
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index dd9a171268..2f6e34bfb3 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -2790,12 +2790,15 @@
"grpc_test_util_unsecure",
"grpc_unsecure"
],
- "headers": [],
+ "headers": [
+ "test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h"
+ ],
"is_filegroup": false,
"language": "c++",
"name": "bm_fullstack_streaming_ping_pong",
"src": [
- "test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc"
+ "test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc",
+ "test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h"
],
"third_party": false,
"type": "target"
@@ -2811,12 +2814,15 @@
"grpc_test_util_unsecure",
"grpc_unsecure"
],
- "headers": [],
+ "headers": [
+ "test/cpp/microbenchmarks/fullstack_streaming_pump.h"
+ ],
"is_filegroup": false,
"language": "c++",
"name": "bm_fullstack_streaming_pump",
"src": [
- "test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc"
+ "test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc",
+ "test/cpp/microbenchmarks/fullstack_streaming_pump.h"
],
"third_party": false,
"type": "target"
@@ -2854,12 +2860,15 @@
"grpc_test_util_unsecure",
"grpc_unsecure"
],
- "headers": [],
+ "headers": [
+ "test/cpp/microbenchmarks/fullstack_unary_ping_pong.h"
+ ],
"is_filegroup": false,
"language": "c++",
"name": "bm_fullstack_unary_ping_pong",
"src": [
- "test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc"
+ "test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc",
+ "test/cpp/microbenchmarks/fullstack_unary_ping_pong.h"
],
"third_party": false,
"type": "target"
@@ -5890,7 +5899,6 @@
"grpc_lb_policy_grpclb_secure",
"grpc_lb_policy_pick_first",
"grpc_lb_policy_round_robin",
- "grpc_load_reporting",
"grpc_max_age_filter",
"grpc_message_size_filter",
"grpc_resolver_dns_ares",
@@ -5899,6 +5907,7 @@
"grpc_resolver_sockaddr",
"grpc_secure",
"grpc_server_backward_compatibility",
+ "grpc_server_load_reporting",
"grpc_transport_chttp2_client_insecure",
"grpc_transport_chttp2_client_secure",
"grpc_transport_chttp2_server_insecure",
@@ -5920,7 +5929,7 @@
"deps": [
"gpr",
"grpc_base",
- "grpc_load_reporting",
+ "grpc_server_load_reporting",
"grpc_transport_chttp2_client_secure",
"grpc_transport_cronet_client_secure"
],
@@ -5997,7 +6006,6 @@
"grpc_lb_policy_grpclb",
"grpc_lb_policy_pick_first",
"grpc_lb_policy_round_robin",
- "grpc_load_reporting",
"grpc_max_age_filter",
"grpc_message_size_filter",
"grpc_resolver_dns_ares",
@@ -6005,6 +6013,7 @@
"grpc_resolver_fake",
"grpc_resolver_sockaddr",
"grpc_server_backward_compatibility",
+ "grpc_server_load_reporting",
"grpc_transport_chttp2_client_insecure",
"grpc_transport_chttp2_server_insecure",
"grpc_transport_inproc",
@@ -8565,27 +8574,6 @@
"grpc_base"
],
"headers": [
- "src/core/ext/filters/load_reporting/load_reporting.h",
- "src/core/ext/filters/load_reporting/load_reporting_filter.h"
- ],
- "is_filegroup": true,
- "language": "c",
- "name": "grpc_load_reporting",
- "src": [
- "src/core/ext/filters/load_reporting/load_reporting.c",
- "src/core/ext/filters/load_reporting/load_reporting.h",
- "src/core/ext/filters/load_reporting/load_reporting_filter.c",
- "src/core/ext/filters/load_reporting/load_reporting_filter.h"
- ],
- "third_party": false,
- "type": "filegroup"
- },
- {
- "deps": [
- "gpr",
- "grpc_base"
- ],
- "headers": [
"src/core/ext/filters/max_age/max_age_filter.h"
],
"is_filegroup": true,
@@ -8793,6 +8781,27 @@
{
"deps": [
"gpr",
+ "grpc_base"
+ ],
+ "headers": [
+ "src/core/ext/filters/load_reporting/server_load_reporting_filter.h",
+ "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
+ ],
+ "is_filegroup": true,
+ "language": "c",
+ "name": "grpc_server_load_reporting",
+ "src": [
+ "src/core/ext/filters/load_reporting/server_load_reporting_filter.c",
+ "src/core/ext/filters/load_reporting/server_load_reporting_filter.h",
+ "src/core/ext/filters/load_reporting/server_load_reporting_plugin.c",
+ "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
+ ],
+ "third_party": false,
+ "type": "filegroup"
+ },
+ {
+ "deps": [
+ "gpr",
"gpr_test_util",
"grpc_base",
"grpc_client_channel",
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index 4311e53839..b66c5f7f71 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -69,17 +69,22 @@ _POLLING_STRATEGIES = {
}
-def get_flaky_tests(limit=None):
+BigQueryTestData = collections.namedtuple('BigQueryTestData', 'name flaky cpu')
+
+
+def get_bqtest_data(limit=None):
import big_query_utils
bq = big_query_utils.create_big_query()
query = """
SELECT
filtered_test_name,
+ SUM(result != 'PASSED' AND result != 'SKIPPED') > 0 as flaky,
+ MAX(cpu_measured) as cpu
FROM (
SELECT
REGEXP_REPLACE(test_name, r'/\d+', '') AS filtered_test_name,
- result
+ result, cpu_measured
FROM
[grpc-testing:jenkins_test_results.aggregate_results]
WHERE
@@ -89,15 +94,15 @@ SELECT
GROUP BY
filtered_test_name
HAVING
- SUM(result != 'PASSED' AND result != 'SKIPPED') > 0"""
+ flaky OR cpu > 0"""
if limit:
query += " limit {}".format(limit)
query_job = big_query_utils.sync_query_job(bq, 'grpc-testing', query)
page = bq.jobs().getQueryResults(
pageToken=None,
**query_job['jobReference']).execute(num_retries=3)
- flake_names = [row['f'][0]['v'] for row in page['rows']]
- return flake_names
+ test_data = [BigQueryTestData(row['f'][0]['v'], row['f'][1]['v'] == 'true', float(row['f'][2]['v'])) for row in page['rows']]
+ return test_data
def platform_string():
@@ -141,6 +146,9 @@ class Config(object):
if not flaky and shortname and shortname in flaky_tests:
print('Setting %s to flaky' % shortname)
flaky = True
+ if shortname in shortname_to_cpu:
+ print('Update CPU cost for %s: %f -> %f' % (shortname, cpu_cost, shortname_to_cpu[shortname]))
+ cpu_cost = shortname_to_cpu[shortname]
return jobset.JobSpec(cmdline=self.tool_prefix + cmdline,
shortname=shortname,
environ=actual_environ,
@@ -1254,9 +1262,12 @@ argp.add_argument('--disable_auto_set_flakes', default=False, const=True, action
args = argp.parse_args()
flaky_tests = set()
+shortname_to_cpu = {}
if not args.disable_auto_set_flakes:
try:
- flaky_tests = set(get_flaky_tests())
+ for test in get_bqtest_data():
+ if test.flaky: flaky_tests.add(test.name)
+ if test.cpu > 0: shortname_to_cpu[test.name] = test.cpu
except:
print("Unexpected error getting flaky tests:", sys.exc_info()[0])
@@ -1516,7 +1527,7 @@ def _build_and_run(
# When running on travis, we want out test runs to be as similar as possible
# for reproducibility purposes.
if args.travis and args.max_time <= 0:
- massaged_one_run = sorted(one_run, key=lambda x: x.shortname)
+ massaged_one_run = sorted(one_run, key=lambda x: x.cpu_cost)
else:
# whereas otherwise, we want to shuffle things up to give all tests a
# chance to run.