diff options
67 files changed, 1702 insertions, 1211 deletions
@@ -843,7 +843,7 @@ grpc_cc_library( "grpc_deadline_filter", "grpc_lb_policy_pick_first", "grpc_lb_policy_round_robin", - "grpc_load_reporting", + "grpc_server_load_reporting", "grpc_max_age_filter", "grpc_message_size_filter", "grpc_resolver_dns_ares", @@ -1087,14 +1087,14 @@ grpc_cc_library( ) grpc_cc_library( - name = "grpc_load_reporting", + name = "grpc_server_load_reporting", srcs = [ - "src/core/ext/filters/load_reporting/load_reporting.c", - "src/core/ext/filters/load_reporting/load_reporting_filter.c", + "src/core/ext/filters/load_reporting/server_load_reporting_filter.c", + "src/core/ext/filters/load_reporting/server_load_reporting_plugin.c", ], hdrs = [ - "src/core/ext/filters/load_reporting/load_reporting.h", - "src/core/ext/filters/load_reporting/load_reporting_filter.h", + "src/core/ext/filters/load_reporting/server_load_reporting_filter.h", + "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h", ], language = "c", deps = [ diff --git a/CMakeLists.txt b/CMakeLists.txt index 428bdc7773..ba2ec3f38f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1196,8 +1196,8 @@ add_library(grpc src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c - src/core/ext/filters/load_reporting/load_reporting.c - src/core/ext/filters/load_reporting/load_reporting_filter.c + src/core/ext/filters/load_reporting/server_load_reporting_filter.c + src/core/ext/filters/load_reporting/server_load_reporting_plugin.c src/core/ext/census/base_resources.c src/core/ext/census/context.c src/core/ext/census/gen/census.pb.c @@ -1523,8 +1523,8 @@ add_library(grpc_cronet src/core/tsi/transport_security.c src/core/tsi/transport_security_adapter.c src/core/ext/transport/chttp2/client/chttp2_connector.c - src/core/ext/filters/load_reporting/load_reporting.c - src/core/ext/filters/load_reporting/load_reporting_filter.c + src/core/ext/filters/load_reporting/server_load_reporting_filter.c + src/core/ext/filters/load_reporting/server_load_reporting_plugin.c src/core/plugin_registry/grpc_cronet_plugin_registry.c ) @@ -2331,8 +2331,8 @@ add_library(grpc_unsecure src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c - src/core/ext/filters/load_reporting/load_reporting.c - src/core/ext/filters/load_reporting/load_reporting_filter.c + src/core/ext/filters/load_reporting/server_load_reporting_filter.c + src/core/ext/filters/load_reporting/server_load_reporting_plugin.c src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c @@ -410,7 +410,7 @@ E = @echo Q = @ endif -CORE_VERSION = 4.0.0-dev +CORE_VERSION = 5.0.0-dev CPP_VERSION = 1.7.0-dev CSHARP_VERSION = 1.7.0-dev @@ -460,7 +460,7 @@ SHARED_EXT_CORE = dll SHARED_EXT_CPP = dll SHARED_EXT_CSHARP = dll SHARED_PREFIX = -SHARED_VERSION_CORE = -4 +SHARED_VERSION_CORE = -5 SHARED_VERSION_CPP = -1 SHARED_VERSION_CSHARP = -1 else ifeq ($(SYSTEM),Darwin) @@ -2628,7 +2628,7 @@ install-shared_c: shared_c strip-shared_c install-pkg-config_c ifeq ($(SYSTEM),MINGW32) $(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE)-dll.a $(prefix)/lib/libgpr.a else ifneq ($(SYSTEM),Darwin) - $(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgpr.so.4 + $(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgpr.so.5 $(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgpr.so endif $(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE)" @@ -2637,7 +2637,7 @@ endif ifeq ($(SYSTEM),MINGW32) $(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE)-dll.a $(prefix)/lib/libgrpc.a else ifneq ($(SYSTEM),Darwin) - $(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc.so.4 + $(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc.so.5 $(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc.so endif $(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE)" @@ -2646,7 +2646,7 @@ endif ifeq ($(SYSTEM),MINGW32) $(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE)-dll.a $(prefix)/lib/libgrpc_cronet.a else ifneq ($(SYSTEM),Darwin) - $(Q) ln -sf $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc_cronet.so.4 + $(Q) ln -sf $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc_cronet.so.5 $(Q) ln -sf $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc_cronet.so endif $(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE)" @@ -2655,7 +2655,7 @@ endif ifeq ($(SYSTEM),MINGW32) $(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE)-dll.a $(prefix)/lib/libgrpc_unsecure.a else ifneq ($(SYSTEM),Darwin) - $(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc_unsecure.so.4 + $(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc_unsecure.so.5 $(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(prefix)/lib/libgrpc_unsecure.so endif ifneq ($(SYSTEM),MINGW32) @@ -2672,7 +2672,7 @@ install-shared_cxx: shared_cxx strip-shared_cxx install-shared_c install-pkg-con ifeq ($(SYSTEM),MINGW32) $(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc++$(SHARED_VERSION_CPP)-dll.a $(prefix)/lib/libgrpc++.a else ifneq ($(SYSTEM),Darwin) - $(Q) ln -sf $(SHARED_PREFIX)grpc++$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++.so.4 + $(Q) ln -sf $(SHARED_PREFIX)grpc++$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++.so.5 $(Q) ln -sf $(SHARED_PREFIX)grpc++$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++.so endif $(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc++_cronet$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)" @@ -2681,7 +2681,7 @@ endif ifeq ($(SYSTEM),MINGW32) $(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc++_cronet$(SHARED_VERSION_CPP)-dll.a $(prefix)/lib/libgrpc++_cronet.a else ifneq ($(SYSTEM),Darwin) - $(Q) ln -sf $(SHARED_PREFIX)grpc++_cronet$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_cronet.so.4 + $(Q) ln -sf $(SHARED_PREFIX)grpc++_cronet$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_cronet.so.5 $(Q) ln -sf $(SHARED_PREFIX)grpc++_cronet$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_cronet.so endif $(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc++_error_details$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)" @@ -2690,7 +2690,7 @@ endif ifeq ($(SYSTEM),MINGW32) $(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc++_error_details$(SHARED_VERSION_CPP)-dll.a $(prefix)/lib/libgrpc++_error_details.a else ifneq ($(SYSTEM),Darwin) - $(Q) ln -sf $(SHARED_PREFIX)grpc++_error_details$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_error_details.so.4 + $(Q) ln -sf $(SHARED_PREFIX)grpc++_error_details$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_error_details.so.5 $(Q) ln -sf $(SHARED_PREFIX)grpc++_error_details$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_error_details.so endif $(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc++_reflection$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)" @@ -2699,7 +2699,7 @@ endif ifeq ($(SYSTEM),MINGW32) $(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc++_reflection$(SHARED_VERSION_CPP)-dll.a $(prefix)/lib/libgrpc++_reflection.a else ifneq ($(SYSTEM),Darwin) - $(Q) ln -sf $(SHARED_PREFIX)grpc++_reflection$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_reflection.so.4 + $(Q) ln -sf $(SHARED_PREFIX)grpc++_reflection$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_reflection.so.5 $(Q) ln -sf $(SHARED_PREFIX)grpc++_reflection$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_reflection.so endif $(E) "[INSTALL] Installing $(SHARED_PREFIX)grpc++_unsecure$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)" @@ -2708,7 +2708,7 @@ endif ifeq ($(SYSTEM),MINGW32) $(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc++_unsecure$(SHARED_VERSION_CPP)-dll.a $(prefix)/lib/libgrpc++_unsecure.a else ifneq ($(SYSTEM),Darwin) - $(Q) ln -sf $(SHARED_PREFIX)grpc++_unsecure$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_unsecure.so.4 + $(Q) ln -sf $(SHARED_PREFIX)grpc++_unsecure$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_unsecure.so.5 $(Q) ln -sf $(SHARED_PREFIX)grpc++_unsecure$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_unsecure.so endif ifneq ($(SYSTEM),MINGW32) @@ -2725,7 +2725,7 @@ install-shared_csharp: shared_csharp strip-shared_csharp ifeq ($(SYSTEM),MINGW32) $(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc_csharp_ext$(SHARED_VERSION_CSHARP)-dll.a $(prefix)/lib/libgrpc_csharp_ext.a else ifneq ($(SYSTEM),Darwin) - $(Q) ln -sf $(SHARED_PREFIX)grpc_csharp_ext$(SHARED_VERSION_CSHARP).$(SHARED_EXT_CSHARP) $(prefix)/lib/libgrpc_csharp_ext.so.4 + $(Q) ln -sf $(SHARED_PREFIX)grpc_csharp_ext$(SHARED_VERSION_CSHARP).$(SHARED_EXT_CSHARP) $(prefix)/lib/libgrpc_csharp_ext.so.5 $(Q) ln -sf $(SHARED_PREFIX)grpc_csharp_ext$(SHARED_VERSION_CSHARP).$(SHARED_EXT_CSHARP) $(prefix)/lib/libgrpc_csharp_ext.so endif ifneq ($(SYSTEM),MINGW32) @@ -2892,8 +2892,8 @@ $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE): $(LIBGPR_OB ifeq ($(SYSTEM),Darwin) $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGPR_OBJS) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS) else - $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgpr.so.4 -o $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGPR_OBJS) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS) - $(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).so.4 + $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgpr.so.5 -o $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGPR_OBJS) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS) + $(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).so.5 $(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).so endif endif @@ -3166,8 +3166,8 @@ LIBGRPC_SRC = \ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c \ src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c \ src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c \ - src/core/ext/filters/load_reporting/load_reporting.c \ - src/core/ext/filters/load_reporting/load_reporting_filter.c \ + src/core/ext/filters/load_reporting/server_load_reporting_filter.c \ + src/core/ext/filters/load_reporting/server_load_reporting_plugin.c \ src/core/ext/census/base_resources.c \ src/core/ext/census/context.c \ src/core/ext/census/gen/census.pb.c \ @@ -3261,8 +3261,8 @@ $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE): $(LIBGRPC_ ifeq ($(SYSTEM),Darwin) $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS) else - $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc.so.4 -o $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS) - $(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).so.4 + $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc.so.5 -o $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS) + $(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).so.5 $(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).so endif endif @@ -3491,8 +3491,8 @@ LIBGRPC_CRONET_SRC = \ src/core/tsi/transport_security.c \ src/core/tsi/transport_security_adapter.c \ src/core/ext/transport/chttp2/client/chttp2_connector.c \ - src/core/ext/filters/load_reporting/load_reporting.c \ - src/core/ext/filters/load_reporting/load_reporting_filter.c \ + src/core/ext/filters/load_reporting/server_load_reporting_filter.c \ + src/core/ext/filters/load_reporting/server_load_reporting_plugin.c \ src/core/plugin_registry/grpc_cronet_plugin_registry.c \ PUBLIC_HEADERS_C += \ @@ -3557,8 +3557,8 @@ $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE): $(L ifeq ($(SYSTEM),Darwin) $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_CRONET_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS) else - $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc_cronet.so.4 -o $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_CRONET_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS) - $(Q) ln -sf $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).so.4 + $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc_cronet.so.5 -o $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_CRONET_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS) + $(Q) ln -sf $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).so.5 $(Q) ln -sf $(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_cronet$(SHARED_VERSION_CORE).so endif endif @@ -4261,8 +4261,8 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c \ src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c \ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c \ - src/core/ext/filters/load_reporting/load_reporting.c \ - src/core/ext/filters/load_reporting/load_reporting_filter.c \ + src/core/ext/filters/load_reporting/server_load_reporting_filter.c \ + src/core/ext/filters/load_reporting/server_load_reporting_plugin.c \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c \ @@ -4355,8 +4355,8 @@ $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE): $ ifeq ($(SYSTEM),Darwin) $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_UNSECURE_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS) else - $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc_unsecure.so.4 -o $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_UNSECURE_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS) - $(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).so.4 + $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc_unsecure.so.5 -o $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_UNSECURE_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(LDLIBS) + $(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).so.5 $(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).so endif endif @@ -27,14 +27,14 @@ Libraries in different languages may be in different states of development. We a | Language | Source | Status | |-------------------------|-------------------------------------|---------| -| Shared C [core library] | [src/core](src/core) | 1.0 | -| C++ | [src/cpp](src/cpp) | 1.0 | -| Ruby | [src/ruby](src/ruby) | 1.0 | -| NodeJS | [src/node](src/node) | 1.0 | -| Python | [src/python](src/python) | 1.0 | -| PHP | [src/php](src/php) | 1.0 | -| C# | [src/csharp](src/csharp) | 1.0 | -| Objective-C | [src/objective-c](src/objective-c) | 1.0 | +| Shared C [core library] | [src/core](src/core) | 1.6 | +| C++ | [src/cpp](src/cpp) | 1.6 | +| Ruby | [src/ruby](src/ruby) | 1.6 | +| NodeJS | [src/node](src/node) | 1.6 | +| Python | [src/python](src/python) | 1.6 | +| PHP | [src/php](src/php) | 1.6 | +| C# | [src/csharp](src/csharp) | 1.6 | +| Objective-C | [src/objective-c](src/objective-c) | 1.6 | Java source code is in the [grpc-java](http://github.com/grpc/grpc-java) repository. Go source code is in the diff --git a/binding.gyp b/binding.gyp index 0110951305..06dc731935 100644 --- a/binding.gyp +++ b/binding.gyp @@ -893,8 +893,8 @@ 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c', 'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c', 'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c', - 'src/core/ext/filters/load_reporting/load_reporting.c', - 'src/core/ext/filters/load_reporting/load_reporting_filter.c', + 'src/core/ext/filters/load_reporting/server_load_reporting_filter.c', + 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.c', 'src/core/ext/census/base_resources.c', 'src/core/ext/census/context.c', 'src/core/ext/census/gen/census.pb.c', diff --git a/build.yaml b/build.yaml index fa013bac13..c694675e93 100644 --- a/build.yaml +++ b/build.yaml @@ -12,7 +12,7 @@ settings: '#08': Use "-preN" suffixes to identify pre-release versions '#09': Per-language overrides are possible with (eg) ruby_version tag here '#10': See the expand_version.py for all the quirks here - core_version: 4.0.0-dev + core_version: 5.0.0-dev g_stands_for: gambit version: 1.7.0-dev filegroups: @@ -590,16 +590,6 @@ filegroups: uses: - grpc_base - grpc_client_channel -- name: grpc_load_reporting - headers: - - src/core/ext/filters/load_reporting/load_reporting.h - - src/core/ext/filters/load_reporting/load_reporting_filter.h - src: - - src/core/ext/filters/load_reporting/load_reporting.c - - src/core/ext/filters/load_reporting/load_reporting_filter.c - plugin: grpc_load_reporting_plugin - uses: - - grpc_base - name: grpc_max_age_filter headers: - src/core/ext/filters/max_age/max_age_filter.h @@ -712,6 +702,16 @@ filegroups: - src/core/ext/filters/workarounds/workaround_utils.c uses: - grpc_base +- name: grpc_server_load_reporting + headers: + - src/core/ext/filters/load_reporting/server_load_reporting_filter.h + - src/core/ext/filters/load_reporting/server_load_reporting_plugin.h + src: + - src/core/ext/filters/load_reporting/server_load_reporting_filter.c + - src/core/ext/filters/load_reporting/server_load_reporting_plugin.c + plugin: grpc_server_load_reporting_plugin + uses: + - grpc_base - name: grpc_test_util_base build: test headers: @@ -1164,7 +1164,7 @@ libs: - grpc_resolver_dns_native - grpc_resolver_sockaddr - grpc_resolver_fake - - grpc_load_reporting + - grpc_server_load_reporting - grpc_secure - census - grpc_max_age_filter @@ -1190,7 +1190,7 @@ libs: - grpc_base - grpc_transport_cronet_client_secure - grpc_transport_chttp2_client_secure - - grpc_load_reporting + - grpc_server_load_reporting generate_plugin_registry: true platforms: - linux @@ -1264,7 +1264,7 @@ libs: - grpc_resolver_dns_native - grpc_resolver_sockaddr - grpc_resolver_fake - - grpc_load_reporting + - grpc_server_load_reporting - grpc_lb_policy_grpclb - grpc_lb_policy_pick_first - grpc_lb_policy_round_robin @@ -3621,6 +3621,8 @@ targets: - name: bm_fullstack_streaming_ping_pong build: test language: c++ + headers: + - test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h src: - test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc deps: @@ -3646,6 +3648,8 @@ targets: - name: bm_fullstack_streaming_pump build: test language: c++ + headers: + - test/cpp/microbenchmarks/fullstack_streaming_pump.h src: - test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc deps: @@ -3697,6 +3701,8 @@ targets: - name: bm_fullstack_unary_ping_pong build: test language: c++ + headers: + - test/cpp/microbenchmarks/fullstack_unary_ping_pong.h src: - test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc deps: diff --git a/build_config.rb b/build_config.rb index 7159c6e509..3dc31d4ce3 100644 --- a/build_config.rb +++ b/build_config.rb @@ -13,5 +13,5 @@ # limitations under the License. module GrpcBuildConfig - CORE_WINDOWS_DLL = '/tmp/libs/opt/grpc-4.dll' + CORE_WINDOWS_DLL = '/tmp/libs/opt/grpc-5.dll' end @@ -322,8 +322,8 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c \ src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c \ src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c \ - src/core/ext/filters/load_reporting/load_reporting.c \ - src/core/ext/filters/load_reporting/load_reporting_filter.c \ + src/core/ext/filters/load_reporting/server_load_reporting_filter.c \ + src/core/ext/filters/load_reporting/server_load_reporting_plugin.c \ src/core/ext/census/base_resources.c \ src/core/ext/census/context.c \ src/core/ext/census/gen/census.pb.c \ diff --git a/config.w32 b/config.w32 index 1b1a82b1ae..92faad7a8f 100644 --- a/config.w32 +++ b/config.w32 @@ -299,8 +299,8 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\grpc_ares_wrapper_fallback.c " + "src\\core\\ext\\filters\\client_channel\\resolver\\dns\\native\\dns_resolver.c " + "src\\core\\ext\\filters\\client_channel\\resolver\\sockaddr\\sockaddr_resolver.c " + - "src\\core\\ext\\filters\\load_reporting\\load_reporting.c " + - "src\\core\\ext\\filters\\load_reporting\\load_reporting_filter.c " + + "src\\core\\ext\\filters\\load_reporting\\server_load_reporting_filter.c " + + "src\\core\\ext\\filters\\load_reporting\\server_load_reporting_plugin.c " + "src\\core\\ext\\census\\base_resources.c " + "src\\core\\ext\\census\\context.c " + "src\\core\\ext\\census\\gen\\census.pb.c " + diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index fdab4b3f81..2f1f415866 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -443,8 +443,8 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h', - 'src/core/ext/filters/load_reporting/load_reporting.h', - 'src/core/ext/filters/load_reporting/load_reporting_filter.h', + 'src/core/ext/filters/load_reporting/server_load_reporting_filter.h', + 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.h', 'src/core/ext/census/aggregation.h', 'src/core/ext/census/base_resources.h', 'src/core/ext/census/census_interface.h', @@ -701,8 +701,8 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c', 'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c', 'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c', - 'src/core/ext/filters/load_reporting/load_reporting.c', - 'src/core/ext/filters/load_reporting/load_reporting_filter.c', + 'src/core/ext/filters/load_reporting/server_load_reporting_filter.c', + 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.c', 'src/core/ext/census/base_resources.c', 'src/core/ext/census/context.c', 'src/core/ext/census/gen/census.pb.c', @@ -938,8 +938,8 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h', - 'src/core/ext/filters/load_reporting/load_reporting.h', - 'src/core/ext/filters/load_reporting/load_reporting_filter.h', + 'src/core/ext/filters/load_reporting/server_load_reporting_filter.h', + 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.h', 'src/core/ext/census/aggregation.h', 'src/core/ext/census/base_resources.h', 'src/core/ext/census/census_interface.h', @@ -65,6 +65,7 @@ EXPORTS grpc_completion_queue_shutdown grpc_completion_queue_destroy grpc_alarm_create + grpc_alarm_set grpc_alarm_cancel grpc_alarm_destroy grpc_channel_check_connectivity_state diff --git a/grpc.gemspec b/grpc.gemspec index c441107643..2d0f9fd450 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -379,8 +379,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h ) s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h ) s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h ) - s.files += %w( src/core/ext/filters/load_reporting/load_reporting.h ) - s.files += %w( src/core/ext/filters/load_reporting/load_reporting_filter.h ) + s.files += %w( src/core/ext/filters/load_reporting/server_load_reporting_filter.h ) + s.files += %w( src/core/ext/filters/load_reporting/server_load_reporting_plugin.h ) s.files += %w( src/core/ext/census/aggregation.h ) s.files += %w( src/core/ext/census/base_resources.h ) s.files += %w( src/core/ext/census/census_interface.h ) @@ -640,8 +640,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c ) s.files += %w( src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c ) s.files += %w( src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c ) - s.files += %w( src/core/ext/filters/load_reporting/load_reporting.c ) - s.files += %w( src/core/ext/filters/load_reporting/load_reporting_filter.c ) + s.files += %w( src/core/ext/filters/load_reporting/server_load_reporting_filter.c ) + s.files += %w( src/core/ext/filters/load_reporting/server_load_reporting_plugin.c ) s.files += %w( src/core/ext/census/base_resources.c ) s.files += %w( src/core/ext/census/context.c ) s.files += %w( src/core/ext/census/gen/census.pb.c ) @@ -459,8 +459,8 @@ 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c', 'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c', 'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c', - 'src/core/ext/filters/load_reporting/load_reporting.c', - 'src/core/ext/filters/load_reporting/load_reporting_filter.c', + 'src/core/ext/filters/load_reporting/server_load_reporting_filter.c', + 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.c', 'src/core/ext/census/base_resources.c', 'src/core/ext/census/context.c', 'src/core/ext/census/gen/census.pb.c', @@ -1108,8 +1108,8 @@ 'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c', 'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c', 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c', - 'src/core/ext/filters/load_reporting/load_reporting.c', - 'src/core/ext/filters/load_reporting/load_reporting_filter.c', + 'src/core/ext/filters/load_reporting/server_load_reporting_filter.c', + 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c', diff --git a/include/grpc++/alarm.h b/include/grpc++/alarm.h index ed8dacbc94..2d88d868e5 100644 --- a/include/grpc++/alarm.h +++ b/include/grpc++/alarm.h @@ -37,20 +37,33 @@ class CompletionQueue; /// A thin wrapper around \a grpc_alarm (see / \a / src/core/surface/alarm.h). class Alarm : private GrpcLibraryCodegen { public: - /// Create a completion queue alarm instance associated to \a cq. - /// - /// Once the alarm expires (at \a deadline) or it's cancelled (see \a Cancel), - /// an event with tag \a tag will be added to \a cq. If the alarm expired, the - /// event's success bit will be true, false otherwise (ie, upon cancellation). + /// Create an unset completion queue alarm + Alarm() : tag_(nullptr), alarm_(grpc_alarm_create(nullptr)) {} + + /// DEPRECATED: Create and set a completion queue alarm instance associated to + /// \a cq. + /// This form is deprecated because it is inherently racy. /// \internal We rely on the presence of \a cq for grpc initialization. If \a /// cq were ever to be removed, a reference to a static /// internal::GrpcLibraryInitializer instance would need to be introduced /// here. \endinternal. template <typename T> Alarm(CompletionQueue* cq, const T& deadline, void* tag) - : tag_(tag), - alarm_(grpc_alarm_create(cq->cq(), TimePoint<T>(deadline).raw_time(), - static_cast<void*>(&tag_))) {} + : tag_(tag), alarm_(grpc_alarm_create(nullptr)) { + grpc_alarm_set(alarm_, cq->cq(), TimePoint<T>(deadline).raw_time(), + static_cast<void*>(&tag_), nullptr); + } + + /// Trigger an alarm instance on completion queue \a cq at the specified time. + /// Once the alarm expires (at \a deadline) or it's cancelled (see \a Cancel), + /// an event with tag \a tag will be added to \a cq. If the alarm expired, the + /// event's success bit will be true, false otherwise (ie, upon cancellation). + template <typename T> + void Set(CompletionQueue* cq, const T& deadline, void* tag) { + tag_.Set(tag); + grpc_alarm_set(alarm_, cq->cq(), TimePoint<T>(deadline).raw_time(), + static_cast<void*>(&tag_), nullptr); + } /// Alarms aren't copyable. Alarm(const Alarm&) = delete; @@ -69,17 +82,20 @@ class Alarm : private GrpcLibraryCodegen { /// Destroy the given completion queue alarm, cancelling it in the process. ~Alarm() { - if (alarm_ != nullptr) grpc_alarm_destroy(alarm_); + if (alarm_ != nullptr) grpc_alarm_destroy(alarm_, nullptr); } /// Cancel a completion queue alarm. Calling this function over an alarm that /// has already fired has no effect. - void Cancel() { grpc_alarm_cancel(alarm_); } + void Cancel() { + if (alarm_ != nullptr) grpc_alarm_cancel(alarm_, nullptr); + } private: class AlarmEntry : public CompletionQueueTag { public: AlarmEntry(void* tag) : tag_(tag) {} + void Set(void* tag) { tag_ = tag; } bool FinalizeResult(void** tag, bool* status) override { *tag = tag_; return true; diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index eafd63619d..21ae70d13a 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -151,7 +151,8 @@ class ServerBuilder { /// Add a completion queue for handling asynchronous services. /// /// Caller is required to shutdown the server prior to shutting down the - /// returned completion queue. A typical usage scenario: + /// returned completion queue. Caller is also required to drain the + /// completion queue after shutting it down. A typical usage scenario: /// /// // While building the server: /// ServerBuilder builder; @@ -162,6 +163,10 @@ class ServerBuilder { /// // While shutting down the server; /// server_->Shutdown(); /// cq_->Shutdown(); // Always *after* the associated server's Shutdown()! + /// // Drain the cq_ that was created + /// void* ignored_tag; + /// bool ignored_ok; + /// while (cq_->Next(&ignored_tag, &ignored_ok)) { } /// /// \param is_frequently_polled This is an optional parameter to inform gRPC /// library about whether this completion queue would be frequently polled diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index b562167b5f..fab7d438aa 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -143,21 +143,24 @@ GRPCAPI void grpc_completion_queue_shutdown(grpc_completion_queue *cq); drained and no threads are executing grpc_completion_queue_next */ GRPCAPI void grpc_completion_queue_destroy(grpc_completion_queue *cq); -/** Create a completion queue alarm instance associated to \a cq. +/** Create a completion queue alarm instance */ +GRPCAPI grpc_alarm *grpc_alarm_create(void *reserved); + +/** Set a completion queue alarm instance associated to \a cq. * * Once the alarm expires (at \a deadline) or it's cancelled (see \a * grpc_alarm_cancel), an event with tag \a tag will be added to \a cq. If the * alarm expired, the event's success bit will be true, false otherwise (ie, * upon cancellation). */ -GRPCAPI grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, - gpr_timespec deadline, void *tag); +GRPCAPI void grpc_alarm_set(grpc_alarm *alarm, grpc_completion_queue *cq, + gpr_timespec deadline, void *tag, void *reserved); /** Cancel a completion queue alarm. Calling this function over an alarm that * has already fired has no effect. */ -GRPCAPI void grpc_alarm_cancel(grpc_alarm *alarm); +GRPCAPI void grpc_alarm_cancel(grpc_alarm *alarm, void *reserved); /** Destroy the given completion queue alarm, cancelling it in the process. */ -GRPCAPI void grpc_alarm_destroy(grpc_alarm *alarm); +GRPCAPI void grpc_alarm_destroy(grpc_alarm *alarm, void *reserved); /** Check the connectivity state of a channel. */ GRPCAPI grpc_connectivity_state grpc_channel_check_connectivity_state( diff --git a/package.xml b/package.xml index 4b1f42dd02..b7c0e679e5 100644 --- a/package.xml +++ b/package.xml @@ -389,8 +389,8 @@ <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" role="src" /> - <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/load_reporting.h" role="src" /> - <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/load_reporting_filter.h" role="src" /> + <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/server_load_reporting_filter.h" role="src" /> + <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/server_load_reporting_plugin.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/census/aggregation.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/census/base_resources.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/census/census_interface.h" role="src" /> @@ -650,8 +650,8 @@ <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c" role="src" /> - <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/load_reporting.c" role="src" /> - <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/load_reporting_filter.c" role="src" /> + <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/server_load_reporting_filter.c" role="src" /> + <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/server_load_reporting_plugin.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/census/base_resources.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/census/context.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/census/gen/census.pb.c" role="src" /> diff --git a/src/core/ext/filters/load_reporting/load_reporting_filter.c b/src/core/ext/filters/load_reporting/server_load_reporting_filter.c index 17e946937f..7b8cf986f7 100644 --- a/src/core/ext/filters/load_reporting/load_reporting_filter.c +++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.c @@ -24,8 +24,8 @@ #include <grpc/support/string_util.h> #include <grpc/support/sync.h> -#include "src/core/ext/filters/load_reporting/load_reporting.h" -#include "src/core/ext/filters/load_reporting/load_reporting_filter.h" +#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h" +#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" @@ -213,7 +213,7 @@ static void lr_start_transport_stream_op_batch( GPR_TIMER_END("lr_start_transport_stream_op_batch", 0); } -const grpc_channel_filter grpc_load_reporting_filter = { +const grpc_channel_filter grpc_server_load_reporting_filter = { lr_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), diff --git a/src/core/ext/filters/load_reporting/load_reporting_filter.h b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h index 1a5424e43a..9527868c9f 100644 --- a/src/core/ext/filters/load_reporting/load_reporting_filter.h +++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.h @@ -16,12 +16,13 @@ * */ -#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_FILTER_H -#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_FILTER_H +#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_FILTER_H +#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_FILTER_H -#include "src/core/ext/filters/load_reporting/load_reporting.h" +#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h" #include "src/core/lib/channel/channel_stack.h" -extern const grpc_channel_filter grpc_load_reporting_filter; +extern const grpc_channel_filter grpc_server_load_reporting_filter; -#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_FILTER_H */ +#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_FILTER_H \ + */ diff --git a/src/core/ext/filters/load_reporting/load_reporting.c b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.c index b42aa99cdb..199cb883b3 100644 --- a/src/core/ext/filters/load_reporting/load_reporting.c +++ b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.c @@ -25,8 +25,8 @@ #include <grpc/support/alloc.h> #include <grpc/support/sync.h> -#include "src/core/ext/filters/load_reporting/load_reporting.h" -#include "src/core/ext/filters/load_reporting/load_reporting_filter.h" +#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h" +#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h" #include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/call.h" @@ -37,9 +37,8 @@ static bool is_load_reporting_enabled(const grpc_channel_args *a) { grpc_channel_args_find(a, GRPC_ARG_ENABLE_LOAD_REPORTING), false); } -static bool maybe_add_load_reporting_filter(grpc_exec_ctx *exec_ctx, - grpc_channel_stack_builder *builder, - void *arg) { +static bool maybe_add_server_load_reporting_filter( + grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) { const grpc_channel_args *args = grpc_channel_stack_builder_get_channel_arguments(builder); const grpc_channel_filter *filter = arg; @@ -61,10 +60,10 @@ grpc_arg grpc_load_reporting_enable_arg() { /* Plugin registration */ -void grpc_load_reporting_plugin_init(void) { +void grpc_server_load_reporting_plugin_init(void) { grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, - maybe_add_load_reporting_filter, - (void *)&grpc_load_reporting_filter); + maybe_add_server_load_reporting_filter, + (void *)&grpc_server_load_reporting_filter); } -void grpc_load_reporting_plugin_shutdown() {} +void grpc_server_load_reporting_plugin_shutdown() {} diff --git a/src/core/ext/filters/load_reporting/load_reporting.h b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.h index fc04d2826a..65a6d0900e 100644 --- a/src/core/ext/filters/load_reporting/load_reporting.h +++ b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.h @@ -16,8 +16,8 @@ * */ -#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_H -#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_H +#ifndef GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_PLUGIN_H +#define GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_PLUGIN_H #include <grpc/impl/codegen/grpc_types.h> @@ -55,4 +55,5 @@ typedef struct grpc_load_reporting_call_data { /** Return a \a grpc_arg enabling load reporting */ grpc_arg grpc_load_reporting_enable_arg(); -#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_LOAD_REPORTING_H */ +#endif /* GRPC_CORE_EXT_FILTERS_LOAD_REPORTING_SERVER_LOAD_REPORTING_PLUGIN_H \ + */ diff --git a/src/core/lib/debug/stats_data.c b/src/core/lib/debug/stats_data.c index a21bb1ef40..5847c96a34 100644 --- a/src/core/lib/debug/stats_data.c +++ b/src/core/lib/debug/stats_data.c @@ -56,9 +56,58 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "executor_queue_drained", "executor_push_retries", }; +const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { + "Number of client side calls created by this process", + "Number of server side calls created by this process", + "Number of polling syscalls (epoll_wait, poll, etc) made by this process", + "Number of sleeping syscalls made by this process", + "Number of times histogram increments went through the slow (binary " + "search) path", + "Number of write syscalls (or equivalent - eg sendmsg) made by this " + "process", + "Number of read syscalls (or equivalent - eg recvmsg) made by this process", + "Number of times a backup poller has been created (this can be expensive)", + "Number of polls performed on the backup poller", + "Number of batches received by HTTP2 transport", + "Number of cancelations received by HTTP2 transport", + "Number of batches containing send initial metadata", + "Number of batches containing send message", + "Number of batches containing send trailing metadata", + "Number of batches containing receive initial metadata", + "Number of batches containing receive message", + "Number of batches containing receive trailing metadata", + "Number of HTTP2 pings sent by process", "Number of HTTP2 writes initiated", + "Number of HTTP2 writes offloaded to the executor from application threads", + "Number of HTTP2 writes that finished seeing more data needed to be " + "written", + "Number of HTTP2 writes that were made knowing there was still more data " + "to be written (we cap maximum write size to syscall_write)", + "Number of combiner lock entries by process (first items queued to a " + "combiner)", + "Number of items scheduled against combiner locks", + "Number of final items scheduled against combiner locks", + "Number of combiner locks offloaded to different threads", + "Number of finite runtime closures scheduled against the executor (gRPC " + "thread pool)", + "Number of potentially infinite runtime closures scheduled against the " + "executor (gRPC thread pool)", + "Number of closures scheduled by the executor to the executor", + "Number of thread wakeups initiated within the executor", + "Number of times an executor queue was drained", + "Number of times we raced and were forced to retry pushing a closure to " + "the executor", +}; const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = { - "tcp_write_size", "tcp_write_iov_size", "tcp_read_size", - "tcp_read_offer", "tcp_read_iov_size", "http2_send_message_size", + "tcp_write_size", "tcp_write_iov_size", "tcp_read_size", + "tcp_read_offer", "tcp_read_offer_iov_size", "http2_send_message_size", +}; +const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT] = { + "Number of bytes offered to each syscall_write", + "Number of byte segments offered to each syscall_write", + "Number of bytes received by each syscall_read", + "Number of bytes offered to each syscall_read", + "Number of byte segments offered to each syscall_read", + "Size of messages received by HTTP2 transport", }; const int grpc_stats_table_0[65] = { 0, 1, 2, 3, 4, 6, 8, 11, @@ -189,11 +238,12 @@ void grpc_stats_inc_tcp_read_offer(grpc_exec_ctx *exec_ctx, int value) { grpc_stats_histo_find_bucket_slow( (exec_ctx), value, grpc_stats_table_0, 64)); } -void grpc_stats_inc_tcp_read_iov_size(grpc_exec_ctx *exec_ctx, int value) { +void grpc_stats_inc_tcp_read_offer_iov_size(grpc_exec_ctx *exec_ctx, + int value) { value = GPR_CLAMP(value, 0, 1024); if (value < 13) { - GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE, - value); + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE, value); return; } union { @@ -206,11 +256,12 @@ void grpc_stats_inc_tcp_read_iov_size(grpc_exec_ctx *exec_ctx, int value) { grpc_stats_table_3[((_val.uint - 4623507967449235456ull) >> 48)] + 13; _bkt.dbl = grpc_stats_table_2[bucket]; bucket -= (_val.uint < _bkt.uint); - GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE, - bucket); + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE, bucket); return; } - GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE, + GRPC_STATS_INC_HISTOGRAM((exec_ctx), + GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE, grpc_stats_histo_find_bucket_slow( (exec_ctx), value, grpc_stats_table_2, 64)); } @@ -247,6 +298,9 @@ const int *const grpc_stats_histo_bucket_boundaries[6] = { grpc_stats_table_0, grpc_stats_table_2, grpc_stats_table_0, grpc_stats_table_0, grpc_stats_table_2, grpc_stats_table_0}; void (*const grpc_stats_inc_histogram[6])(grpc_exec_ctx *exec_ctx, int x) = { - grpc_stats_inc_tcp_write_size, grpc_stats_inc_tcp_write_iov_size, - grpc_stats_inc_tcp_read_size, grpc_stats_inc_tcp_read_offer, - grpc_stats_inc_tcp_read_iov_size, grpc_stats_inc_http2_send_message_size}; + grpc_stats_inc_tcp_write_size, + grpc_stats_inc_tcp_write_iov_size, + grpc_stats_inc_tcp_read_size, + grpc_stats_inc_tcp_read_offer, + grpc_stats_inc_tcp_read_offer_iov_size, + grpc_stats_inc_http2_send_message_size}; diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h index bbc78fb341..2b8bcf8221 100644 --- a/src/core/lib/debug/stats_data.h +++ b/src/core/lib/debug/stats_data.h @@ -60,16 +60,18 @@ typedef enum { GRPC_STATS_COUNTER_COUNT } grpc_stats_counters; extern const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT]; +extern const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT]; typedef enum { GRPC_STATS_HISTOGRAM_TCP_WRITE_SIZE, GRPC_STATS_HISTOGRAM_TCP_WRITE_IOV_SIZE, GRPC_STATS_HISTOGRAM_TCP_READ_SIZE, GRPC_STATS_HISTOGRAM_TCP_READ_OFFER, - GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE, + GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE, GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE, GRPC_STATS_HISTOGRAM_COUNT } grpc_stats_histograms; extern const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT]; +extern const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT]; typedef enum { GRPC_STATS_HISTOGRAM_TCP_WRITE_SIZE_FIRST_SLOT = 0, GRPC_STATS_HISTOGRAM_TCP_WRITE_SIZE_BUCKETS = 64, @@ -79,8 +81,8 @@ typedef enum { GRPC_STATS_HISTOGRAM_TCP_READ_SIZE_BUCKETS = 64, GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_FIRST_SLOT = 192, GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_BUCKETS = 64, - GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE_FIRST_SLOT = 256, - GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE_BUCKETS = 64, + GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE_FIRST_SLOT = 256, + GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE_BUCKETS = 64, GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE_FIRST_SLOT = 320, GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE_BUCKETS = 64, GRPC_STATS_HISTOGRAM_BUCKETS = 384 @@ -174,9 +176,9 @@ void grpc_stats_inc_tcp_read_size(grpc_exec_ctx *exec_ctx, int x); #define GRPC_STATS_INC_TCP_READ_OFFER(exec_ctx, value) \ grpc_stats_inc_tcp_read_offer((exec_ctx), (int)(value)) void grpc_stats_inc_tcp_read_offer(grpc_exec_ctx *exec_ctx, int x); -#define GRPC_STATS_INC_TCP_READ_IOV_SIZE(exec_ctx, value) \ - grpc_stats_inc_tcp_read_iov_size((exec_ctx), (int)(value)) -void grpc_stats_inc_tcp_read_iov_size(grpc_exec_ctx *exec_ctx, int x); +#define GRPC_STATS_INC_TCP_READ_OFFER_IOV_SIZE(exec_ctx, value) \ + grpc_stats_inc_tcp_read_offer_iov_size((exec_ctx), (int)(value)) +void grpc_stats_inc_tcp_read_offer_iov_size(grpc_exec_ctx *exec_ctx, int x); #define GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(exec_ctx, value) \ grpc_stats_inc_http2_send_message_size((exec_ctx), (int)(value)) void grpc_stats_inc_http2_send_message_size(grpc_exec_ctx *exec_ctx, int x); diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml index e74fdc331d..ecceb7d493 100644 --- a/src/core/lib/debug/stats_data.yaml +++ b/src/core/lib/debug/stats_data.yaml @@ -17,58 +17,103 @@ # overall - counter: client_calls_created + doc: Number of client side calls created by this process - counter: server_calls_created + doc: Number of server side calls created by this process # polling - counter: syscall_poll + doc: Number of polling syscalls (epoll_wait, poll, etc) made by this process - counter: syscall_wait + doc: Number of sleeping syscalls made by this process # stats system - counter: histogram_slow_lookups + doc: Number of times histogram increments went through the slow + (binary search) path # tcp - counter: syscall_write + doc: Number of write syscalls (or equivalent - eg sendmsg) made by this process - counter: syscall_read + doc: Number of read syscalls (or equivalent - eg recvmsg) made by this process - histogram: tcp_write_size max: 16777216 # 16 meg max write tracked buckets: 64 + doc: Number of bytes offered to each syscall_write - histogram: tcp_write_iov_size max: 1024 buckets: 64 + doc: Number of byte segments offered to each syscall_write - histogram: tcp_read_size max: 16777216 buckets: 64 + doc: Number of bytes received by each syscall_read - histogram: tcp_read_offer max: 16777216 buckets: 64 -- histogram: tcp_read_iov_size + doc: Number of bytes offered to each syscall_read +- histogram: tcp_read_offer_iov_size max: 1024 buckets: 64 + doc: Number of byte segments offered to each syscall_read - counter: tcp_backup_pollers_created + doc: Number of times a backup poller has been created (this can be expensive) - counter: tcp_backup_poller_polls + doc: Number of polls performed on the backup poller # chttp2 - counter: http2_op_batches + doc: Number of batches received by HTTP2 transport - counter: http2_op_cancel + doc: Number of cancelations received by HTTP2 transport - counter: http2_op_send_initial_metadata + doc: Number of batches containing send initial metadata - counter: http2_op_send_message + doc: Number of batches containing send message - counter: http2_op_send_trailing_metadata + doc: Number of batches containing send trailing metadata - counter: http2_op_recv_initial_metadata + doc: Number of batches containing receive initial metadata - counter: http2_op_recv_message + doc: Number of batches containing receive message - counter: http2_op_recv_trailing_metadata + doc: Number of batches containing receive trailing metadata - histogram: http2_send_message_size max: 16777216 buckets: 64 + doc: Size of messages received by HTTP2 transport - counter: http2_pings_sent + doc: Number of HTTP2 pings sent by process - counter: http2_writes_begun + doc: Number of HTTP2 writes initiated - counter: http2_writes_offloaded + doc: Number of HTTP2 writes offloaded to the executor from application threads - counter: http2_writes_continued + doc: Number of HTTP2 writes that finished seeing more data needed to be + written - counter: http2_partial_writes + doc: Number of HTTP2 writes that were made knowing there was still more data + to be written (we cap maximum write size to syscall_write) # combiner locks - counter: combiner_locks_initiated + doc: Number of combiner lock entries by process + (first items queued to a combiner) - counter: combiner_locks_scheduled_items + doc: Number of items scheduled against combiner locks - counter: combiner_locks_scheduled_final_items + doc: Number of final items scheduled against combiner locks - counter: combiner_locks_offloaded + doc: Number of combiner locks offloaded to different threads # executor - counter: executor_scheduled_short_items + doc: Number of finite runtime closures scheduled against the executor + (gRPC thread pool) - counter: executor_scheduled_long_items + doc: Number of potentially infinite runtime closures scheduled against the + executor (gRPC thread pool) - counter: executor_scheduled_to_self + doc: Number of closures scheduled by the executor to the executor - counter: executor_wakeup_initiated + doc: Number of thread wakeups initiated within the executor - counter: executor_queue_drained + doc: Number of times an executor queue was drained - counter: executor_push_retries + doc: Number of times we raced and were forced to retry pushing a closure to + the executor diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index fba2bc017b..6c56c36bcd 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -399,7 +399,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { msg.msg_flags = 0; GRPC_STATS_INC_TCP_READ_OFFER(exec_ctx, tcp->incoming_buffer->length); - GRPC_STATS_INC_TCP_READ_IOV_SIZE(exec_ctx, tcp->incoming_buffer->count); + GRPC_STATS_INC_TCP_READ_OFFER_IOV_SIZE(exec_ctx, tcp->incoming_buffer->count); GPR_TIMER_BEGIN("recvmsg", 0); do { diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index b92b8fb8b8..ac392f87fe 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -44,6 +44,10 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_timespec deadline, grpc_closure *closure, gpr_timespec now); +/* Initialize *timer without setting it. This can later be passed through + the regular init or cancel */ +void grpc_timer_init_unset(grpc_timer *timer); + /* Note that there is no timer destroy function. This is because the timer is a one-time occurrence with a guarantee that the callback will be called exactly once, either at expiration or cancellation. Thus, all diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 12efce241f..c08bb525b7 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -234,6 +234,8 @@ static void note_deadline_change(timer_shard *shard) { } } +void grpc_timer_init_unset(grpc_timer *timer) { timer->pending = false; } + void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_timespec deadline, grpc_closure *closure, gpr_timespec now) { diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index 70f49bcbe8..adced41f53 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -77,6 +77,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, uv_unref((uv_handle_t *)uv_timer); } +void grpc_timer_init_unset(grpc_timer *timer) { timer->pending = 0; } + void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { GRPC_UV_ASSERT_SAME_THREAD(); if (timer->pending) { diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 5e41b94ff8..ae5633b82c 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -34,7 +34,7 @@ #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" -#include "src/core/tsi/transport_security_interface.h" +#include "src/core/tsi/transport_security_grpc.h" #define STAGING_BUFFER_SIZE 8192 @@ -42,6 +42,7 @@ typedef struct { grpc_endpoint base; grpc_endpoint *wrapped_ep; struct tsi_frame_protector *protector; + struct tsi_zero_copy_grpc_protector *zero_copy_protector; gpr_mu protector_mu; /* saved upper level callbacks and user_data. */ grpc_closure *read_cb; @@ -67,6 +68,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, secure_endpoint *secure_ep) { secure_endpoint *ep = secure_ep; grpc_endpoint_destroy(exec_ctx, ep->wrapped_ep); tsi_frame_protector_destroy(ep->protector); + tsi_zero_copy_grpc_protector_destroy(exec_ctx, ep->zero_copy_protector); grpc_slice_buffer_destroy_internal(exec_ctx, &ep->leftover_bytes); grpc_slice_unref_internal(exec_ctx, ep->read_staging_buffer); grpc_slice_unref_internal(exec_ctx, ep->write_staging_buffer); @@ -159,51 +161,58 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, return; } - /* TODO(yangg) check error, maybe bail out early */ - for (i = 0; i < ep->source_buffer.count; i++) { - grpc_slice encrypted = ep->source_buffer.slices[i]; - uint8_t *message_bytes = GRPC_SLICE_START_PTR(encrypted); - size_t message_size = GRPC_SLICE_LENGTH(encrypted); - - while (message_size > 0 || keep_looping) { - size_t unprotected_buffer_size_written = (size_t)(end - cur); - size_t processed_message_size = message_size; - gpr_mu_lock(&ep->protector_mu); - result = tsi_frame_protector_unprotect(ep->protector, message_bytes, - &processed_message_size, cur, - &unprotected_buffer_size_written); - gpr_mu_unlock(&ep->protector_mu); - if (result != TSI_OK) { - gpr_log(GPR_ERROR, "Decryption error: %s", - tsi_result_to_string(result)); - break; - } - message_bytes += processed_message_size; - message_size -= processed_message_size; - cur += unprotected_buffer_size_written; - - if (cur == end) { - flush_read_staging_buffer(ep, &cur, &end); - /* Force to enter the loop again to extract buffered bytes in protector. - The bytes could be buffered because of running out of staging_buffer. - If this happens at the end of all slices, doing another unprotect - avoids leaving data in the protector. */ - keep_looping = 1; - } else if (unprotected_buffer_size_written > 0) { - keep_looping = 1; - } else { - keep_looping = 0; + if (ep->zero_copy_protector != NULL) { + // Use zero-copy grpc protector to unprotect. + result = tsi_zero_copy_grpc_protector_unprotect( + exec_ctx, ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer); + } else { + // Use frame protector to unprotect. + /* TODO(yangg) check error, maybe bail out early */ + for (i = 0; i < ep->source_buffer.count; i++) { + grpc_slice encrypted = ep->source_buffer.slices[i]; + uint8_t *message_bytes = GRPC_SLICE_START_PTR(encrypted); + size_t message_size = GRPC_SLICE_LENGTH(encrypted); + + while (message_size > 0 || keep_looping) { + size_t unprotected_buffer_size_written = (size_t)(end - cur); + size_t processed_message_size = message_size; + gpr_mu_lock(&ep->protector_mu); + result = tsi_frame_protector_unprotect( + ep->protector, message_bytes, &processed_message_size, cur, + &unprotected_buffer_size_written); + gpr_mu_unlock(&ep->protector_mu); + if (result != TSI_OK) { + gpr_log(GPR_ERROR, "Decryption error: %s", + tsi_result_to_string(result)); + break; + } + message_bytes += processed_message_size; + message_size -= processed_message_size; + cur += unprotected_buffer_size_written; + + if (cur == end) { + flush_read_staging_buffer(ep, &cur, &end); + /* Force to enter the loop again to extract buffered bytes in + protector. The bytes could be buffered because of running out of + staging_buffer. If this happens at the end of all slices, doing + another unprotect avoids leaving data in the protector. */ + keep_looping = 1; + } else if (unprotected_buffer_size_written > 0) { + keep_looping = 1; + } else { + keep_looping = 0; + } } + if (result != TSI_OK) break; } - if (result != TSI_OK) break; - } - if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) { - grpc_slice_buffer_add( - ep->read_buffer, - grpc_slice_split_head( - &ep->read_staging_buffer, - (size_t)(cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer)))); + if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) { + grpc_slice_buffer_add( + ep->read_buffer, + grpc_slice_split_head( + &ep->read_staging_buffer, + (size_t)(cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer)))); + } } /* TODO(yangg) experiment with moving this block after read_cb to see if it @@ -270,54 +279,62 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, } } - for (i = 0; i < slices->count; i++) { - grpc_slice plain = slices->slices[i]; - uint8_t *message_bytes = GRPC_SLICE_START_PTR(plain); - size_t message_size = GRPC_SLICE_LENGTH(plain); - while (message_size > 0) { - size_t protected_buffer_size_to_send = (size_t)(end - cur); - size_t processed_message_size = message_size; - gpr_mu_lock(&ep->protector_mu); - result = tsi_frame_protector_protect(ep->protector, message_bytes, - &processed_message_size, cur, - &protected_buffer_size_to_send); - gpr_mu_unlock(&ep->protector_mu); - if (result != TSI_OK) { - gpr_log(GPR_ERROR, "Encryption error: %s", - tsi_result_to_string(result)); - break; - } - message_bytes += processed_message_size; - message_size -= processed_message_size; - cur += protected_buffer_size_to_send; - - if (cur == end) { - flush_write_staging_buffer(ep, &cur, &end); + if (ep->zero_copy_protector != NULL) { + // Use zero-copy grpc protector to protect. + result = tsi_zero_copy_grpc_protector_protect( + exec_ctx, ep->zero_copy_protector, slices, &ep->output_buffer); + } else { + // Use frame protector to protect. + for (i = 0; i < slices->count; i++) { + grpc_slice plain = slices->slices[i]; + uint8_t *message_bytes = GRPC_SLICE_START_PTR(plain); + size_t message_size = GRPC_SLICE_LENGTH(plain); + while (message_size > 0) { + size_t protected_buffer_size_to_send = (size_t)(end - cur); + size_t processed_message_size = message_size; + gpr_mu_lock(&ep->protector_mu); + result = tsi_frame_protector_protect(ep->protector, message_bytes, + &processed_message_size, cur, + &protected_buffer_size_to_send); + gpr_mu_unlock(&ep->protector_mu); + if (result != TSI_OK) { + gpr_log(GPR_ERROR, "Encryption error: %s", + tsi_result_to_string(result)); + break; + } + message_bytes += processed_message_size; + message_size -= processed_message_size; + cur += protected_buffer_size_to_send; + + if (cur == end) { + flush_write_staging_buffer(ep, &cur, &end); + } } - } - if (result != TSI_OK) break; - } - if (result == TSI_OK) { - size_t still_pending_size; - do { - size_t protected_buffer_size_to_send = (size_t)(end - cur); - gpr_mu_lock(&ep->protector_mu); - result = tsi_frame_protector_protect_flush(ep->protector, cur, - &protected_buffer_size_to_send, - &still_pending_size); - gpr_mu_unlock(&ep->protector_mu); if (result != TSI_OK) break; - cur += protected_buffer_size_to_send; - if (cur == end) { - flush_write_staging_buffer(ep, &cur, &end); + } + if (result == TSI_OK) { + size_t still_pending_size; + do { + size_t protected_buffer_size_to_send = (size_t)(end - cur); + gpr_mu_lock(&ep->protector_mu); + result = tsi_frame_protector_protect_flush( + ep->protector, cur, &protected_buffer_size_to_send, + &still_pending_size); + gpr_mu_unlock(&ep->protector_mu); + if (result != TSI_OK) break; + cur += protected_buffer_size_to_send; + if (cur == end) { + flush_write_staging_buffer(ep, &cur, &end); + } + } while (still_pending_size > 0); + if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) { + grpc_slice_buffer_add( + &ep->output_buffer, + grpc_slice_split_head( + &ep->write_staging_buffer, + (size_t)(cur - + GRPC_SLICE_START_PTR(ep->write_staging_buffer)))); } - } while (still_pending_size > 0); - if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) { - grpc_slice_buffer_add( - &ep->output_buffer, - grpc_slice_split_head( - &ep->write_staging_buffer, - (size_t)(cur - GRPC_SLICE_START_PTR(ep->write_staging_buffer)))); } } @@ -389,13 +406,16 @@ static const grpc_endpoint_vtable vtable = {endpoint_read, endpoint_get_fd}; grpc_endpoint *grpc_secure_endpoint_create( - struct tsi_frame_protector *protector, grpc_endpoint *transport, - grpc_slice *leftover_slices, size_t leftover_nslices) { + struct tsi_frame_protector *protector, + struct tsi_zero_copy_grpc_protector *zero_copy_protector, + grpc_endpoint *transport, grpc_slice *leftover_slices, + size_t leftover_nslices) { size_t i; secure_endpoint *ep = (secure_endpoint *)gpr_malloc(sizeof(secure_endpoint)); ep->base.vtable = &vtable; ep->wrapped_ep = transport; ep->protector = protector; + ep->zero_copy_protector = zero_copy_protector; grpc_slice_buffer_init(&ep->leftover_bytes); for (i = 0; i < leftover_nslices; i++) { grpc_slice_buffer_add(&ep->leftover_bytes, diff --git a/src/core/lib/security/transport/secure_endpoint.h b/src/core/lib/security/transport/secure_endpoint.h index 1c5555f3df..3323a6ff42 100644 --- a/src/core/lib/security/transport/secure_endpoint.h +++ b/src/core/lib/security/transport/secure_endpoint.h @@ -23,12 +23,17 @@ #include "src/core/lib/iomgr/endpoint.h" struct tsi_frame_protector; +struct tsi_zero_copy_grpc_protector; extern grpc_tracer_flag grpc_trace_secure_endpoint; -/* Takes ownership of protector and to_wrap, and refs leftover_slices. */ +/* Takes ownership of protector, zero_copy_protector, and to_wrap, and refs + * leftover_slices. If zero_copy_protector is not NULL, protector will never be + * used. */ grpc_endpoint *grpc_secure_endpoint_create( - struct tsi_frame_protector *protector, grpc_endpoint *to_wrap, - grpc_slice *leftover_slices, size_t leftover_nslices); + struct tsi_frame_protector *protector, + struct tsi_zero_copy_grpc_protector *zero_copy_protector, + grpc_endpoint *to_wrap, grpc_slice *leftover_slices, + size_t leftover_nslices); #endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURE_ENDPOINT_H */ diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index bf7a5272cb..1a1313d472 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -32,6 +32,7 @@ #include "src/core/lib/security/transport/secure_endpoint.h" #include "src/core/lib/security/transport/tsi_error.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/tsi/transport_security_grpc.h" #define GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE 256 @@ -133,17 +134,31 @@ static void on_peer_checked_inner(grpc_exec_ctx *exec_ctx, security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error)); return; } - // Create frame protector. - tsi_frame_protector *protector; - tsi_result result = tsi_handshaker_result_create_frame_protector( - h->handshaker_result, NULL, &protector); - if (result != TSI_OK) { + // Create zero-copy frame protector, if implemented. + tsi_zero_copy_grpc_protector *zero_copy_protector = NULL; + tsi_result result = tsi_handshaker_result_create_zero_copy_grpc_protector( + h->handshaker_result, NULL, &zero_copy_protector); + if (result != TSI_OK && result != TSI_UNIMPLEMENTED) { error = grpc_set_tsi_error_result( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Frame protector creation failed"), + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Zero-copy frame protector creation failed"), result); security_handshake_failed_locked(exec_ctx, h, error); return; } + // Create frame protector if zero-copy frame protector is NULL. + tsi_frame_protector *protector = NULL; + if (zero_copy_protector == NULL) { + result = tsi_handshaker_result_create_frame_protector(h->handshaker_result, + NULL, &protector); + if (result != TSI_OK) { + error = grpc_set_tsi_error_result(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Frame protector creation failed"), + result); + security_handshake_failed_locked(exec_ctx, h, error); + goto done; + } + } // Get unused bytes. const unsigned char *unused_bytes = NULL; size_t unused_bytes_size = 0; @@ -153,12 +168,12 @@ static void on_peer_checked_inner(grpc_exec_ctx *exec_ctx, if (unused_bytes_size > 0) { grpc_slice slice = grpc_slice_from_copied_buffer((char *)unused_bytes, unused_bytes_size); - h->args->endpoint = - grpc_secure_endpoint_create(protector, h->args->endpoint, &slice, 1); + h->args->endpoint = grpc_secure_endpoint_create( + protector, zero_copy_protector, h->args->endpoint, &slice, 1); grpc_slice_unref_internal(exec_ctx, slice); } else { - h->args->endpoint = - grpc_secure_endpoint_create(protector, h->args->endpoint, NULL, 0); + h->args->endpoint = grpc_secure_endpoint_create( + protector, zero_copy_protector, h->args->endpoint, NULL, 0); } tsi_handshaker_result_destroy(h->handshaker_result); h->handshaker_result = NULL; diff --git a/src/core/lib/support/string.c b/src/core/lib/support/string.c index ec93303024..523e43445b 100644 --- a/src/core/lib/support/string.c +++ b/src/core/lib/support/string.c @@ -300,11 +300,12 @@ void *gpr_memrchr(const void *s, int c, size_t n) { } bool gpr_is_true(const char *s) { + size_t i; if (s == NULL) { return false; } static const char *truthy[] = {"yes", "true", "1"}; - for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { + for (i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { if (0 == gpr_stricmp(s, truthy[i])) { return true; } diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index 7d60b1de17..5dbfaa2d43 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -44,7 +44,9 @@ static void alarm_ref(grpc_alarm *alarm) { gpr_ref(&alarm->refs); } static void alarm_unref(grpc_alarm *alarm) { if (gpr_unref(&alarm->refs)) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm"); + if (alarm->cq != NULL) { + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm"); + } grpc_exec_ctx_finish(&exec_ctx); gpr_free(alarm); } @@ -93,12 +95,8 @@ static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { (void *)alarm, &alarm->completion); } -grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, - void *tag) { +grpc_alarm *grpc_alarm_create(void *reserved) { grpc_alarm *alarm = gpr_malloc(sizeof(grpc_alarm)); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - - gpr_ref_init(&alarm->refs, 1); #ifndef NDEBUG if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) { @@ -106,27 +104,36 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, } #endif + gpr_ref_init(&alarm->refs, 1); + grpc_timer_init_unset(&alarm->alarm); + alarm->cq = NULL; + GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm, + grpc_schedule_on_exec_ctx); + return alarm; +} + +void grpc_alarm_set(grpc_alarm *alarm, grpc_completion_queue *cq, + gpr_timespec deadline, void *tag, void *reserved) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_CQ_INTERNAL_REF(cq, "alarm"); alarm->cq = cq; alarm->tag = tag; GPR_ASSERT(grpc_cq_begin_op(cq, tag)); - GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm, - grpc_schedule_on_exec_ctx); grpc_timer_init(&exec_ctx, &alarm->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), &alarm->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); grpc_exec_ctx_finish(&exec_ctx); - return alarm; } -void grpc_alarm_cancel(grpc_alarm *alarm) { +void grpc_alarm_cancel(grpc_alarm *alarm, void *reserved) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_timer_cancel(&exec_ctx, &alarm->alarm); grpc_exec_ctx_finish(&exec_ctx); } -void grpc_alarm_destroy(grpc_alarm *alarm) { - grpc_alarm_cancel(alarm); +void grpc_alarm_destroy(grpc_alarm *alarm, void *reserved) { + grpc_alarm_cancel(alarm, reserved); GRPC_ALARM_UNREF(alarm, "alarm_destroy"); } diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c index 96c16105e7..fd6ea4daa9 100644 --- a/src/core/lib/surface/version.c +++ b/src/core/lib/surface/version.c @@ -21,6 +21,6 @@ #include <grpc/grpc.h> -const char *grpc_version_string(void) { return "4.0.0-dev"; } +const char *grpc_version_string(void) { return "5.0.0-dev"; } const char *grpc_g_stands_for(void) { return "gambit"; } diff --git a/src/core/plugin_registry/grpc_cronet_plugin_registry.c b/src/core/plugin_registry/grpc_cronet_plugin_registry.c index 322ebea111..1c09f54ad9 100644 --- a/src/core/plugin_registry/grpc_cronet_plugin_registry.c +++ b/src/core/plugin_registry/grpc_cronet_plugin_registry.c @@ -28,8 +28,8 @@ extern void grpc_client_channel_init(void); extern void grpc_client_channel_shutdown(void); extern void grpc_tsi_gts_init(void); extern void grpc_tsi_gts_shutdown(void); -extern void grpc_load_reporting_plugin_init(void); -extern void grpc_load_reporting_plugin_shutdown(void); +extern void grpc_server_load_reporting_plugin_init(void); +extern void grpc_server_load_reporting_plugin_shutdown(void); void grpc_register_built_in_plugins(void) { grpc_register_plugin(grpc_http_filters_init, @@ -42,6 +42,6 @@ void grpc_register_built_in_plugins(void) { grpc_client_channel_shutdown); grpc_register_plugin(grpc_tsi_gts_init, grpc_tsi_gts_shutdown); - grpc_register_plugin(grpc_load_reporting_plugin_init, - grpc_load_reporting_plugin_shutdown); + grpc_register_plugin(grpc_server_load_reporting_plugin_init, + grpc_server_load_reporting_plugin_shutdown); } diff --git a/src/core/plugin_registry/grpc_plugin_registry.c b/src/core/plugin_registry/grpc_plugin_registry.c index fa9974952c..9cacf3d306 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.c +++ b/src/core/plugin_registry/grpc_plugin_registry.c @@ -44,8 +44,8 @@ extern void grpc_resolver_dns_native_init(void); extern void grpc_resolver_dns_native_shutdown(void); extern void grpc_resolver_sockaddr_init(void); extern void grpc_resolver_sockaddr_shutdown(void); -extern void grpc_load_reporting_plugin_init(void); -extern void grpc_load_reporting_plugin_shutdown(void); +extern void grpc_server_load_reporting_plugin_init(void); +extern void grpc_server_load_reporting_plugin_shutdown(void); extern void census_grpc_plugin_init(void); extern void census_grpc_plugin_shutdown(void); extern void grpc_max_age_filter_init(void); @@ -82,8 +82,8 @@ void grpc_register_built_in_plugins(void) { grpc_resolver_dns_native_shutdown); grpc_register_plugin(grpc_resolver_sockaddr_init, grpc_resolver_sockaddr_shutdown); - grpc_register_plugin(grpc_load_reporting_plugin_init, - grpc_load_reporting_plugin_shutdown); + grpc_register_plugin(grpc_server_load_reporting_plugin_init, + grpc_server_load_reporting_plugin_shutdown); grpc_register_plugin(census_grpc_plugin_init, census_grpc_plugin_shutdown); grpc_register_plugin(grpc_max_age_filter_init, diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c index 7eb599d81a..7b90d796d5 100644 --- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c +++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c @@ -36,8 +36,8 @@ extern void grpc_resolver_sockaddr_init(void); extern void grpc_resolver_sockaddr_shutdown(void); extern void grpc_resolver_fake_init(void); extern void grpc_resolver_fake_shutdown(void); -extern void grpc_load_reporting_plugin_init(void); -extern void grpc_load_reporting_plugin_shutdown(void); +extern void grpc_server_load_reporting_plugin_init(void); +extern void grpc_server_load_reporting_plugin_shutdown(void); extern void grpc_lb_policy_grpclb_init(void); extern void grpc_lb_policy_grpclb_shutdown(void); extern void grpc_lb_policy_pick_first_init(void); @@ -72,8 +72,8 @@ void grpc_register_built_in_plugins(void) { grpc_resolver_sockaddr_shutdown); grpc_register_plugin(grpc_resolver_fake_init, grpc_resolver_fake_shutdown); - grpc_register_plugin(grpc_load_reporting_plugin_init, - grpc_load_reporting_plugin_shutdown); + grpc_register_plugin(grpc_server_load_reporting_plugin_init, + grpc_server_load_reporting_plugin_shutdown); grpc_register_plugin(grpc_lb_policy_grpclb_init, grpc_lb_policy_grpclb_shutdown); grpc_register_plugin(grpc_lb_policy_pick_first_init, diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c index 967126ecee..e7b3be3d86 100644 --- a/src/core/tsi/fake_transport_security.c +++ b/src/core/tsi/fake_transport_security.c @@ -25,7 +25,8 @@ #include <grpc/support/log.h> #include <grpc/support/port_platform.h> #include <grpc/support/useful.h> -#include "src/core/tsi/transport_security.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/tsi/transport_security_grpc.h" /* --- Constants. ---*/ #define TSI_FAKE_FRAME_HEADER_SIZE 4 @@ -74,6 +75,14 @@ typedef struct { size_t max_frame_size; } tsi_fake_frame_protector; +typedef struct { + tsi_zero_copy_grpc_protector base; + grpc_slice_buffer header_sb; + grpc_slice_buffer protected_sb; + size_t max_frame_size; + size_t parsed_frame_size; +} tsi_fake_zero_copy_grpc_protector; + /* --- Utils. ---*/ static const char *tsi_fake_handshake_message_strings[] = { @@ -113,6 +122,28 @@ static void store32_little_endian(uint32_t value, unsigned char *buf) { buf[0] = (unsigned char)((value)&0xFF); } +static uint32_t read_frame_size(const grpc_slice_buffer *sb) { + GPR_ASSERT(sb != NULL && sb->length >= TSI_FAKE_FRAME_HEADER_SIZE); + uint8_t frame_size_buffer[TSI_FAKE_FRAME_HEADER_SIZE]; + uint8_t *buf = frame_size_buffer; + /* Copies the first 4 bytes to a temporary buffer. */ + size_t remaining = TSI_FAKE_FRAME_HEADER_SIZE; + for (size_t i = 0; i < sb->count; i++) { + size_t slice_length = GRPC_SLICE_LENGTH(sb->slices[i]); + if (remaining <= slice_length) { + memcpy(buf, GRPC_SLICE_START_PTR(sb->slices[i]), remaining); + remaining = 0; + break; + } else { + memcpy(buf, GRPC_SLICE_START_PTR(sb->slices[i]), slice_length); + buf += slice_length; + remaining -= slice_length; + } + } + GPR_ASSERT(remaining == 0); + return load32_little_endian(frame_size_buffer); +} + static void tsi_fake_frame_reset(tsi_fake_frame *frame, int needs_draining) { frame->offset = 0; frame->needs_draining = needs_draining; @@ -363,6 +394,84 @@ static const tsi_frame_protector_vtable frame_protector_vtable = { fake_protector_unprotect, fake_protector_destroy, }; +/* --- tsi_zero_copy_grpc_protector methods implementation. ---*/ + +static tsi_result fake_zero_copy_grpc_protector_protect( + grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self, + grpc_slice_buffer *unprotected_slices, + grpc_slice_buffer *protected_slices) { + if (self == NULL || unprotected_slices == NULL || protected_slices == NULL) { + return TSI_INVALID_ARGUMENT; + } + tsi_fake_zero_copy_grpc_protector *impl = + (tsi_fake_zero_copy_grpc_protector *)self; + /* Protects each frame. */ + while (unprotected_slices->length > 0) { + size_t frame_length = + GPR_MIN(impl->max_frame_size, + unprotected_slices->length + TSI_FAKE_FRAME_HEADER_SIZE); + grpc_slice slice = GRPC_SLICE_MALLOC(TSI_FAKE_FRAME_HEADER_SIZE); + store32_little_endian((uint32_t)frame_length, GRPC_SLICE_START_PTR(slice)); + grpc_slice_buffer_add(protected_slices, slice); + size_t data_length = frame_length - TSI_FAKE_FRAME_HEADER_SIZE; + grpc_slice_buffer_move_first(unprotected_slices, data_length, + protected_slices); + } + return TSI_OK; +} + +static tsi_result fake_zero_copy_grpc_protector_unprotect( + grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self, + grpc_slice_buffer *protected_slices, + grpc_slice_buffer *unprotected_slices) { + if (self == NULL || unprotected_slices == NULL || protected_slices == NULL) { + return TSI_INVALID_ARGUMENT; + } + tsi_fake_zero_copy_grpc_protector *impl = + (tsi_fake_zero_copy_grpc_protector *)self; + grpc_slice_buffer_move_into(protected_slices, &impl->protected_sb); + /* Unprotect each frame, if we get a full frame. */ + while (impl->protected_sb.length >= TSI_FAKE_FRAME_HEADER_SIZE) { + if (impl->parsed_frame_size == 0) { + impl->parsed_frame_size = read_frame_size(&impl->protected_sb); + if (impl->parsed_frame_size <= 4) { + gpr_log(GPR_ERROR, "Invalid frame size."); + return TSI_DATA_CORRUPTED; + } + } + /* If we do not have a full frame, return with OK status. */ + if (impl->protected_sb.length < impl->parsed_frame_size) break; + /* Strips header bytes. */ + grpc_slice_buffer_move_first(&impl->protected_sb, + TSI_FAKE_FRAME_HEADER_SIZE, &impl->header_sb); + /* Moves data to unprotected slices. */ + grpc_slice_buffer_move_first( + &impl->protected_sb, + impl->parsed_frame_size - TSI_FAKE_FRAME_HEADER_SIZE, + unprotected_slices); + impl->parsed_frame_size = 0; + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &impl->header_sb); + } + return TSI_OK; +} + +static void fake_zero_copy_grpc_protector_destroy( + grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self) { + if (self == NULL) return; + tsi_fake_zero_copy_grpc_protector *impl = + (tsi_fake_zero_copy_grpc_protector *)self; + grpc_slice_buffer_destroy_internal(exec_ctx, &impl->header_sb); + grpc_slice_buffer_destroy_internal(exec_ctx, &impl->protected_sb); + gpr_free(impl); +} + +static const tsi_zero_copy_grpc_protector_vtable + zero_copy_grpc_protector_vtable = { + fake_zero_copy_grpc_protector_protect, + fake_zero_copy_grpc_protector_unprotect, + fake_zero_copy_grpc_protector_destroy, +}; + /* --- tsi_handshaker_result methods implementation. ---*/ typedef struct { @@ -383,6 +492,14 @@ static tsi_result fake_handshaker_result_extract_peer( return result; } +static tsi_result fake_handshaker_result_create_zero_copy_grpc_protector( + const tsi_handshaker_result *self, size_t *max_output_protected_frame_size, + tsi_zero_copy_grpc_protector **protector) { + *protector = + tsi_create_fake_zero_copy_grpc_protector(max_output_protected_frame_size); + return TSI_OK; +} + static tsi_result fake_handshaker_result_create_frame_protector( const tsi_handshaker_result *self, size_t *max_output_protected_frame_size, tsi_frame_protector **protector) { @@ -407,7 +524,7 @@ static void fake_handshaker_result_destroy(tsi_handshaker_result *self) { static const tsi_handshaker_result_vtable handshaker_result_vtable = { fake_handshaker_result_extract_peer, - NULL, /* create_zero_copy_grpc_protector */ + fake_handshaker_result_create_zero_copy_grpc_protector, fake_handshaker_result_create_frame_protector, fake_handshaker_result_get_unused_bytes, fake_handshaker_result_destroy, @@ -631,3 +748,16 @@ tsi_frame_protector *tsi_create_fake_frame_protector( impl->base.vtable = &frame_protector_vtable; return &impl->base; } + +tsi_zero_copy_grpc_protector *tsi_create_fake_zero_copy_grpc_protector( + size_t *max_protected_frame_size) { + tsi_fake_zero_copy_grpc_protector *impl = gpr_zalloc(sizeof(*impl)); + grpc_slice_buffer_init(&impl->header_sb); + grpc_slice_buffer_init(&impl->protected_sb); + impl->max_frame_size = (max_protected_frame_size == NULL) + ? TSI_FAKE_DEFAULT_FRAME_SIZE + : *max_protected_frame_size; + impl->parsed_frame_size = 0; + impl->base.vtable = &zero_copy_grpc_protector_vtable; + return &impl->base; +} diff --git a/src/core/tsi/fake_transport_security.h b/src/core/tsi/fake_transport_security.h index 934b3cbeb2..6159708a84 100644 --- a/src/core/tsi/fake_transport_security.h +++ b/src/core/tsi/fake_transport_security.h @@ -39,6 +39,11 @@ tsi_handshaker *tsi_create_fake_handshaker(int is_client); tsi_frame_protector *tsi_create_fake_frame_protector( size_t *max_protected_frame_size); +/* Creates a zero-copy protector directly without going through the handshake + * phase. */ +tsi_zero_copy_grpc_protector *tsi_create_fake_zero_copy_grpc_protector( + size_t *max_protected_frame_size); + #ifdef __cplusplus } #endif diff --git a/src/core/tsi/transport_security_grpc.c b/src/core/tsi/transport_security_grpc.c index 5bcfdfa61f..773b35e717 100644 --- a/src/core/tsi/transport_security_grpc.c +++ b/src/core/tsi/transport_security_grpc.c @@ -37,28 +37,33 @@ tsi_result tsi_handshaker_result_create_zero_copy_grpc_protector( Calls specific implementation after state/input validation. */ tsi_result tsi_zero_copy_grpc_protector_protect( - tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *unprotected_slices, + grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self, + grpc_slice_buffer *unprotected_slices, grpc_slice_buffer *protected_slices) { - if (self == NULL || self->vtable == NULL || unprotected_slices == NULL || - protected_slices == NULL) { + if (exec_ctx == NULL || self == NULL || self->vtable == NULL || + unprotected_slices == NULL || protected_slices == NULL) { return TSI_INVALID_ARGUMENT; } if (self->vtable->protect == NULL) return TSI_UNIMPLEMENTED; - return self->vtable->protect(self, unprotected_slices, protected_slices); + return self->vtable->protect(exec_ctx, self, unprotected_slices, + protected_slices); } tsi_result tsi_zero_copy_grpc_protector_unprotect( - tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *protected_slices, + grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self, + grpc_slice_buffer *protected_slices, grpc_slice_buffer *unprotected_slices) { - if (self == NULL || self->vtable == NULL || protected_slices == NULL || - unprotected_slices == NULL) { + if (exec_ctx == NULL || self == NULL || self->vtable == NULL || + protected_slices == NULL || unprotected_slices == NULL) { return TSI_INVALID_ARGUMENT; } if (self->vtable->unprotect == NULL) return TSI_UNIMPLEMENTED; - return self->vtable->unprotect(self, protected_slices, unprotected_slices); + return self->vtable->unprotect(exec_ctx, self, protected_slices, + unprotected_slices); } -void tsi_zero_copy_grpc_protector_destroy(tsi_zero_copy_grpc_protector *self) { +void tsi_zero_copy_grpc_protector_destroy(grpc_exec_ctx *exec_ctx, + tsi_zero_copy_grpc_protector *self) { if (self == NULL) return; - self->vtable->destroy(self); + self->vtable->destroy(exec_ctx, self); } diff --git a/src/core/tsi/transport_security_grpc.h b/src/core/tsi/transport_security_grpc.h index 5ab5297cc4..375a758888 100644 --- a/src/core/tsi/transport_security_grpc.h +++ b/src/core/tsi/transport_security_grpc.h @@ -42,8 +42,8 @@ tsi_result tsi_handshaker_result_create_zero_copy_grpc_protector( - This method returns TSI_OK in case of success or a specific error code in case of failure. */ tsi_result tsi_zero_copy_grpc_protector_protect( - tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *unprotected_slices, - grpc_slice_buffer *protected_slices); + grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self, + grpc_slice_buffer *unprotected_slices, grpc_slice_buffer *protected_slices); /* Outputs unprotected bytes. - protected_slices is the bytes of protected frames. @@ -52,21 +52,24 @@ tsi_result tsi_zero_copy_grpc_protector_protect( there is not enough data to output in which case unprotected_slices has 0 bytes. */ tsi_result tsi_zero_copy_grpc_protector_unprotect( - tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *protected_slices, - grpc_slice_buffer *unprotected_slices); + grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self, + grpc_slice_buffer *protected_slices, grpc_slice_buffer *unprotected_slices); /* Destroys the tsi_zero_copy_grpc_protector object. */ -void tsi_zero_copy_grpc_protector_destroy(tsi_zero_copy_grpc_protector *self); +void tsi_zero_copy_grpc_protector_destroy(grpc_exec_ctx *exec_ctx, + tsi_zero_copy_grpc_protector *self); /* Base for tsi_zero_copy_grpc_protector implementations. */ typedef struct { - tsi_result (*protect)(tsi_zero_copy_grpc_protector *self, + tsi_result (*protect)(grpc_exec_ctx *exec_ctx, + tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *unprotected_slices, grpc_slice_buffer *protected_slices); - tsi_result (*unprotect)(tsi_zero_copy_grpc_protector *self, + tsi_result (*unprotect)(grpc_exec_ctx *exec_ctx, + tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *protected_slices, grpc_slice_buffer *unprotected_slices); - void (*destroy)(tsi_zero_copy_grpc_protector *self); + void (*destroy)(grpc_exec_ctx *exec_ctx, tsi_zero_copy_grpc_protector *self); } tsi_zero_copy_grpc_protector_vtable; struct tsi_zero_copy_grpc_protector { diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 2071827b64..ec642b0520 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -298,8 +298,8 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c', 'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c', 'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c', - 'src/core/ext/filters/load_reporting/load_reporting.c', - 'src/core/ext/filters/load_reporting/load_reporting_filter.c', + 'src/core/ext/filters/load_reporting/server_load_reporting_filter.c', + 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.c', 'src/core/ext/census/base_resources.c', 'src/core/ext/census/context.c', 'src/core/ext/census/gen/census.pb.c', diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 0402ce34fb..57b543967e 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -88,6 +88,7 @@ grpc_completion_queue_pluck_type grpc_completion_queue_pluck_import; grpc_completion_queue_shutdown_type grpc_completion_queue_shutdown_import; grpc_completion_queue_destroy_type grpc_completion_queue_destroy_import; grpc_alarm_create_type grpc_alarm_create_import; +grpc_alarm_set_type grpc_alarm_set_import; grpc_alarm_cancel_type grpc_alarm_cancel_import; grpc_alarm_destroy_type grpc_alarm_destroy_import; grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import; @@ -395,6 +396,7 @@ void grpc_rb_load_imports(HMODULE library) { grpc_completion_queue_shutdown_import = (grpc_completion_queue_shutdown_type) GetProcAddress(library, "grpc_completion_queue_shutdown"); grpc_completion_queue_destroy_import = (grpc_completion_queue_destroy_type) GetProcAddress(library, "grpc_completion_queue_destroy"); grpc_alarm_create_import = (grpc_alarm_create_type) GetProcAddress(library, "grpc_alarm_create"); + grpc_alarm_set_import = (grpc_alarm_set_type) GetProcAddress(library, "grpc_alarm_set"); grpc_alarm_cancel_import = (grpc_alarm_cancel_type) GetProcAddress(library, "grpc_alarm_cancel"); grpc_alarm_destroy_import = (grpc_alarm_destroy_type) GetProcAddress(library, "grpc_alarm_destroy"); grpc_channel_check_connectivity_state_import = (grpc_channel_check_connectivity_state_type) GetProcAddress(library, "grpc_channel_check_connectivity_state"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index e3704e592b..c5c848ae44 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -242,13 +242,16 @@ extern grpc_completion_queue_shutdown_type grpc_completion_queue_shutdown_import typedef void(*grpc_completion_queue_destroy_type)(grpc_completion_queue *cq); extern grpc_completion_queue_destroy_type grpc_completion_queue_destroy_import; #define grpc_completion_queue_destroy grpc_completion_queue_destroy_import -typedef grpc_alarm *(*grpc_alarm_create_type)(grpc_completion_queue *cq, gpr_timespec deadline, void *tag); +typedef grpc_alarm *(*grpc_alarm_create_type)(void *reserved); extern grpc_alarm_create_type grpc_alarm_create_import; #define grpc_alarm_create grpc_alarm_create_import -typedef void(*grpc_alarm_cancel_type)(grpc_alarm *alarm); +typedef void(*grpc_alarm_set_type)(grpc_alarm *alarm, grpc_completion_queue *cq, gpr_timespec deadline, void *tag, void *reserved); +extern grpc_alarm_set_type grpc_alarm_set_import; +#define grpc_alarm_set grpc_alarm_set_import +typedef void(*grpc_alarm_cancel_type)(grpc_alarm *alarm, void *reserved); extern grpc_alarm_cancel_type grpc_alarm_cancel_import; #define grpc_alarm_cancel grpc_alarm_cancel_import -typedef void(*grpc_alarm_destroy_type)(grpc_alarm *alarm); +typedef void(*grpc_alarm_destroy_type)(grpc_alarm *alarm, void *reserved); extern grpc_alarm_destroy_type grpc_alarm_destroy_import; #define grpc_alarm_destroy grpc_alarm_destroy_import typedef grpc_connectivity_state(*grpc_channel_check_connectivity_state_type)(grpc_channel *channel, int try_to_connect); diff --git a/test/core/end2end/fixtures/h2_load_reporting.c b/test/core/end2end/fixtures/h2_load_reporting.c index 385e2cbf70..8a05bb722a 100644 --- a/test/core/end2end/fixtures/h2_load_reporting.c +++ b/test/core/end2end/fixtures/h2_load_reporting.c @@ -28,7 +28,7 @@ #include <grpc/support/useful.h> #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/http/server/http_server_filter.h" -#include "src/core/ext/filters/load_reporting/load_reporting.h" +#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" diff --git a/test/core/end2end/tests/load_reporting_hook.c b/test/core/end2end/tests/load_reporting_hook.c index 3584d47887..7b503790db 100644 --- a/test/core/end2end/tests/load_reporting_hook.c +++ b/test/core/end2end/tests/load_reporting_hook.c @@ -26,8 +26,8 @@ #include <grpc/support/time.h> #include <grpc/support/useful.h> -#include "src/core/ext/filters/load_reporting/load_reporting.h" -#include "src/core/ext/filters/load_reporting/load_reporting_filter.h" +#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h" +#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/transport/static_metadata.h" diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index 09846ba10e..839a05fa9b 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -36,12 +36,19 @@ static gpr_mu *g_mu; static grpc_pollset *g_pollset; static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair( - size_t slice_size, grpc_slice *leftover_slices, size_t leftover_nslices) { + size_t slice_size, grpc_slice *leftover_slices, size_t leftover_nslices, + bool use_zero_copy_protector) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; tsi_frame_protector *fake_read_protector = tsi_create_fake_frame_protector(NULL); tsi_frame_protector *fake_write_protector = tsi_create_fake_frame_protector(NULL); + tsi_zero_copy_grpc_protector *fake_read_zero_copy_protector = + use_zero_copy_protector ? tsi_create_fake_zero_copy_grpc_protector(NULL) + : NULL; + tsi_zero_copy_grpc_protector *fake_write_zero_copy_protector = + use_zero_copy_protector ? tsi_create_fake_zero_copy_grpc_protector(NULL) + : NULL; grpc_endpoint_test_fixture f; grpc_endpoint_pair tcp; @@ -54,8 +61,9 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair( grpc_endpoint_add_to_pollset(&exec_ctx, tcp.server, g_pollset); if (leftover_nslices == 0) { - f.client_ep = - grpc_secure_endpoint_create(fake_read_protector, tcp.client, NULL, 0); + f.client_ep = grpc_secure_endpoint_create(fake_read_protector, + fake_read_zero_copy_protector, + tcp.client, NULL, 0); } else { unsigned i; tsi_result result; @@ -96,31 +104,47 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair( } while (still_pending_size > 0); encrypted_leftover = grpc_slice_from_copied_buffer( (const char *)encrypted_buffer, total_buffer_size - buffer_size); - f.client_ep = grpc_secure_endpoint_create(fake_read_protector, tcp.client, - &encrypted_leftover, 1); + f.client_ep = grpc_secure_endpoint_create( + fake_read_protector, fake_read_zero_copy_protector, tcp.client, + &encrypted_leftover, 1); grpc_slice_unref(encrypted_leftover); gpr_free(encrypted_buffer); } - f.server_ep = - grpc_secure_endpoint_create(fake_write_protector, tcp.server, NULL, 0); + f.server_ep = grpc_secure_endpoint_create(fake_write_protector, + fake_write_zero_copy_protector, + tcp.server, NULL, 0); grpc_exec_ctx_finish(&exec_ctx); return f; } static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair_noleftover(size_t slice_size) { - return secure_endpoint_create_fixture_tcp_socketpair(slice_size, NULL, 0); + return secure_endpoint_create_fixture_tcp_socketpair(slice_size, NULL, 0, + false); +} + +static grpc_endpoint_test_fixture +secure_endpoint_create_fixture_tcp_socketpair_noleftover_zero_copy( + size_t slice_size) { + return secure_endpoint_create_fixture_tcp_socketpair(slice_size, NULL, 0, + true); } static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair_leftover(size_t slice_size) { grpc_slice s = grpc_slice_from_copied_string("hello world 12345678900987654321"); - grpc_endpoint_test_fixture f; + return secure_endpoint_create_fixture_tcp_socketpair(slice_size, &s, 1, + false); +} - f = secure_endpoint_create_fixture_tcp_socketpair(slice_size, &s, 1); - return f; +static grpc_endpoint_test_fixture +secure_endpoint_create_fixture_tcp_socketpair_leftover_zero_copy( + size_t slice_size) { + grpc_slice s = + grpc_slice_from_copied_string("hello world 12345678900987654321"); + return secure_endpoint_create_fixture_tcp_socketpair(slice_size, &s, 1, true); } static void clean_up(void) {} @@ -128,8 +152,14 @@ static void clean_up(void) {} static grpc_endpoint_test_config configs[] = { {"secure_ep/tcp_socketpair", secure_endpoint_create_fixture_tcp_socketpair_noleftover, clean_up}, + {"secure_ep/tcp_socketpair_zero_copy", + secure_endpoint_create_fixture_tcp_socketpair_noleftover_zero_copy, + clean_up}, {"secure_ep/tcp_socketpair_leftover", secure_endpoint_create_fixture_tcp_socketpair_leftover, clean_up}, + {"secure_ep/tcp_socketpair_leftover_zero_copy", + secure_endpoint_create_fixture_tcp_socketpair_leftover_zero_copy, + clean_up}, }; static void inc_call_ctr(grpc_exec_ctx *exec_ctx, void *arg, @@ -184,7 +214,9 @@ int main(int argc, char **argv) { g_pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size()); grpc_pollset_init(g_pollset, &g_mu); grpc_endpoint_tests(configs[0], g_pollset, g_mu); - test_leftover(configs[1], 1); + grpc_endpoint_tests(configs[1], g_pollset, g_mu); + test_leftover(configs[2], 1); + test_leftover(configs[3], 1); GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, grpc_schedule_on_exec_ctx); grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); diff --git a/test/core/surface/alarm_test.c b/test/core/surface/alarm_test.c index 6971d92074..4fd7cb93c6 100644 --- a/test/core/surface/alarm_test.c +++ b/test/core/surface/alarm_test.c @@ -48,45 +48,50 @@ static void test_alarm(void) { /* regular expiry */ grpc_event ev; void *tag = create_test_tag(); - grpc_alarm *alarm = - grpc_alarm_create(cc, grpc_timeout_seconds_to_deadline(1), tag); + grpc_alarm *alarm = grpc_alarm_create(NULL); + grpc_alarm_set(alarm, cc, grpc_timeout_seconds_to_deadline(1), tag, NULL); ev = grpc_completion_queue_next(cc, grpc_timeout_seconds_to_deadline(2), NULL); GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); GPR_ASSERT(ev.tag == tag); GPR_ASSERT(ev.success); - grpc_alarm_destroy(alarm); + grpc_alarm_destroy(alarm, NULL); } { /* cancellation */ grpc_event ev; void *tag = create_test_tag(); - grpc_alarm *alarm = - grpc_alarm_create(cc, grpc_timeout_seconds_to_deadline(2), tag); + grpc_alarm *alarm = grpc_alarm_create(NULL); + grpc_alarm_set(alarm, cc, grpc_timeout_seconds_to_deadline(2), tag, NULL); - grpc_alarm_cancel(alarm); + grpc_alarm_cancel(alarm, NULL); ev = grpc_completion_queue_next(cc, grpc_timeout_seconds_to_deadline(1), NULL); GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); GPR_ASSERT(ev.tag == tag); GPR_ASSERT(ev.success == 0); - grpc_alarm_destroy(alarm); + grpc_alarm_destroy(alarm, NULL); } { /* alarm_destroy before cq_next */ grpc_event ev; void *tag = create_test_tag(); - grpc_alarm *alarm = - grpc_alarm_create(cc, grpc_timeout_seconds_to_deadline(2), tag); + grpc_alarm *alarm = grpc_alarm_create(NULL); + grpc_alarm_set(alarm, cc, grpc_timeout_seconds_to_deadline(2), tag, NULL); - grpc_alarm_destroy(alarm); + grpc_alarm_destroy(alarm, NULL); ev = grpc_completion_queue_next(cc, grpc_timeout_seconds_to_deadline(1), NULL); GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); GPR_ASSERT(ev.tag == tag); GPR_ASSERT(ev.success == 0); } + { + /* alarm_destroy before set */ + grpc_alarm *alarm = grpc_alarm_create(NULL); + grpc_alarm_destroy(alarm, NULL); + } shutdown_and_destroy(cc); } diff --git a/test/cpp/common/alarm_cpp_test.cc b/test/cpp/common/alarm_cpp_test.cc index ce4168843c..212972d25d 100644 --- a/test/cpp/common/alarm_cpp_test.cc +++ b/test/cpp/common/alarm_cpp_test.cc @@ -18,6 +18,8 @@ #include <grpc++/alarm.h> #include <grpc++/completion_queue.h> +#include <thread> + #include <gtest/gtest.h> #include "test/core/util/test_config.h" @@ -28,6 +30,46 @@ namespace { TEST(AlarmTest, RegularExpiry) { CompletionQueue cq; void* junk = reinterpret_cast<void*>(1618033); + Alarm alarm; + alarm.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk); + + void* output_tag; + bool ok; + const CompletionQueue::NextStatus status = cq.AsyncNext( + (void**)&output_tag, &ok, grpc_timeout_seconds_to_deadline(2)); + + EXPECT_EQ(status, CompletionQueue::GOT_EVENT); + EXPECT_TRUE(ok); + EXPECT_EQ(junk, output_tag); +} + +TEST(AlarmTest, MultithreadedRegularExpiry) { + CompletionQueue cq; + void* junk = reinterpret_cast<void*>(1618033); + void* output_tag; + bool ok; + CompletionQueue::NextStatus status; + Alarm alarm; + + std::thread t1([&alarm, &cq, &junk] { + alarm.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk); + }); + + std::thread t2([&cq, &ok, &output_tag, &status] { + status = cq.AsyncNext((void**)&output_tag, &ok, + grpc_timeout_seconds_to_deadline(2)); + }); + + t1.join(); + t2.join(); + EXPECT_EQ(status, CompletionQueue::GOT_EVENT); + EXPECT_TRUE(ok); + EXPECT_EQ(junk, output_tag); +} + +TEST(AlarmTest, DeprecatedRegularExpiry) { + CompletionQueue cq; + void* junk = reinterpret_cast<void*>(1618033); Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(1), junk); void* output_tag; @@ -43,7 +85,8 @@ TEST(AlarmTest, RegularExpiry) { TEST(AlarmTest, MoveConstructor) { CompletionQueue cq; void* junk = reinterpret_cast<void*>(1618033); - Alarm first(&cq, grpc_timeout_seconds_to_deadline(1), junk); + Alarm first; + first.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk); Alarm second(std::move(first)); void* output_tag; bool ok; @@ -57,7 +100,8 @@ TEST(AlarmTest, MoveConstructor) { TEST(AlarmTest, MoveAssignment) { CompletionQueue cq; void* junk = reinterpret_cast<void*>(1618033); - Alarm first(&cq, grpc_timeout_seconds_to_deadline(1), junk); + Alarm first; + first.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk); Alarm second(std::move(first)); first = std::move(second); @@ -76,7 +120,8 @@ TEST(AlarmTest, RegularExpiryChrono) { void* junk = reinterpret_cast<void*>(1618033); std::chrono::system_clock::time_point one_sec_deadline = std::chrono::system_clock::now() + std::chrono::seconds(1); - Alarm alarm(&cq, one_sec_deadline, junk); + Alarm alarm; + alarm.Set(&cq, one_sec_deadline, junk); void* output_tag; bool ok; @@ -91,7 +136,8 @@ TEST(AlarmTest, RegularExpiryChrono) { TEST(AlarmTest, ZeroExpiry) { CompletionQueue cq; void* junk = reinterpret_cast<void*>(1618033); - Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(0), junk); + Alarm alarm; + alarm.Set(&cq, grpc_timeout_seconds_to_deadline(0), junk); void* output_tag; bool ok; @@ -106,7 +152,8 @@ TEST(AlarmTest, ZeroExpiry) { TEST(AlarmTest, NegativeExpiry) { CompletionQueue cq; void* junk = reinterpret_cast<void*>(1618033); - Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(-1), junk); + Alarm alarm; + alarm.Set(&cq, grpc_timeout_seconds_to_deadline(-1), junk); void* output_tag; bool ok; @@ -121,7 +168,8 @@ TEST(AlarmTest, NegativeExpiry) { TEST(AlarmTest, Cancellation) { CompletionQueue cq; void* junk = reinterpret_cast<void*>(1618033); - Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(2), junk); + Alarm alarm; + alarm.Set(&cq, grpc_timeout_seconds_to_deadline(2), junk); alarm.Cancel(); void* output_tag; diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 54408db600..c236f76e89 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -226,6 +226,31 @@ class ClientLbEnd2endTest : public ::testing::Test { ResetCounters(); } + bool SeenAllServers() { + for (const auto& server : servers_) { + if (server->service_.request_count() == 0) return false; + } + return true; + } + + // Updates \a connection_order by appending to it the index of the newly + // connected server. Must be called after every single RPC. + void UpdateConnectionOrder( + const std::vector<std::unique_ptr<ServerData>>& servers, + std::vector<int>* connection_order) { + for (size_t i = 0; i < servers.size(); ++i) { + if (servers[i]->service_.request_count() == 1) { + // Was the server index known? If not, update connection_order. + const auto it = + std::find(connection_order->begin(), connection_order->end(), i); + if (it == connection_order->end()) { + connection_order->push_back(i); + return; + } + } + } + } + const grpc::string server_host_; std::shared_ptr<Channel> channel_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; @@ -370,13 +395,23 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) { ports.emplace_back(server->port_); } SetNextResolution(ports); - for (size_t i = 0; i < servers_.size(); ++i) { + // Wait until all backends are ready. + do { CheckRpcSendOk(); - } - // One request should have gone to each server. + } while (!SeenAllServers()); + ResetCounters(); + // "Sync" to the end of the list. Next sequence of picks will start at the + // first server (index 0). + WaitForServer(servers_.size() - 1); + std::vector<int> connection_order; for (size_t i = 0; i < servers_.size(); ++i) { - EXPECT_EQ(1, servers_[i]->service_.request_count()); + CheckRpcSendOk(); + UpdateConnectionOrder(servers_, &connection_order); } + // Backends should be iterated over in the order in which the addresses were + // given. + const auto expected = std::vector<int>{0, 1, 2}; + EXPECT_EQ(expected, connection_order); // Check LB policy name for the channel. EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); } @@ -529,13 +564,9 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) { StartServers(kNumServers, ports); ResetStub("round_robin"); SetNextResolution(ports); - // Send one RPC per backend and make sure they are used in order. - // Note: This relies on the fact that the subchannels are reported in - // state READY in the order in which the addresses are specified, - // which is only true because the backends are all local. - for (size_t i = 0; i < servers_.size(); ++i) { + // Send a number of RPCs, which succeed. + for (size_t i = 0; i < 100; ++i) { CheckRpcSendOk(); - EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i; } // Kill all servers for (size_t i = 0; i < servers_.size(); ++i) { diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index b5cff664f6..570a3d1067 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -332,7 +332,8 @@ class GrpclbEnd2endTest : public ::testing::Test { num_backends_(num_backends), num_balancers_(num_balancers), client_load_reporting_interval_seconds_( - client_load_reporting_interval_seconds) {} + client_load_reporting_interval_seconds), + kRequestMessage_("Live long and prosper.") {} void SetUp() override { response_generator_ = grpc_fake_resolver_response_generator_create(); @@ -378,6 +379,10 @@ class GrpclbEnd2endTest : public ::testing::Test { stub_ = grpc::testing::EchoTestService::NewStub(channel_); } + void ResetBackendCounters() { + for (const auto& backend : backends_) backend->ResetCounters(); + } + ClientStats WaitForLoadReports() { ClientStats client_stats; for (const auto& balancer : balancers_) { @@ -386,6 +391,27 @@ class GrpclbEnd2endTest : public ::testing::Test { return client_stats; } + bool SeenAllBackends() { + for (const auto& backend : backends_) { + if (backend->request_count() == 0) return false; + } + return true; + } + + void WaitForAllBackends() { + while (!SeenAllBackends()) { + CheckRpcSendOk(); + } + ResetBackendCounters(); + } + + void WaitForBackend(size_t backend_idx) { + do { + CheckRpcSendOk(); + } while (backends_[backend_idx]->request_count() == 0); + ResetBackendCounters(); + } + struct AddressData { int port; bool is_balancer; @@ -429,20 +455,31 @@ class GrpclbEnd2endTest : public ::testing::Test { balancers_.at(i)->add_response(response, delay_ms); } - std::vector<std::pair<Status, EchoResponse>> SendRpc(const string& message, - int num_rpcs, - int timeout_ms = 1000) { - std::vector<std::pair<Status, EchoResponse>> results; + Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000) { + const bool local_response = (response == nullptr); + if (local_response) response = new EchoResponse; EchoRequest request; - EchoResponse response; - request.set_message(message); - for (int i = 0; i < num_rpcs; i++) { - ClientContext context; - context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms)); - Status status = stub_->Echo(&context, request, &response); - results.push_back(std::make_pair(status, response)); + request.set_message(kRequestMessage_); + ClientContext context; + context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms)); + Status status = stub_->Echo(&context, request, response); + if (local_response) delete response; + return status; + } + + void CheckRpcSendOk(const size_t times = 1) { + for (size_t i = 0; i < times; ++i) { + EchoResponse response; + const Status status = SendRpc(&response); + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kRequestMessage_); } - return results; + } + + void CheckRpcSendFailure() { + const Status status = SendRpc(); + EXPECT_FALSE(status.ok()); } template <typename T> @@ -499,14 +536,12 @@ class GrpclbEnd2endTest : public ::testing::Test { const int client_load_reporting_interval_seconds_; std::shared_ptr<Channel> channel_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; - std::vector<std::unique_ptr<BackendServiceImpl>> backends_; std::vector<std::unique_ptr<BalancerServiceImpl>> balancers_; - std::vector<ServerThread<BackendService>> backend_servers_; std::vector<ServerThread<BalancerService>> balancer_servers_; - grpc_fake_resolver_response_generator* response_generator_; + const grpc::string kRequestMessage_; }; class SingleBalancerTest : public GrpclbEnd2endTest { @@ -521,17 +556,12 @@ TEST_F(SingleBalancerTest, Vanilla) { 0); // Make sure that trying to connect works without a call. channel_->GetState(true /* try_to_connect */); - // Send 100 RPCs per server. - const auto& statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); - - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } + + // We need to wait for all backends to come online. + WaitForAllBackends(); + + // Send kNumRpcsPerAddress RPCs per server. + CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); // Each backend should have gotten 100 requests. for (size_t i = 0; i < backends_.size(); ++i) { @@ -561,8 +591,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { const auto t0 = system_clock::now(); // Client will block: LB will initially send empty serverlist. - const auto& statuses_and_responses = - SendRpc(kMessage_, num_backends_, kCallDeadlineMs); + CheckRpcSendOk(num_backends_); const auto ellapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( system_clock::now() - t0); @@ -576,13 +605,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { for (size_t i = 0; i < backends_.size(); ++i) { EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); } - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } balancers_[0]->NotifyDoneWithServerlists(); // The balancer got a single request. EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); @@ -593,70 +615,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } -TEST_F(SingleBalancerTest, RepeatedServerlist) { - constexpr int kServerlistDelayMs = 100; - - // Send a serverlist right away. - ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), - 0); - // ... and the same one a bit later. - ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), - kServerlistDelayMs); - - // Send num_backends/2 requests. - auto statuses_and_responses = SendRpc(kMessage_, num_backends_ / 2); - // only the first half of the backends will receive them. - for (size_t i = 0; i < backends_.size(); ++i) { - if (i < backends_.size() / 2) - EXPECT_EQ(1U, backend_servers_[i].service_->request_count()) - << "for backend #" << i; - else - EXPECT_EQ(0U, backend_servers_[i].service_->request_count()) - << "for backend #" << i; - } - EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } - - // Wait for the (duplicated) serverlist update. - gpr_sleep_until(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(kServerlistDelayMs * 1.1, GPR_TIMESPAN))); - - // Verify the LB has sent two responses. - EXPECT_EQ(2U, balancer_servers_[0].service_->response_count()); - - // Some more calls to complete the total number of backends. - statuses_and_responses = SendRpc( - kMessage_, - num_backends_ / 2 + (num_backends_ & 0x1) /* extra one if num_bes odd */); - // Because a duplicated serverlist should have no effect, all backends must - // have been hit once now. - for (size_t i = 0; i < backends_.size(); ++i) { - EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); - } - EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } - balancers_[0]->NotifyDoneWithServerlists(); - // The balancer got a single request. - EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); - // Check LB policy name for the channel. - EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); -} - TEST_F(SingleBalancerTest, BackendsRestart) { const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( @@ -664,21 +622,8 @@ TEST_F(SingleBalancerTest, BackendsRestart) { 0); // Make sure that trying to connect works without a call. channel_->GetState(true /* try_to_connect */); - // Send 100 RPCs per server. - auto statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } - // Each backend should have gotten 100 requests. - for (size_t i = 0; i < backends_.size(); ++i) { - EXPECT_EQ(kNumRpcsPerAddress, - backend_servers_[i].service_->request_count()); - } + // Send kNumRpcsPerAddress RPCs per server. + CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); balancers_[0]->NotifyDoneWithServerlists(); // The balancer got a single request. EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); @@ -687,11 +632,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) { for (size_t i = 0; i < backends_.size(); ++i) { if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown(); } - statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - EXPECT_FALSE(status.ok()); - } + CheckRpcSendFailure(); for (size_t i = 0; i < num_backends_; ++i) { backends_.emplace_back(new BackendServiceImpl()); backend_servers_.emplace_back(ServerThread<BackendService>( @@ -703,11 +644,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) { // TODO(dgq): implement the "backend restart" component as well. We need extra // machinery to either update the LB responses "on the fly" or instruct // backends which ports to restart on. - statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - EXPECT_FALSE(status.ok()); - } + CheckRpcSendFailure(); // Check LB policy name for the channel. EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } @@ -727,13 +664,9 @@ TEST_F(UpdatesTest, UpdateBalancers) { // Start servers and send 10 RPCs per server. gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); - auto statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the first backend. EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); @@ -758,22 +691,12 @@ TEST_F(UpdatesTest, UpdateBalancers) { // Wait until update has been processed, as signaled by the second backend // receiving a request. EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); - do { - auto statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } - } while (backend_servers_[1].service_->request_count() == 0); + WaitForBackend(1); backend_servers_[1].service_->ResetCounters(); gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); - statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the second backend. EXPECT_EQ(10U, backend_servers_[1].service_->request_count()); @@ -804,13 +727,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { // Start servers and send 10 RPCs per server. gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); - auto statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the first backend. EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); @@ -837,11 +756,7 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN)); // Send 10 seconds worth of RPCs do { - statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } + CheckRpcSendOk(); } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); // grpclb continued using the original LB call to the first balancer, which // doesn't assign the second backend. @@ -860,11 +775,7 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { gpr_time_from_millis(10000, GPR_TIMESPAN)); // Send 10 seconds worth of RPCs do { - statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } + CheckRpcSendOk(); } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); // grpclb continued using the original LB call to the first balancer, which // doesn't assign the second backend. @@ -886,12 +797,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { // Start servers and send 10 RPCs per server. gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); - auto statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the first backend. EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); @@ -903,12 +810,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { // This is serviced by the existing RR policy gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); - statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should again have gone to the first backend. EXPECT_EQ(20U, backend_servers_[0].service_->request_count()); EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); @@ -935,23 +838,13 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { // receiving a request. In the meantime, the client continues to be serviced // (by the first backend) without interruption. EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); - do { - auto statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } - } while (backend_servers_[1].service_->request_count() == 0); + WaitForBackend(1); // This is serviced by the existing RR policy backend_servers_[1].service_->ResetCounters(); gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH =========="); - statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the second backend. EXPECT_EQ(10U, backend_servers_[1].service_->request_count()); @@ -974,14 +867,11 @@ TEST_F(SingleBalancerTest, Drop) { 0, BalancerServiceImpl::BuildResponseForBackends( GetBackendPorts(), {{"rate_limiting", 1}, {"load_balancing", 2}}), 0); - // Send 100 RPCs for each server and drop address. - const auto& statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3)); - + // Send kNumRpcsPerAddress RPCs for each server and drop address. size_t num_drops = 0; - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; + for (size_t i = 0; i < kNumRpcsPerAddress * (num_backends_ + 3); ++i) { + EchoResponse response; + const Status status = SendRpc(&response); if (!status.ok() && status.error_message() == "Call dropped by load balancing policy") { ++num_drops; @@ -1010,12 +900,9 @@ TEST_F(SingleBalancerTest, DropAllFirst) { 0, BalancerServiceImpl::BuildResponseForBackends( {}, {{"rate_limiting", 1}, {"load_balancing", 1}}), 0); - const auto& statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - EXPECT_FALSE(status.ok()); - EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); - } + const Status status = SendRpc(); + EXPECT_FALSE(status.ok()); + EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); } TEST_F(SingleBalancerTest, DropAll) { @@ -1028,21 +915,13 @@ TEST_F(SingleBalancerTest, DropAll) { 1000); // First call succeeds. - auto statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } + CheckRpcSendOk(); // But eventually, the update with only dropped servers is processed and calls // fail. + Status status; do { - statuses_and_responses = SendRpc(kMessage_, 1); - ASSERT_EQ(statuses_and_responses.size(), 1UL); - } while (statuses_and_responses[0].first.ok()); - const Status& status = statuses_and_responses[0].first; + status = SendRpc(); + } while (status.ok()); EXPECT_FALSE(status.ok()); EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); } @@ -1057,18 +936,8 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { ScheduleResponseForBalancer( 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), 0); - // Send 100 RPCs per server. - const auto& statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); - - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } - + // Send kNumRpcsPerAddress RPCs per server. + CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); // Each backend should have gotten 100 requests. for (size_t i = 0; i < backends_.size(); ++i) { EXPECT_EQ(kNumRpcsPerAddress, @@ -1096,14 +965,11 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { 0, BalancerServiceImpl::BuildResponseForBackends( GetBackendPorts(), {{"rate_limiting", 2}, {"load_balancing", 1}}), 0); - // Send 100 RPCs for each server and drop address. - const auto& statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3)); size_t num_drops = 0; - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; + for (size_t i = 0; i < kNumRpcsPerAddress * (num_backends_ + 3); ++i) { + EchoResponse response; + const Status status = SendRpc(&response); if (!status.ok() && status.error_message() == "Call dropped by load balancing policy") { ++num_drops; diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD index 83b91e2ce9..985a335f1b 100644 --- a/test/cpp/microbenchmarks/BUILD +++ b/test/cpp/microbenchmarks/BUILD @@ -35,14 +35,14 @@ grpc_cc_library( "fullstack_fixtures.h", "helpers.h", ], + external_deps = [ + "benchmark", + ], deps = [ "//:grpc++_unsecure", "//src/proto/grpc/testing:echo_proto", "//test/core/util:grpc_test_util_unsecure", ], - external_deps = [ - "benchmark", - ], ) grpc_cc_binary( @@ -76,14 +76,20 @@ grpc_cc_binary( grpc_cc_binary( name = "bm_fullstack_streaming_ping_pong", testonly = 1, - srcs = ["bm_fullstack_streaming_ping_pong.cc"], + srcs = [ + "bm_fullstack_streaming_ping_pong.cc", + "fullstack_streaming_ping_pong.h", + ], deps = [":helpers"], ) grpc_cc_binary( name = "bm_fullstack_streaming_pump", testonly = 1, - srcs = ["bm_fullstack_streaming_pump.cc"], + srcs = [ + "bm_fullstack_streaming_pump.cc", + "fullstack_streaming_pump.h", + ], deps = [":helpers"], ) @@ -92,15 +98,18 @@ grpc_cc_binary( testonly = 1, srcs = ["bm_fullstack_trickle.cc"], deps = [ - ":helpers", - "//test/cpp/util:test_config", + ":helpers", + "//test/cpp/util:test_config", ], ) grpc_cc_binary( name = "bm_fullstack_unary_ping_pong", testonly = 1, - srcs = ["bm_fullstack_unary_ping_pong.cc"], + srcs = [ + "bm_fullstack_unary_ping_pong.cc", + "fullstack_unary_ping_pong.h", + ], deps = [":helpers"], ) diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index 518c65ac8d..cadc9b2a11 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -35,7 +35,7 @@ extern "C" { #include "src/core/ext/filters/http/client/http_client_filter.h" #include "src/core/ext/filters/http/message_compress/message_compress_filter.h" #include "src/core/ext/filters/http/server/http_server_filter.h" -#include "src/core/ext/filters/load_reporting/load_reporting_filter.h" +#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h" #include "src/core/ext/filters/message_size/message_size_filter.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/connected_channel.h" @@ -620,7 +620,7 @@ BENCHMARK_TEMPLATE(BM_IsolatedFilter, HttpServerFilter, SendEmptyMetadata); typedef Fixture<&grpc_message_size_filter, CHECKS_NOT_LAST> MessageSizeFilter; BENCHMARK_TEMPLATE(BM_IsolatedFilter, MessageSizeFilter, NoOp); BENCHMARK_TEMPLATE(BM_IsolatedFilter, MessageSizeFilter, SendEmptyMetadata); -typedef Fixture<&grpc_load_reporting_filter, CHECKS_NOT_LAST> +typedef Fixture<&grpc_server_load_reporting_filter, CHECKS_NOT_LAST> LoadReportingFilter; BENCHMARK_TEMPLATE(BM_IsolatedFilter, LoadReportingFilter, NoOp); BENCHMARK_TEMPLATE(BM_IsolatedFilter, LoadReportingFilter, SendEmptyMetadata); diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc index 0712a40018..655e032faf 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc @@ -18,13 +18,7 @@ /* Benchmark gRPC end2end in various configurations */ -#include <benchmark/benchmark.h> -#include <sstream> -#include "src/core/lib/profiling/timers.h" -#include "src/cpp/client/create_channel_internal.h" -#include "src/proto/grpc/testing/echo.grpc.pb.h" -#include "test/cpp/microbenchmarks/fullstack_context_mutators.h" -#include "test/cpp/microbenchmarks/fullstack_fixtures.h" +#include "test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h" namespace grpc { namespace testing { @@ -33,365 +27,6 @@ namespace testing { auto& force_library_initialization = Library::get(); /******************************************************************************* - * BENCHMARKING KERNELS - */ - -static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } - -// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of -// messages in each call) in a loop on a single channel -// -// First parmeter (i.e state.range(0)): Message size (in bytes) to use -// Second parameter (i.e state.range(1)): Number of ping pong messages. -// Note: One ping-pong means two messages (one from client to server and -// the other from server to client): -template <class Fixture, class ClientContextMutator, class ServerContextMutator> -static void BM_StreamingPingPong(benchmark::State& state) { - const int msg_size = state.range(0); - const int max_ping_pongs = state.range(1); - - EchoTestService::AsyncService service; - std::unique_ptr<Fixture> fixture(new Fixture(&service)); - { - EchoResponse send_response; - EchoResponse recv_response; - EchoRequest send_request; - EchoRequest recv_request; - - if (msg_size > 0) { - send_request.set_message(std::string(msg_size, 'a')); - send_response.set_message(std::string(msg_size, 'b')); - } - - std::unique_ptr<EchoTestService::Stub> stub( - EchoTestService::NewStub(fixture->channel())); - - while (state.KeepRunning()) { - ServerContext svr_ctx; - ServerContextMutator svr_ctx_mut(&svr_ctx); - ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); - service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), - fixture->cq(), tag(0)); - - ClientContext cli_ctx; - ClientContextMutator cli_ctx_mut(&cli_ctx); - auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); - - // Establish async stream between client side and server side - void* t; - bool ok; - int need_tags = (1 << 0) | (1 << 1); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - - // Send 'max_ping_pongs' number of ping pong messages - int ping_pong_cnt = 0; - while (ping_pong_cnt < max_ping_pongs) { - request_rw->Write(send_request, tag(0)); // Start client send - response_rw.Read(&recv_request, tag(1)); // Start server recv - request_rw->Read(&recv_response, tag(2)); // Start client recv - - need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int i = (int)(intptr_t)t; - - // If server recv is complete, start the server send operation - if (i == 1) { - response_rw.Write(send_response, tag(3)); - } - - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - - ping_pong_cnt++; - } - - request_rw->WritesDone(tag(0)); - response_rw.Finish(Status::OK, tag(1)); - - Status recv_status; - request_rw->Finish(&recv_status, tag(2)); - - need_tags = (1 << 0) | (1 << 1) | (1 << 2); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - - GPR_ASSERT(recv_status.ok()); - } - } - - fixture->Finish(state); - fixture.reset(); - state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2); -} - -// Repeatedly sends ping pong messages in a single streaming Bidi call in a loop -// First parmeter (i.e state.range(0)): Message size (in bytes) to use -template <class Fixture, class ClientContextMutator, class ServerContextMutator> -static void BM_StreamingPingPongMsgs(benchmark::State& state) { - const int msg_size = state.range(0); - - EchoTestService::AsyncService service; - std::unique_ptr<Fixture> fixture(new Fixture(&service)); - { - EchoResponse send_response; - EchoResponse recv_response; - EchoRequest send_request; - EchoRequest recv_request; - - if (msg_size > 0) { - send_request.set_message(std::string(msg_size, 'a')); - send_response.set_message(std::string(msg_size, 'b')); - } - - std::unique_ptr<EchoTestService::Stub> stub( - EchoTestService::NewStub(fixture->channel())); - - ServerContext svr_ctx; - ServerContextMutator svr_ctx_mut(&svr_ctx); - ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); - service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), - fixture->cq(), tag(0)); - - ClientContext cli_ctx; - ClientContextMutator cli_ctx_mut(&cli_ctx); - auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); - - // Establish async stream between client side and server side - void* t; - bool ok; - int need_tags = (1 << 0) | (1 << 1); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - - while (state.KeepRunning()) { - GPR_TIMER_SCOPE("BenchmarkCycle", 0); - request_rw->Write(send_request, tag(0)); // Start client send - response_rw.Read(&recv_request, tag(1)); // Start server recv - request_rw->Read(&recv_response, tag(2)); // Start client recv - - need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int i = (int)(intptr_t)t; - - // If server recv is complete, start the server send operation - if (i == 1) { - response_rw.Write(send_response, tag(3)); - } - - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - } - - request_rw->WritesDone(tag(0)); - response_rw.Finish(Status::OK, tag(1)); - Status recv_status; - request_rw->Finish(&recv_status, tag(2)); - - need_tags = (1 << 0) | (1 << 1) | (1 << 2); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - - GPR_ASSERT(recv_status.ok()); - } - - fixture->Finish(state); - fixture.reset(); - state.SetBytesProcessed(msg_size * state.iterations() * 2); -} - -// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of -// messages in each call) in a loop on a single channel. Different from -// BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast, -// WriteAndFinish, set_initial_metadata_corked. These apis aim at saving -// sendmsg syscalls for streaming by coalescing 1. initial metadata with first -// message; 2. final streaming message with trailing metadata. -// -// First parmeter (i.e state.range(0)): Message size (in bytes) to use -// Second parameter (i.e state.range(1)): Number of ping pong messages. -// Note: One ping-pong means two messages (one from client to server and -// the other from server to client): -// Third parameter (i.e state.range(2)): Switch between using WriteAndFinish -// API and WriteLast API for server. -template <class Fixture, class ClientContextMutator, class ServerContextMutator> -static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) { - const int msg_size = state.range(0); - const int max_ping_pongs = state.range(1); - // This options is used to test out server API: WriteLast and WriteAndFinish - // respectively, since we can not use both of them on server side at the same - // time. Value 1 means we are testing out the WriteAndFinish API, and - // otherwise we are testing out the WriteLast API. - const int write_and_finish = state.range(2); - - EchoTestService::AsyncService service; - std::unique_ptr<Fixture> fixture(new Fixture(&service)); - { - EchoResponse send_response; - EchoResponse recv_response; - EchoRequest send_request; - EchoRequest recv_request; - - if (msg_size > 0) { - send_request.set_message(std::string(msg_size, 'a')); - send_response.set_message(std::string(msg_size, 'b')); - } - - std::unique_ptr<EchoTestService::Stub> stub( - EchoTestService::NewStub(fixture->channel())); - - while (state.KeepRunning()) { - ServerContext svr_ctx; - ServerContextMutator svr_ctx_mut(&svr_ctx); - ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); - service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), - fixture->cq(), tag(0)); - - ClientContext cli_ctx; - ClientContextMutator cli_ctx_mut(&cli_ctx); - cli_ctx.set_initial_metadata_corked(true); - // tag:1 here will never comes up, since we are not performing any op due - // to initial metadata coalescing. - auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); - - void* t; - bool ok; - int need_tags; - - // Send 'max_ping_pongs' number of ping pong messages - int ping_pong_cnt = 0; - while (ping_pong_cnt < max_ping_pongs) { - if (ping_pong_cnt == max_ping_pongs - 1) { - request_rw->WriteLast(send_request, WriteOptions(), tag(2)); - } else { - request_rw->Write(send_request, tag(2)); // Start client send - } - - need_tags = (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5); - - if (ping_pong_cnt == 0) { - // wait for the server call structure (call_hook, etc.) to be - // initialized (async stream between client side and server side - // established). It is necessary when client init metadata is - // coalesced - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - while ((int)(intptr_t)t != 0) { - // In some cases tag:2 comes before tag:0 (write tag comes out - // first), this while loop is to make sure get tag:0. - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - } - } - - response_rw.Read(&recv_request, tag(3)); // Start server recv - request_rw->Read(&recv_response, tag(4)); // Start client recv - - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int i = (int)(intptr_t)t; - - // If server recv is complete, start the server send operation - if (i == 3) { - if (ping_pong_cnt == max_ping_pongs - 1) { - if (write_and_finish == 1) { - response_rw.WriteAndFinish(send_response, WriteOptions(), - Status::OK, tag(5)); - } else { - response_rw.WriteLast(send_response, WriteOptions(), tag(5)); - // WriteLast buffers the write, so neither server write op nor - // client read op will finish inside the while loop. - need_tags &= ~(1 << 4); - need_tags &= ~(1 << 5); - } - } else { - response_rw.Write(send_response, tag(5)); - } - } - - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - - ping_pong_cnt++; - } - - if (max_ping_pongs == 0) { - need_tags = (1 << 6) | (1 << 7) | (1 << 8); - } else { - if (write_and_finish == 1) { - need_tags = (1 << 8); - } else { - // server's buffered write and the client's read of the buffered write - // tags should come up. - need_tags = (1 << 4) | (1 << 5) | (1 << 7) | (1 << 8); - } - } - - // No message write or initial metadata write happened yet. - if (max_ping_pongs == 0) { - request_rw->WritesDone(tag(6)); - // wait for server call data structure(call_hook, etc.) to be - // initialized, since initial metadata is corked. - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - while ((int)(intptr_t)t != 0) { - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - } - response_rw.Finish(Status::OK, tag(7)); - } else { - if (write_and_finish != 1) { - response_rw.Finish(Status::OK, tag(7)); - } - } - - Status recv_status; - request_rw->Finish(&recv_status, tag(8)); - - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - - GPR_ASSERT(recv_status.ok()); - } - } - - fixture->Finish(state); - fixture.reset(); - state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2); -} - -/******************************************************************************* * CONFIGURATIONS */ diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc index 6fbf9da0ad..c7ceacd320 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc @@ -18,157 +18,18 @@ /* Benchmark gRPC end2end in various configurations */ -#include <benchmark/benchmark.h> -#include <sstream> -#include "src/core/lib/profiling/timers.h" -#include "src/cpp/client/create_channel_internal.h" -#include "src/proto/grpc/testing/echo.grpc.pb.h" -#include "test/cpp/microbenchmarks/fullstack_context_mutators.h" -#include "test/cpp/microbenchmarks/fullstack_fixtures.h" +#include "test/cpp/microbenchmarks/fullstack_streaming_pump.h" namespace grpc { namespace testing { -// force library initialization -auto& force_library_initialization = Library::get(); - -/******************************************************************************* - * BENCHMARKING KERNELS - */ - -static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } - -template <class Fixture> -static void BM_PumpStreamClientToServer(benchmark::State& state) { - EchoTestService::AsyncService service; - std::unique_ptr<Fixture> fixture(new Fixture(&service)); - { - EchoRequest send_request; - EchoRequest recv_request; - if (state.range(0) > 0) { - send_request.set_message(std::string(state.range(0), 'a')); - } - Status recv_status; - ServerContext svr_ctx; - ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); - service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), - fixture->cq(), tag(0)); - std::unique_ptr<EchoTestService::Stub> stub( - EchoTestService::NewStub(fixture->channel())); - ClientContext cli_ctx; - auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); - int need_tags = (1 << 0) | (1 << 1); - void* t; - bool ok; - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - response_rw.Read(&recv_request, tag(0)); - while (state.KeepRunning()) { - GPR_TIMER_SCOPE("BenchmarkCycle", 0); - request_rw->Write(send_request, tag(1)); - while (true) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - if (t == tag(0)) { - response_rw.Read(&recv_request, tag(0)); - } else if (t == tag(1)) { - break; - } else { - GPR_ASSERT(false); - } - } - } - request_rw->WritesDone(tag(1)); - need_tags = (1 << 0) | (1 << 1); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - response_rw.Finish(Status::OK, tag(0)); - Status final_status; - request_rw->Finish(&final_status, tag(1)); - need_tags = (1 << 0) | (1 << 1); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - GPR_ASSERT(final_status.ok()); - } - fixture->Finish(state); - fixture.reset(); - state.SetBytesProcessed(state.range(0) * state.iterations()); -} - -template <class Fixture> -static void BM_PumpStreamServerToClient(benchmark::State& state) { - EchoTestService::AsyncService service; - std::unique_ptr<Fixture> fixture(new Fixture(&service)); - { - EchoResponse send_response; - EchoResponse recv_response; - if (state.range(0) > 0) { - send_response.set_message(std::string(state.range(0), 'a')); - } - Status recv_status; - ServerContext svr_ctx; - ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); - service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), - fixture->cq(), tag(0)); - std::unique_ptr<EchoTestService::Stub> stub( - EchoTestService::NewStub(fixture->channel())); - ClientContext cli_ctx; - auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); - int need_tags = (1 << 0) | (1 << 1); - void* t; - bool ok; - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - request_rw->Read(&recv_response, tag(0)); - while (state.KeepRunning()) { - GPR_TIMER_SCOPE("BenchmarkCycle", 0); - response_rw.Write(send_response, tag(1)); - while (true) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - if (t == tag(0)) { - request_rw->Read(&recv_response, tag(0)); - } else if (t == tag(1)) { - break; - } else { - GPR_ASSERT(false); - } - } - } - response_rw.Finish(Status::OK, tag(1)); - need_tags = (1 << 0) | (1 << 1); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - } - fixture->Finish(state); - fixture.reset(); - state.SetBytesProcessed(state.range(0) * state.iterations()); -} - /******************************************************************************* * CONFIGURATIONS */ +// force library initialization +auto& force_library_initialization = Library::get(); + BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, TCP) ->Range(0, 128 * 1024 * 1024); BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, UDS) diff --git a/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc index 9af751245f..fa41d114c0 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc @@ -18,13 +18,7 @@ /* Benchmark gRPC end2end in various configurations */ -#include <benchmark/benchmark.h> -#include <sstream> -#include "src/core/lib/profiling/timers.h" -#include "src/cpp/client/create_channel_internal.h" -#include "src/proto/grpc/testing/echo.grpc.pb.h" -#include "test/cpp/microbenchmarks/fullstack_context_mutators.h" -#include "test/cpp/microbenchmarks/fullstack_fixtures.h" +#include "test/cpp/microbenchmarks/fullstack_unary_ping_pong.h" namespace grpc { namespace testing { @@ -33,85 +27,6 @@ namespace testing { auto& force_library_initialization = Library::get(); /******************************************************************************* - * BENCHMARKING KERNELS - */ - -static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } - -template <class Fixture, class ClientContextMutator, class ServerContextMutator> -static void BM_UnaryPingPong(benchmark::State& state) { - EchoTestService::AsyncService service; - std::unique_ptr<Fixture> fixture(new Fixture(&service)); - EchoRequest send_request; - EchoResponse send_response; - EchoResponse recv_response; - if (state.range(0) > 0) { - send_request.set_message(std::string(state.range(0), 'a')); - } - if (state.range(1) > 0) { - send_response.set_message(std::string(state.range(1), 'a')); - } - Status recv_status; - struct ServerEnv { - ServerContext ctx; - EchoRequest recv_request; - grpc::ServerAsyncResponseWriter<EchoResponse> response_writer; - ServerEnv() : response_writer(&ctx) {} - }; - uint8_t server_env_buffer[2 * sizeof(ServerEnv)]; - ServerEnv* server_env[2] = { - reinterpret_cast<ServerEnv*>(server_env_buffer), - reinterpret_cast<ServerEnv*>(server_env_buffer + sizeof(ServerEnv))}; - new (server_env[0]) ServerEnv; - new (server_env[1]) ServerEnv; - service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request, - &server_env[0]->response_writer, fixture->cq(), - fixture->cq(), tag(0)); - service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request, - &server_env[1]->response_writer, fixture->cq(), - fixture->cq(), tag(1)); - std::unique_ptr<EchoTestService::Stub> stub( - EchoTestService::NewStub(fixture->channel())); - while (state.KeepRunning()) { - GPR_TIMER_SCOPE("BenchmarkCycle", 0); - recv_response.Clear(); - ClientContext cli_ctx; - ClientContextMutator cli_ctx_mut(&cli_ctx); - std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( - stub->AsyncEcho(&cli_ctx, send_request, fixture->cq())); - void* t; - bool ok; - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - GPR_ASSERT(t == tag(0) || t == tag(1)); - intptr_t slot = reinterpret_cast<intptr_t>(t); - ServerEnv* senv = server_env[slot]; - ServerContextMutator svr_ctx_mut(&senv->ctx); - senv->response_writer.Finish(send_response, Status::OK, tag(3)); - response_reader->Finish(&recv_response, &recv_status, tag(4)); - for (int i = (1 << 3) | (1 << 4); i != 0;) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int tagnum = (int)reinterpret_cast<intptr_t>(t); - GPR_ASSERT(i & (1 << tagnum)); - i -= 1 << tagnum; - } - GPR_ASSERT(recv_status.ok()); - - senv->~ServerEnv(); - senv = new (senv) ServerEnv(); - service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer, - fixture->cq(), fixture->cq(), tag(slot)); - } - fixture->Finish(state); - fixture.reset(); - server_env[0]->~ServerEnv(); - server_env[1]->~ServerEnv(); - state.SetBytesProcessed(state.range(0) * state.iterations() + - state.range(1) * state.iterations()); -} - -/******************************************************************************* * CONFIGURATIONS */ diff --git a/test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h b/test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h new file mode 100644 index 0000000000..ff1f966753 --- /dev/null +++ b/test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h @@ -0,0 +1,396 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* Benchmark gRPC end2end in various configurations */ + +#ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H +#define TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H + +#include <benchmark/benchmark.h> +#include <sstream> +#include "src/core/lib/profiling/timers.h" +#include "src/cpp/client/create_channel_internal.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/cpp/microbenchmarks/fullstack_context_mutators.h" +#include "test/cpp/microbenchmarks/fullstack_fixtures.h" + +namespace grpc { +namespace testing { + +/******************************************************************************* + * BENCHMARKING KERNELS + */ + +static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } + +// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of +// messages in each call) in a loop on a single channel +// +// First parmeter (i.e state.range(0)): Message size (in bytes) to use +// Second parameter (i.e state.range(1)): Number of ping pong messages. +// Note: One ping-pong means two messages (one from client to server and +// the other from server to client): +template <class Fixture, class ClientContextMutator, class ServerContextMutator> +static void BM_StreamingPingPong(benchmark::State& state) { + const int msg_size = state.range(0); + const int max_ping_pongs = state.range(1); + + EchoTestService::AsyncService service; + std::unique_ptr<Fixture> fixture(new Fixture(&service)); + { + EchoResponse send_response; + EchoResponse recv_response; + EchoRequest send_request; + EchoRequest recv_request; + + if (msg_size > 0) { + send_request.set_message(std::string(msg_size, 'a')); + send_response.set_message(std::string(msg_size, 'b')); + } + + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + + while (state.KeepRunning()) { + ServerContext svr_ctx; + ServerContextMutator svr_ctx_mut(&svr_ctx); + ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); + service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), + fixture->cq(), tag(0)); + + ClientContext cli_ctx; + ClientContextMutator cli_ctx_mut(&cli_ctx); + auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); + + // Establish async stream between client side and server side + void* t; + bool ok; + int need_tags = (1 << 0) | (1 << 1); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + // Send 'max_ping_pongs' number of ping pong messages + int ping_pong_cnt = 0; + while (ping_pong_cnt < max_ping_pongs) { + request_rw->Write(send_request, tag(0)); // Start client send + response_rw.Read(&recv_request, tag(1)); // Start server recv + request_rw->Read(&recv_response, tag(2)); // Start client recv + + need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + + // If server recv is complete, start the server send operation + if (i == 1) { + response_rw.Write(send_response, tag(3)); + } + + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + ping_pong_cnt++; + } + + request_rw->WritesDone(tag(0)); + response_rw.Finish(Status::OK, tag(1)); + + Status recv_status; + request_rw->Finish(&recv_status, tag(2)); + + need_tags = (1 << 0) | (1 << 1) | (1 << 2); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + GPR_ASSERT(recv_status.ok()); + } + } + + fixture->Finish(state); + fixture.reset(); + state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2); +} + +// Repeatedly sends ping pong messages in a single streaming Bidi call in a loop +// First parmeter (i.e state.range(0)): Message size (in bytes) to use +template <class Fixture, class ClientContextMutator, class ServerContextMutator> +static void BM_StreamingPingPongMsgs(benchmark::State& state) { + const int msg_size = state.range(0); + + EchoTestService::AsyncService service; + std::unique_ptr<Fixture> fixture(new Fixture(&service)); + { + EchoResponse send_response; + EchoResponse recv_response; + EchoRequest send_request; + EchoRequest recv_request; + + if (msg_size > 0) { + send_request.set_message(std::string(msg_size, 'a')); + send_response.set_message(std::string(msg_size, 'b')); + } + + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + + ServerContext svr_ctx; + ServerContextMutator svr_ctx_mut(&svr_ctx); + ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); + service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), + fixture->cq(), tag(0)); + + ClientContext cli_ctx; + ClientContextMutator cli_ctx_mut(&cli_ctx); + auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); + + // Establish async stream between client side and server side + void* t; + bool ok; + int need_tags = (1 << 0) | (1 << 1); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); + request_rw->Write(send_request, tag(0)); // Start client send + response_rw.Read(&recv_request, tag(1)); // Start server recv + request_rw->Read(&recv_response, tag(2)); // Start client recv + + need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + + // If server recv is complete, start the server send operation + if (i == 1) { + response_rw.Write(send_response, tag(3)); + } + + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + } + + request_rw->WritesDone(tag(0)); + response_rw.Finish(Status::OK, tag(1)); + Status recv_status; + request_rw->Finish(&recv_status, tag(2)); + + need_tags = (1 << 0) | (1 << 1) | (1 << 2); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + GPR_ASSERT(recv_status.ok()); + } + + fixture->Finish(state); + fixture.reset(); + state.SetBytesProcessed(msg_size * state.iterations() * 2); +} + +// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of +// messages in each call) in a loop on a single channel. Different from +// BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast, +// WriteAndFinish, set_initial_metadata_corked. These apis aim at saving +// sendmsg syscalls for streaming by coalescing 1. initial metadata with first +// message; 2. final streaming message with trailing metadata. +// +// First parmeter (i.e state.range(0)): Message size (in bytes) to use +// Second parameter (i.e state.range(1)): Number of ping pong messages. +// Note: One ping-pong means two messages (one from client to server and +// the other from server to client): +// Third parameter (i.e state.range(2)): Switch between using WriteAndFinish +// API and WriteLast API for server. +template <class Fixture, class ClientContextMutator, class ServerContextMutator> +static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) { + const int msg_size = state.range(0); + const int max_ping_pongs = state.range(1); + // This options is used to test out server API: WriteLast and WriteAndFinish + // respectively, since we can not use both of them on server side at the same + // time. Value 1 means we are testing out the WriteAndFinish API, and + // otherwise we are testing out the WriteLast API. + const int write_and_finish = state.range(2); + + EchoTestService::AsyncService service; + std::unique_ptr<Fixture> fixture(new Fixture(&service)); + { + EchoResponse send_response; + EchoResponse recv_response; + EchoRequest send_request; + EchoRequest recv_request; + + if (msg_size > 0) { + send_request.set_message(std::string(msg_size, 'a')); + send_response.set_message(std::string(msg_size, 'b')); + } + + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + + while (state.KeepRunning()) { + ServerContext svr_ctx; + ServerContextMutator svr_ctx_mut(&svr_ctx); + ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); + service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), + fixture->cq(), tag(0)); + + ClientContext cli_ctx; + ClientContextMutator cli_ctx_mut(&cli_ctx); + cli_ctx.set_initial_metadata_corked(true); + // tag:1 here will never comes up, since we are not performing any op due + // to initial metadata coalescing. + auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); + + void* t; + bool ok; + int need_tags; + + // Send 'max_ping_pongs' number of ping pong messages + int ping_pong_cnt = 0; + while (ping_pong_cnt < max_ping_pongs) { + if (ping_pong_cnt == max_ping_pongs - 1) { + request_rw->WriteLast(send_request, WriteOptions(), tag(2)); + } else { + request_rw->Write(send_request, tag(2)); // Start client send + } + + need_tags = (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5); + + if (ping_pong_cnt == 0) { + // wait for the server call structure (call_hook, etc.) to be + // initialized (async stream between client side and server side + // established). It is necessary when client init metadata is + // coalesced + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + while ((int)(intptr_t)t != 0) { + // In some cases tag:2 comes before tag:0 (write tag comes out + // first), this while loop is to make sure get tag:0. + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + } + } + + response_rw.Read(&recv_request, tag(3)); // Start server recv + request_rw->Read(&recv_response, tag(4)); // Start client recv + + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + + // If server recv is complete, start the server send operation + if (i == 3) { + if (ping_pong_cnt == max_ping_pongs - 1) { + if (write_and_finish == 1) { + response_rw.WriteAndFinish(send_response, WriteOptions(), + Status::OK, tag(5)); + } else { + response_rw.WriteLast(send_response, WriteOptions(), tag(5)); + // WriteLast buffers the write, so neither server write op nor + // client read op will finish inside the while loop. + need_tags &= ~(1 << 4); + need_tags &= ~(1 << 5); + } + } else { + response_rw.Write(send_response, tag(5)); + } + } + + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + ping_pong_cnt++; + } + + if (max_ping_pongs == 0) { + need_tags = (1 << 6) | (1 << 7) | (1 << 8); + } else { + if (write_and_finish == 1) { + need_tags = (1 << 8); + } else { + // server's buffered write and the client's read of the buffered write + // tags should come up. + need_tags = (1 << 4) | (1 << 5) | (1 << 7) | (1 << 8); + } + } + + // No message write or initial metadata write happened yet. + if (max_ping_pongs == 0) { + request_rw->WritesDone(tag(6)); + // wait for server call data structure(call_hook, etc.) to be + // initialized, since initial metadata is corked. + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + while ((int)(intptr_t)t != 0) { + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + } + response_rw.Finish(Status::OK, tag(7)); + } else { + if (write_and_finish != 1) { + response_rw.Finish(Status::OK, tag(7)); + } + } + + Status recv_status; + request_rw->Finish(&recv_status, tag(8)); + + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + GPR_ASSERT(recv_status.ok()); + } + } + + fixture->Finish(state); + fixture.reset(); + state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2); +} +} // namespace testing +} // namespace grpc + +#endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H diff --git a/test/cpp/microbenchmarks/fullstack_streaming_pump.h b/test/cpp/microbenchmarks/fullstack_streaming_pump.h new file mode 100644 index 0000000000..f9db212b02 --- /dev/null +++ b/test/cpp/microbenchmarks/fullstack_streaming_pump.h @@ -0,0 +1,170 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* Benchmark gRPC end2end in various configurations */ + +#ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H +#define TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H + +#include <benchmark/benchmark.h> +#include <sstream> +#include "src/core/lib/profiling/timers.h" +#include "src/cpp/client/create_channel_internal.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/cpp/microbenchmarks/fullstack_context_mutators.h" +#include "test/cpp/microbenchmarks/fullstack_fixtures.h" + +namespace grpc { +namespace testing { + +/******************************************************************************* + * BENCHMARKING KERNELS + */ + +static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } + +template <class Fixture> +static void BM_PumpStreamClientToServer(benchmark::State& state) { + EchoTestService::AsyncService service; + std::unique_ptr<Fixture> fixture(new Fixture(&service)); + { + EchoRequest send_request; + EchoRequest recv_request; + if (state.range(0) > 0) { + send_request.set_message(std::string(state.range(0), 'a')); + } + Status recv_status; + ServerContext svr_ctx; + ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); + service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), + fixture->cq(), tag(0)); + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + ClientContext cli_ctx; + auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); + int need_tags = (1 << 0) | (1 << 1); + void* t; + bool ok; + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + response_rw.Read(&recv_request, tag(0)); + while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); + request_rw->Write(send_request, tag(1)); + while (true) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + if (t == tag(0)) { + response_rw.Read(&recv_request, tag(0)); + } else if (t == tag(1)) { + break; + } else { + GPR_ASSERT(false); + } + } + } + request_rw->WritesDone(tag(1)); + need_tags = (1 << 0) | (1 << 1); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + response_rw.Finish(Status::OK, tag(0)); + Status final_status; + request_rw->Finish(&final_status, tag(1)); + need_tags = (1 << 0) | (1 << 1); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + GPR_ASSERT(final_status.ok()); + } + fixture->Finish(state); + fixture.reset(); + state.SetBytesProcessed(state.range(0) * state.iterations()); +} + +template <class Fixture> +static void BM_PumpStreamServerToClient(benchmark::State& state) { + EchoTestService::AsyncService service; + std::unique_ptr<Fixture> fixture(new Fixture(&service)); + { + EchoResponse send_response; + EchoResponse recv_response; + if (state.range(0) > 0) { + send_response.set_message(std::string(state.range(0), 'a')); + } + Status recv_status; + ServerContext svr_ctx; + ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); + service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), + fixture->cq(), tag(0)); + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + ClientContext cli_ctx; + auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); + int need_tags = (1 << 0) | (1 << 1); + void* t; + bool ok; + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + request_rw->Read(&recv_response, tag(0)); + while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); + response_rw.Write(send_response, tag(1)); + while (true) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + if (t == tag(0)) { + request_rw->Read(&recv_response, tag(0)); + } else if (t == tag(1)) { + break; + } else { + GPR_ASSERT(false); + } + } + } + response_rw.Finish(Status::OK, tag(1)); + need_tags = (1 << 0) | (1 << 1); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + } + fixture->Finish(state); + fixture.reset(); + state.SetBytesProcessed(state.range(0) * state.iterations()); +} +} // namespace testing +} // namespace grpc + +#endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H diff --git a/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h b/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h new file mode 100644 index 0000000000..76d278b2a0 --- /dev/null +++ b/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h @@ -0,0 +1,116 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* Benchmark gRPC end2end in various configurations */ + +#ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_UNARY_PING_PONG_H +#define TEST_CPP_MICROBENCHMARKS_FULLSTACK_UNARY_PING_PONG_H + +#include <benchmark/benchmark.h> +#include <sstream> +#include "src/core/lib/profiling/timers.h" +#include "src/cpp/client/create_channel_internal.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/cpp/microbenchmarks/fullstack_context_mutators.h" +#include "test/cpp/microbenchmarks/fullstack_fixtures.h" + +namespace grpc { +namespace testing { + +/******************************************************************************* + * BENCHMARKING KERNELS + */ + +static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } + +template <class Fixture, class ClientContextMutator, class ServerContextMutator> +static void BM_UnaryPingPong(benchmark::State& state) { + EchoTestService::AsyncService service; + std::unique_ptr<Fixture> fixture(new Fixture(&service)); + EchoRequest send_request; + EchoResponse send_response; + EchoResponse recv_response; + if (state.range(0) > 0) { + send_request.set_message(std::string(state.range(0), 'a')); + } + if (state.range(1) > 0) { + send_response.set_message(std::string(state.range(1), 'a')); + } + Status recv_status; + struct ServerEnv { + ServerContext ctx; + EchoRequest recv_request; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer; + ServerEnv() : response_writer(&ctx) {} + }; + uint8_t server_env_buffer[2 * sizeof(ServerEnv)]; + ServerEnv* server_env[2] = { + reinterpret_cast<ServerEnv*>(server_env_buffer), + reinterpret_cast<ServerEnv*>(server_env_buffer + sizeof(ServerEnv))}; + new (server_env[0]) ServerEnv; + new (server_env[1]) ServerEnv; + service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request, + &server_env[0]->response_writer, fixture->cq(), + fixture->cq(), tag(0)); + service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request, + &server_env[1]->response_writer, fixture->cq(), + fixture->cq(), tag(1)); + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); + recv_response.Clear(); + ClientContext cli_ctx; + ClientContextMutator cli_ctx_mut(&cli_ctx); + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( + stub->AsyncEcho(&cli_ctx, send_request, fixture->cq())); + void* t; + bool ok; + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + GPR_ASSERT(t == tag(0) || t == tag(1)); + intptr_t slot = reinterpret_cast<intptr_t>(t); + ServerEnv* senv = server_env[slot]; + ServerContextMutator svr_ctx_mut(&senv->ctx); + senv->response_writer.Finish(send_response, Status::OK, tag(3)); + response_reader->Finish(&recv_response, &recv_status, tag(4)); + for (int i = (1 << 3) | (1 << 4); i != 0;) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int tagnum = (int)reinterpret_cast<intptr_t>(t); + GPR_ASSERT(i & (1 << tagnum)); + i -= 1 << tagnum; + } + GPR_ASSERT(recv_status.ok()); + + senv->~ServerEnv(); + senv = new (senv) ServerEnv(); + service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer, + fixture->cq(), fixture->cq(), tag(slot)); + } + fixture->Finish(state); + fixture.reset(); + server_env[0]->~ServerEnv(); + server_env[1]->~ServerEnv(); + state.SetBytesProcessed(state.range(0) * state.iterations() + + state.range(1) * state.iterations()); +} +} // namespace testing +} // namespace grpc + +#endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_UNARY_PING_PONG_H diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 97e2aef914..912c871482 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -141,7 +141,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { if (!next_issue_) { // ready to issue RunNextState(true, nullptr); } else { // wait for the issue time - alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); + alarm_.reset(new Alarm); + alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this)); } } }; @@ -360,8 +361,8 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { break; // loop around, don't return case State::WAIT: next_state_ = State::READY_TO_WRITE; - alarm_.reset( - new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); + alarm_.reset(new Alarm); + alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this)); return true; case State::READY_TO_WRITE: if (!ok) { @@ -518,8 +519,8 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext { } break; // loop around, don't return case State::WAIT: - alarm_.reset( - new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); + alarm_.reset(new Alarm); + alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this)); next_state_ = State::READY_TO_WRITE; return true; case State::READY_TO_WRITE: @@ -760,8 +761,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { break; // loop around, don't return case State::WAIT: next_state_ = State::READY_TO_WRITE; - alarm_.reset( - new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); + alarm_.reset(new Alarm); + alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this)); return true; case State::READY_TO_WRITE: if (!ok) { diff --git a/tools/codegen/core/gen_stats_data.py b/tools/codegen/core/gen_stats_data.py index df6cd5a6bd..8e4ef594af 100755 --- a/tools/codegen/core/gen_stats_data.py +++ b/tools/codegen/core/gen_stats_data.py @@ -19,13 +19,30 @@ import ctypes import math import sys import yaml +import json with open('src/core/lib/debug/stats_data.yaml') as f: attrs = yaml.load(f.read()) +REQUIRED_FIELDS = ['name', 'doc'] + +def make_type(name, fields): + return (collections.namedtuple(name, ' '.join(list(set(REQUIRED_FIELDS + fields)))), []) + +def c_str(s, encoding='ascii'): + if isinstance(s, unicode): + s = s.encode(encoding) + result = '' + for c in s: + if not (32 <= ord(c) < 127) or c in ('\\', '"'): + result += '\\%03o' % ord(c) + else: + result += c + return '"' + result + '"' + types = ( - (collections.namedtuple('Counter', 'name'), []), - (collections.namedtuple('Histogram', 'name max buckets'), []), + make_type('Counter', []), + make_type('Histogram', ['max', 'buckets']), ) inst_map = dict((t[0].__name__, t[1]) for t in types) @@ -200,6 +217,8 @@ with open('src/core/lib/debug/stats_data.h', 'w') as H: print >>H, "} grpc_stats_%ss;" % (typename.lower()) print >>H, "extern const char *grpc_stats_%s_name[GRPC_STATS_%s_COUNT];" % ( typename.lower(), typename.upper()) + print >>H, "extern const char *grpc_stats_%s_doc[GRPC_STATS_%s_COUNT];" % ( + typename.lower(), typename.upper()) histo_start = [] histo_buckets = [] @@ -269,8 +288,14 @@ with open('src/core/lib/debug/stats_data.c', 'w') as C: print >>C, "const char *grpc_stats_%s_name[GRPC_STATS_%s_COUNT] = {" % ( typename.lower(), typename.upper()) for inst in instances: - print >>C, " \"%s\"," % inst.name + print >>C, " %s," % c_str(inst.name) print >>C, "};" + print >>C, "const char *grpc_stats_%s_doc[GRPC_STATS_%s_COUNT] = {" % ( + typename.lower(), typename.upper()) + for inst in instances: + print >>C, " %s," % c_str(inst.doc) + print >>C, "};" + for i, tbl in enumerate(static_tables): print >>C, "const %s grpc_stats_table_%d[%d] = {%s};" % ( tbl[0], i, len(tbl[1]), ','.join('%s' % x for x in tbl[1])) diff --git a/tools/doxygen/Doxyfile.core b/tools/doxygen/Doxyfile.core index e4cc1c7461..632735342b 100644 --- a/tools/doxygen/Doxyfile.core +++ b/tools/doxygen/Doxyfile.core @@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC Core" # could be handy for archiving the generated documentation or if some version # control system is used. -PROJECT_NUMBER = 4.0.0-dev +PROJECT_NUMBER = 5.0.0-dev # Using the PROJECT_BRIEF tag one can provide an optional one line description # for a project that appears at the top of each page and should give viewer a diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index c2daf900fb..e352cb78f1 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC Core" # could be handy for archiving the generated documentation or if some version # control system is used. -PROJECT_NUMBER = 4.0.0-dev +PROJECT_NUMBER = 5.0.0-dev # Using the PROJECT_BRIEF tag one can provide an optional one line description # for a project that appears at the top of each page and should give viewer a @@ -979,10 +979,10 @@ src/core/ext/filters/http/message_compress/message_compress_filter.c \ src/core/ext/filters/http/message_compress/message_compress_filter.h \ src/core/ext/filters/http/server/http_server_filter.c \ src/core/ext/filters/http/server/http_server_filter.h \ -src/core/ext/filters/load_reporting/load_reporting.c \ -src/core/ext/filters/load_reporting/load_reporting.h \ -src/core/ext/filters/load_reporting/load_reporting_filter.c \ -src/core/ext/filters/load_reporting/load_reporting_filter.h \ +src/core/ext/filters/load_reporting/server_load_reporting_filter.c \ +src/core/ext/filters/load_reporting/server_load_reporting_filter.h \ +src/core/ext/filters/load_reporting/server_load_reporting_plugin.c \ +src/core/ext/filters/load_reporting/server_load_reporting_plugin.h \ src/core/ext/filters/max_age/max_age_filter.c \ src/core/ext/filters/max_age/max_age_filter.h \ src/core/ext/filters/message_size/message_size_filter.c \ diff --git a/tools/flakes/detect_flakes.py b/tools/flakes/detect_flakes.py index 2aff4c0872..c5c7f61771 100644 --- a/tools/flakes/detect_flakes.py +++ b/tools/flakes/detect_flakes.py @@ -33,14 +33,17 @@ sys.path.append(gcp_utils_dir) import big_query_utils def print_table(table): - for i, (k, v) in enumerate(table.items()): + kokoro_base_url = 'https://kokoro.corp.google.com/job/' + for k, v in table.items(): job_name = v[0] build_id = v[1] ts = int(float(v[2])) # TODO(dgq): timezone handling is wrong. We need to determine the timezone # of the computer running this script. human_ts = datetime.datetime.utcfromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S PDT') - print("{}. Test: {}, Timestamp: {}, id: {}@{}\n".format(i, k, human_ts, job_name, build_id)) + job_path = '{}/{}'.format('/job/'.join(job_name.split('/')), build_id) + full_kokoro_url = kokoro_base_url + job_path + print("Test: {}, Timestamp: {}, url: {}\n".format(k, human_ts, full_kokoro_url)) def get_flaky_tests(days_lower_bound, days_upper_bound, limit=None): diff --git a/tools/internal_ci/linux/grpc_sanity_webhook_test.cfg b/tools/internal_ci/linux/grpc_sanity_webhook_test.cfg new file mode 100644 index 0000000000..24e7984f3a --- /dev/null +++ b/tools/internal_ci/linux/grpc_sanity_webhook_test.cfg @@ -0,0 +1,30 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Config file for the internal CI (in protobuf text format) + +# Location of the continuous shell script in repository. +build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh" +timeout_mins: 20 +action { + define_artifacts { + regex: "**/*sponge_log.xml" + regex: "github/grpc/reports/**" + } +} + +env_vars { + key: "RUN_TESTS_FLAGS" + value: "-f basictests linux sanity opt --inner_jobs 16 -j 1 --internal_ci" +} diff --git a/tools/jenkins/run_performance_profile_daily.sh b/tools/jenkins/run_performance_profile_daily.sh index 26ee87d240..04a2464aee 100755 --- a/tools/jenkins/run_performance_profile_daily.sh +++ b/tools/jenkins/run_performance_profile_daily.sh @@ -27,4 +27,6 @@ else PYTHON=python2.7 fi -$PYTHON tools/run_tests/run_microbenchmark.py --collect summary perf latency +BENCHMARKS_TO_RUN="bm_fullstack_unary_ping_pong bm_fullstack_streaming_ping_pong bm_fullstack_streaming_pump bm_closure bm_cq bm_call_create bm_error bm_chttp2_hpack bm_chttp2_transport bm_pollset bm_metadata" + +$PYTHON tools/run_tests/run_microbenchmark.py --collect summary perf latency -b $BENCHMARKS_TO_RUN diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index dd9a171268..2f6e34bfb3 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -2790,12 +2790,15 @@ "grpc_test_util_unsecure", "grpc_unsecure" ], - "headers": [], + "headers": [ + "test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h" + ], "is_filegroup": false, "language": "c++", "name": "bm_fullstack_streaming_ping_pong", "src": [ - "test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc" + "test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc", + "test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h" ], "third_party": false, "type": "target" @@ -2811,12 +2814,15 @@ "grpc_test_util_unsecure", "grpc_unsecure" ], - "headers": [], + "headers": [ + "test/cpp/microbenchmarks/fullstack_streaming_pump.h" + ], "is_filegroup": false, "language": "c++", "name": "bm_fullstack_streaming_pump", "src": [ - "test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc" + "test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc", + "test/cpp/microbenchmarks/fullstack_streaming_pump.h" ], "third_party": false, "type": "target" @@ -2854,12 +2860,15 @@ "grpc_test_util_unsecure", "grpc_unsecure" ], - "headers": [], + "headers": [ + "test/cpp/microbenchmarks/fullstack_unary_ping_pong.h" + ], "is_filegroup": false, "language": "c++", "name": "bm_fullstack_unary_ping_pong", "src": [ - "test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc" + "test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc", + "test/cpp/microbenchmarks/fullstack_unary_ping_pong.h" ], "third_party": false, "type": "target" @@ -5890,7 +5899,6 @@ "grpc_lb_policy_grpclb_secure", "grpc_lb_policy_pick_first", "grpc_lb_policy_round_robin", - "grpc_load_reporting", "grpc_max_age_filter", "grpc_message_size_filter", "grpc_resolver_dns_ares", @@ -5899,6 +5907,7 @@ "grpc_resolver_sockaddr", "grpc_secure", "grpc_server_backward_compatibility", + "grpc_server_load_reporting", "grpc_transport_chttp2_client_insecure", "grpc_transport_chttp2_client_secure", "grpc_transport_chttp2_server_insecure", @@ -5920,7 +5929,7 @@ "deps": [ "gpr", "grpc_base", - "grpc_load_reporting", + "grpc_server_load_reporting", "grpc_transport_chttp2_client_secure", "grpc_transport_cronet_client_secure" ], @@ -5997,7 +6006,6 @@ "grpc_lb_policy_grpclb", "grpc_lb_policy_pick_first", "grpc_lb_policy_round_robin", - "grpc_load_reporting", "grpc_max_age_filter", "grpc_message_size_filter", "grpc_resolver_dns_ares", @@ -6005,6 +6013,7 @@ "grpc_resolver_fake", "grpc_resolver_sockaddr", "grpc_server_backward_compatibility", + "grpc_server_load_reporting", "grpc_transport_chttp2_client_insecure", "grpc_transport_chttp2_server_insecure", "grpc_transport_inproc", @@ -8565,27 +8574,6 @@ "grpc_base" ], "headers": [ - "src/core/ext/filters/load_reporting/load_reporting.h", - "src/core/ext/filters/load_reporting/load_reporting_filter.h" - ], - "is_filegroup": true, - "language": "c", - "name": "grpc_load_reporting", - "src": [ - "src/core/ext/filters/load_reporting/load_reporting.c", - "src/core/ext/filters/load_reporting/load_reporting.h", - "src/core/ext/filters/load_reporting/load_reporting_filter.c", - "src/core/ext/filters/load_reporting/load_reporting_filter.h" - ], - "third_party": false, - "type": "filegroup" - }, - { - "deps": [ - "gpr", - "grpc_base" - ], - "headers": [ "src/core/ext/filters/max_age/max_age_filter.h" ], "is_filegroup": true, @@ -8793,6 +8781,27 @@ { "deps": [ "gpr", + "grpc_base" + ], + "headers": [ + "src/core/ext/filters/load_reporting/server_load_reporting_filter.h", + "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h" + ], + "is_filegroup": true, + "language": "c", + "name": "grpc_server_load_reporting", + "src": [ + "src/core/ext/filters/load_reporting/server_load_reporting_filter.c", + "src/core/ext/filters/load_reporting/server_load_reporting_filter.h", + "src/core/ext/filters/load_reporting/server_load_reporting_plugin.c", + "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h" + ], + "third_party": false, + "type": "filegroup" + }, + { + "deps": [ + "gpr", "gpr_test_util", "grpc_base", "grpc_client_channel", |