From 77532e8bf3d4c93b680ac63a5e436cea92c708f5 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Mon, 27 Feb 2017 11:52:39 -0800 Subject: Destroy pointer args when destructing a ChannelArguments --- include/grpc++/support/channel_arguments.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include') diff --git a/include/grpc++/support/channel_arguments.h b/include/grpc++/support/channel_arguments.h index efdf7772ad..80a3bfda3d 100644 --- a/include/grpc++/support/channel_arguments.h +++ b/include/grpc++/support/channel_arguments.h @@ -54,7 +54,7 @@ class ResourceQuota; class ChannelArguments { public: ChannelArguments(); - ~ChannelArguments() {} + ~ChannelArguments(); ChannelArguments(const ChannelArguments& other); ChannelArguments& operator=(ChannelArguments other) { -- cgit v1.2.3 From fecba535d99ec2c819a0d26707047bf2f2f323fa Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 17 Mar 2017 09:50:48 -0700 Subject: Switch to using a CAS loop to update the token value. --- BUILD | 1 + CMakeLists.txt | 1 + Makefile | 1 + binding.gyp | 1 + build.yaml | 1 + config.m4 | 1 + gRPC-Core.podspec | 1 + grpc.gemspec | 1 + include/grpc/impl/codegen/atm.h | 5 +++ package.xml | 1 + src/core/ext/client_channel/retry_throttle.c | 36 +++-------------- src/core/lib/support/atm.c | 47 ++++++++++++++++++++++ src/python/grpcio/grpc_core_dependencies.py | 1 + tools/doxygen/Doxyfile.core.internal | 1 + tools/run_tests/generated/sources_and_headers.json | 1 + vsprojects/vcxproj/gpr/gpr.vcxproj | 2 + vsprojects/vcxproj/gpr/gpr.vcxproj.filters | 3 ++ 17 files changed, 75 insertions(+), 30 deletions(-) create mode 100644 src/core/lib/support/atm.c (limited to 'include') diff --git a/BUILD b/BUILD index 4e1f20c3b2..1fe72c02db 100644 --- a/BUILD +++ b/BUILD @@ -309,6 +309,7 @@ grpc_cc_library( "src/core/lib/profiling/basic_timers.c", "src/core/lib/profiling/stap_timers.c", "src/core/lib/support/alloc.c", + "src/core/lib/support/atm.c", "src/core/lib/support/avl.c", "src/core/lib/support/backoff.c", "src/core/lib/support/cmdline.c", diff --git a/CMakeLists.txt b/CMakeLists.txt index 8bc07255f1..9e99062f58 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -693,6 +693,7 @@ add_library(gpr src/core/lib/profiling/basic_timers.c src/core/lib/profiling/stap_timers.c src/core/lib/support/alloc.c + src/core/lib/support/atm.c src/core/lib/support/avl.c src/core/lib/support/backoff.c src/core/lib/support/cmdline.c diff --git a/Makefile b/Makefile index 11bac54c79..2f7120987a 100644 --- a/Makefile +++ b/Makefile @@ -2599,6 +2599,7 @@ LIBGPR_SRC = \ src/core/lib/profiling/basic_timers.c \ src/core/lib/profiling/stap_timers.c \ src/core/lib/support/alloc.c \ + src/core/lib/support/atm.c \ src/core/lib/support/avl.c \ src/core/lib/support/backoff.c \ src/core/lib/support/cmdline.c \ diff --git a/binding.gyp b/binding.gyp index f6a04b27f9..1107f31889 100644 --- a/binding.gyp +++ b/binding.gyp @@ -544,6 +544,7 @@ 'src/core/lib/profiling/basic_timers.c', 'src/core/lib/profiling/stap_timers.c', 'src/core/lib/support/alloc.c', + 'src/core/lib/support/atm.c', 'src/core/lib/support/avl.c', 'src/core/lib/support/backoff.c', 'src/core/lib/support/cmdline.c', diff --git a/build.yaml b/build.yaml index ae546cbb30..7b27968c61 100644 --- a/build.yaml +++ b/build.yaml @@ -101,6 +101,7 @@ filegroups: - src/core/lib/profiling/basic_timers.c - src/core/lib/profiling/stap_timers.c - src/core/lib/support/alloc.c + - src/core/lib/support/atm.c - src/core/lib/support/avl.c - src/core/lib/support/backoff.c - src/core/lib/support/cmdline.c diff --git a/config.m4 b/config.m4 index 5eaf161f09..010401f2fb 100644 --- a/config.m4 +++ b/config.m4 @@ -39,6 +39,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/profiling/basic_timers.c \ src/core/lib/profiling/stap_timers.c \ src/core/lib/support/alloc.c \ + src/core/lib/support/atm.c \ src/core/lib/support/avl.c \ src/core/lib/support/backoff.c \ src/core/lib/support/cmdline.c \ diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 2fb00a3afe..c78e4a7023 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -211,6 +211,7 @@ Pod::Spec.new do |s| 'src/core/lib/profiling/basic_timers.c', 'src/core/lib/profiling/stap_timers.c', 'src/core/lib/support/alloc.c', + 'src/core/lib/support/atm.c', 'src/core/lib/support/avl.c', 'src/core/lib/support/backoff.c', 'src/core/lib/support/cmdline.c', diff --git a/grpc.gemspec b/grpc.gemspec index 1ca2446e65..95aba00fd7 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -97,6 +97,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/profiling/basic_timers.c ) s.files += %w( src/core/lib/profiling/stap_timers.c ) s.files += %w( src/core/lib/support/alloc.c ) + s.files += %w( src/core/lib/support/atm.c ) s.files += %w( src/core/lib/support/avl.c ) s.files += %w( src/core/lib/support/backoff.c ) s.files += %w( src/core/lib/support/cmdline.c ) diff --git a/include/grpc/impl/codegen/atm.h b/include/grpc/impl/codegen/atm.h index ae00fb0f16..4bd572d6d1 100644 --- a/include/grpc/impl/codegen/atm.h +++ b/include/grpc/impl/codegen/atm.h @@ -92,4 +92,9 @@ #error could not determine platform for atm #endif +/** Adds \a delta to \a *value, clamping the result to the range specified + by \a min and \a max. Returns the new value. */ +gpr_atm gpr_atm_no_barrier_clamped_add(gpr_atm *value, gpr_atm delta, + gpr_atm min, gpr_atm max); + #endif /* GRPC_IMPL_CODEGEN_ATM_H */ diff --git a/package.xml b/package.xml index e29f462d33..83ad2d2129 100644 --- a/package.xml +++ b/package.xml @@ -106,6 +106,7 @@ + diff --git a/src/core/ext/client_channel/retry_throttle.c b/src/core/ext/client_channel/retry_throttle.c index 7b813c33df..8926c3d782 100644 --- a/src/core/ext/client_channel/retry_throttle.c +++ b/src/core/ext/client_channel/retry_throttle.c @@ -73,20 +73,9 @@ bool grpc_server_retry_throttle_data_record_failure( // First, check if we are stale and need to be replaced. get_replacement_throttle_data_if_needed(&throttle_data); // We decrement milli_tokens by 1000 (1 token) for each failure. - const int delta = -1000; - const int old_value = (int)gpr_atm_full_fetch_add( - &throttle_data->milli_tokens, (gpr_atm)delta); - // If the above change takes us below 0, then re-add the excess. Note - // that between these two atomic operations, the value will be - // artificially low by as much as 1000, but this window should be - // brief. - int new_value = old_value - 1000; - if (new_value < 0) { - const int excess_value = new_value - (old_value < 0 ? old_value : 0); - gpr_atm_full_fetch_add(&throttle_data->milli_tokens, - (gpr_atm)-excess_value); - new_value = 0; - } + const int new_value = (int)gpr_atm_no_barrier_clamped_add( + &throttle_data->milli_tokens, (gpr_atm)-1000, (gpr_atm)0, + (gpr_atm)throttle_data->max_milli_tokens); // Retries are allowed as long as the new value is above the threshold // (max_milli_tokens / 2). return new_value > throttle_data->max_milli_tokens / 2; @@ -97,22 +86,9 @@ void grpc_server_retry_throttle_data_record_success( // First, check if we are stale and need to be replaced. get_replacement_throttle_data_if_needed(&throttle_data); // We increment milli_tokens by milli_token_ratio for each success. - const int delta = throttle_data->milli_token_ratio; - const int old_value = (int)gpr_atm_full_fetch_add( - &throttle_data->milli_tokens, (gpr_atm)delta); - // If the above change takes us over max_milli_tokens, then subtract - // the excess. Note that between these two atomic operations, the - // value will be artificially high by as much as milli_token_ratio, - // but this window should be brief. - const int new_value = old_value + throttle_data->milli_token_ratio; - if (new_value > throttle_data->max_milli_tokens) { - const int excess_value = - new_value - (old_value > throttle_data->max_milli_tokens - ? old_value - : throttle_data->max_milli_tokens); - gpr_atm_full_fetch_add(&throttle_data->milli_tokens, - (gpr_atm)-excess_value); - } + gpr_atm_no_barrier_clamped_add( + &throttle_data->milli_tokens, (gpr_atm)throttle_data->milli_token_ratio, + (gpr_atm)0, (gpr_atm)throttle_data->max_milli_tokens); } grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref( diff --git a/src/core/lib/support/atm.c b/src/core/lib/support/atm.c new file mode 100644 index 0000000000..06e8432caf --- /dev/null +++ b/src/core/lib/support/atm.c @@ -0,0 +1,47 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include +#include + +gpr_atm gpr_atm_no_barrier_clamped_add(gpr_atm *value, gpr_atm delta, + gpr_atm min, gpr_atm max) { + gpr_atm current; + gpr_atm new; + do { + current = gpr_atm_no_barrier_load(value); + new = GPR_CLAMP(current + delta, min, max); + if (new == current) break; + } while (!gpr_atm_no_barrier_cas(value, current, new)); + return new; +} diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 94d6e46cae..da0dba7dfe 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -33,6 +33,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/profiling/basic_timers.c', 'src/core/lib/profiling/stap_timers.c', 'src/core/lib/support/alloc.c', + 'src/core/lib/support/atm.c', 'src/core/lib/support/avl.c', 'src/core/lib/support/backoff.c', 'src/core/lib/support/cmdline.c', diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index fbe1f7f78e..7147d152ef 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1230,6 +1230,7 @@ src/core/lib/slice/slice_internal.h \ src/core/lib/slice/slice_string_helpers.c \ src/core/lib/slice/slice_string_helpers.h \ src/core/lib/support/alloc.c \ +src/core/lib/support/atm.c \ src/core/lib/support/avl.c \ src/core/lib/support/backoff.c \ src/core/lib/support/backoff.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index b2f9078c05..7a6295cf72 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -7311,6 +7311,7 @@ "src/core/lib/profiling/stap_timers.c", "src/core/lib/profiling/timers.h", "src/core/lib/support/alloc.c", + "src/core/lib/support/atm.c", "src/core/lib/support/avl.c", "src/core/lib/support/backoff.c", "src/core/lib/support/backoff.h", diff --git a/vsprojects/vcxproj/gpr/gpr.vcxproj b/vsprojects/vcxproj/gpr/gpr.vcxproj index 44c21ddeb3..67ac3b98c5 100644 --- a/vsprojects/vcxproj/gpr/gpr.vcxproj +++ b/vsprojects/vcxproj/gpr/gpr.vcxproj @@ -208,6 +208,8 @@ + + diff --git a/vsprojects/vcxproj/gpr/gpr.vcxproj.filters b/vsprojects/vcxproj/gpr/gpr.vcxproj.filters index a5924a624a..c49c87ed60 100644 --- a/vsprojects/vcxproj/gpr/gpr.vcxproj.filters +++ b/vsprojects/vcxproj/gpr/gpr.vcxproj.filters @@ -10,6 +10,9 @@ src\core\lib\support + + src\core\lib\support + src\core\lib\support -- cgit v1.2.3 From 18f09a01f659e9004740766dfc5ab46e2aeea00e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 23 Feb 2017 17:10:04 -0800 Subject: Add benchmark suite for chttp2 --- CMakeLists.txt | 42 ++ Makefile | 48 ++ build.yaml | 20 + include/grpc++/support/channel_arguments.h | 4 +- src/core/lib/iomgr/closure.c | 4 + src/core/lib/iomgr/combiner.c | 2 + test/cpp/microbenchmarks/bm_chttp2_transport.cc | 587 +++++++++++++++++++++ tools/run_tests/generated/sources_and_headers.json | 21 + tools/run_tests/generated/tests.json | 23 + 9 files changed, 749 insertions(+), 2 deletions(-) create mode 100644 test/cpp/microbenchmarks/bm_chttp2_transport.cc (limited to 'include') diff --git a/CMakeLists.txt b/CMakeLists.txt index 851aeb8401..28d3b5152f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -584,6 +584,9 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx bm_chttp2_hpack) endif() if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) +add_dependencies(buildtests_cxx bm_chttp2_transport) +endif() +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx bm_closure) endif() if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) @@ -7791,6 +7794,45 @@ endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) +add_executable(bm_chttp2_transport + test/cpp/microbenchmarks/bm_chttp2_transport.cc + third_party/googletest/src/gtest-all.cc +) + + +target_include_directories(bm_chttp2_transport + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${BORINGSSL_ROOT_DIR}/include + PRIVATE ${PROTOBUF_ROOT_DIR}/src + PRIVATE ${BENCHMARK_ROOT_DIR}/include + PRIVATE ${ZLIB_ROOT_DIR} + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include + PRIVATE third_party/googletest/include + PRIVATE third_party/googletest + PRIVATE ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(bm_chttp2_transport + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_benchmark + benchmark + grpc++_test_util + grpc_test_util + grpc++ + grpc + gpr_test_util + gpr + ${_gRPC_GFLAGS_LIBRARIES} +) + +endif() +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + add_executable(bm_closure test/cpp/microbenchmarks/bm_closure.cc third_party/googletest/src/gtest-all.cc diff --git a/Makefile b/Makefile index d2104e973c..ac70d1a6be 100644 --- a/Makefile +++ b/Makefile @@ -1051,6 +1051,7 @@ auth_property_iterator_test: $(BINDIR)/$(CONFIG)/auth_property_iterator_test bm_arena: $(BINDIR)/$(CONFIG)/bm_arena bm_call_create: $(BINDIR)/$(CONFIG)/bm_call_create bm_chttp2_hpack: $(BINDIR)/$(CONFIG)/bm_chttp2_hpack +bm_chttp2_transport: $(BINDIR)/$(CONFIG)/bm_chttp2_transport bm_closure: $(BINDIR)/$(CONFIG)/bm_closure bm_cq: $(BINDIR)/$(CONFIG)/bm_cq bm_error: $(BINDIR)/$(CONFIG)/bm_error @@ -1475,6 +1476,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/bm_arena \ $(BINDIR)/$(CONFIG)/bm_call_create \ $(BINDIR)/$(CONFIG)/bm_chttp2_hpack \ + $(BINDIR)/$(CONFIG)/bm_chttp2_transport \ $(BINDIR)/$(CONFIG)/bm_closure \ $(BINDIR)/$(CONFIG)/bm_cq \ $(BINDIR)/$(CONFIG)/bm_error \ @@ -1592,6 +1594,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/bm_arena \ $(BINDIR)/$(CONFIG)/bm_call_create \ $(BINDIR)/$(CONFIG)/bm_chttp2_hpack \ + $(BINDIR)/$(CONFIG)/bm_chttp2_transport \ $(BINDIR)/$(CONFIG)/bm_closure \ $(BINDIR)/$(CONFIG)/bm_cq \ $(BINDIR)/$(CONFIG)/bm_error \ @@ -1939,6 +1942,8 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/bm_call_create || ( echo test bm_call_create failed ; exit 1 ) $(E) "[RUN] Testing bm_chttp2_hpack" $(Q) $(BINDIR)/$(CONFIG)/bm_chttp2_hpack || ( echo test bm_chttp2_hpack failed ; exit 1 ) + $(E) "[RUN] Testing bm_chttp2_transport" + $(Q) $(BINDIR)/$(CONFIG)/bm_chttp2_transport || ( echo test bm_chttp2_transport failed ; exit 1 ) $(E) "[RUN] Testing bm_closure" $(Q) $(BINDIR)/$(CONFIG)/bm_closure || ( echo test bm_closure failed ; exit 1 ) $(E) "[RUN] Testing bm_cq" @@ -12830,6 +12835,49 @@ endif endif +BM_CHTTP2_TRANSPORT_SRC = \ + test/cpp/microbenchmarks/bm_chttp2_transport.cc \ + +BM_CHTTP2_TRANSPORT_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BM_CHTTP2_TRANSPORT_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/bm_chttp2_transport: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+. + +$(BINDIR)/$(CONFIG)/bm_chttp2_transport: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/bm_chttp2_transport: $(PROTOBUF_DEP) $(BM_CHTTP2_TRANSPORT_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_benchmark.a $(LIBDIR)/$(CONFIG)/libbenchmark.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(BM_CHTTP2_TRANSPORT_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_benchmark.a $(LIBDIR)/$(CONFIG)/libbenchmark.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/bm_chttp2_transport + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/cpp/microbenchmarks/bm_chttp2_transport.o: $(LIBDIR)/$(CONFIG)/libgrpc_benchmark.a $(LIBDIR)/$(CONFIG)/libbenchmark.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_bm_chttp2_transport: $(BM_CHTTP2_TRANSPORT_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(BM_CHTTP2_TRANSPORT_OBJS:.o=.dep) +endif +endif + + BM_CLOSURE_SRC = \ test/cpp/microbenchmarks/bm_closure.cc \ diff --git a/build.yaml b/build.yaml index 80c9849ca4..be31935cb0 100644 --- a/build.yaml +++ b/build.yaml @@ -3128,6 +3128,26 @@ targets: - mac - linux - posix +- name: bm_chttp2_transport + build: test + language: c++ + src: + - test/cpp/microbenchmarks/bm_chttp2_transport.cc + deps: + - grpc_benchmark + - benchmark + - grpc++_test_util + - grpc_test_util + - grpc++ + - grpc + - gpr_test_util + - gpr + args: + - --benchmark_min_time=0 + platforms: + - mac + - linux + - posix - name: bm_closure build: test language: c++ diff --git a/include/grpc++/support/channel_arguments.h b/include/grpc++/support/channel_arguments.h index 80a3bfda3d..61307d6194 100644 --- a/include/grpc++/support/channel_arguments.h +++ b/include/grpc++/support/channel_arguments.h @@ -117,10 +117,10 @@ class ChannelArguments { /// Return (by value) a c grpc_channel_args structure which points to /// arguments owned by this ChannelArguments instance - grpc_channel_args c_channel_args() { + grpc_channel_args c_channel_args() const { grpc_channel_args out; out.num_args = args_.size(); - out.args = args_.empty() ? NULL : &args_[0]; + out.args = args_.empty() ? NULL : const_cast(&args_[0]); return out; } diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index 509c1ff95d..6633fb68ec 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -33,6 +33,7 @@ #include "src/core/lib/iomgr/closure.h" +#include #include #include @@ -124,6 +125,7 @@ void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *c, grpc_error *error) { GPR_TIMER_BEGIN("grpc_closure_run", 0); if (c != NULL) { + assert(c->cb); c->scheduler->vtable->run(exec_ctx, c, error); } else { GRPC_ERROR_UNREF(error); @@ -135,6 +137,7 @@ void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *c, grpc_error *error) { GPR_TIMER_BEGIN("grpc_closure_sched", 0); if (c != NULL) { + assert(c->cb); c->scheduler->vtable->sched(exec_ctx, c, error); } else { GRPC_ERROR_UNREF(error); @@ -146,6 +149,7 @@ void grpc_closure_list_sched(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) { grpc_closure *c = list->head; while (c != NULL) { grpc_closure *next = c->next_data.next; + assert(c->cb); c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error); c = next; } diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index fa9966c3a6..2bc476bbef 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -33,6 +33,7 @@ #include "src/core/lib/iomgr/combiner.h" +#include #include #include @@ -216,6 +217,7 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d last=%" PRIdPTR, lock, cl, covered_by_poller, last)); GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed + assert(cl->cb); cl->error_data.scratch = pack_error_data((error_data){error, covered_by_poller}); if (covered_by_poller) { diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc new file mode 100644 index 0000000000..254d57de20 --- /dev/null +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -0,0 +1,587 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* Microbenchmarks around CHTTP2 transport operations */ + +#include +#include +#include +#include +#include +#include +#include +#include +extern "C" { +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/ext/transport/chttp2/transport/internal.h" +#include "src/core/lib/iomgr/resource_quota.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/transport/static_metadata.h" +} +#include "test/cpp/microbenchmarks/helpers.h" +#include "third_party/benchmark/include/benchmark/benchmark.h" + +auto &force_library_initialization = Library::get(); + +//////////////////////////////////////////////////////////////////////////////// +// Helper classes +// + +class DummyEndpoint : public grpc_endpoint { + public: + DummyEndpoint() { + static const grpc_endpoint_vtable my_vtable = {read, + write, + get_workqueue, + add_to_pollset, + add_to_pollset_set, + shutdown, + destroy, + get_resource_user, + get_peer, + get_fd}; + grpc_endpoint::vtable = &my_vtable; + ru_ = grpc_resource_user_create(Library::get().rq(), "dummy_endpoint"); + } + + void PushInput(grpc_exec_ctx *exec_ctx, grpc_slice slice) { + if (read_cb_ == nullptr) { + GPR_ASSERT(!have_slice_); + buffered_slice_ = slice; + have_slice_ = true; + return; + } + grpc_slice_buffer_add(slices_, slice); + grpc_closure_sched(exec_ctx, read_cb_, GRPC_ERROR_NONE); + read_cb_ = nullptr; + } + + private: + grpc_resource_user *ru_; + grpc_closure *read_cb_ = nullptr; + grpc_slice_buffer *slices_ = nullptr; + bool have_slice_ = false; + grpc_slice buffered_slice_; + + void QueueRead(grpc_exec_ctx *exec_ctx, grpc_slice_buffer *slices, + grpc_closure *cb) { + GPR_ASSERT(read_cb_ == nullptr); + if (have_slice_) { + have_slice_ = false; + grpc_slice_buffer_add(slices, buffered_slice_); + grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_NONE); + return; + } + read_cb_ = cb; + slices_ = slices; + } + + static void read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_slice_buffer *slices, grpc_closure *cb) { + static_cast(ep)->QueueRead(exec_ctx, slices, cb); + } + + static void write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_NONE); + } + + static grpc_workqueue *get_workqueue(grpc_endpoint *ep) { return NULL; } + + static void add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_pollset *pollset) {} + + static void add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_pollset_set *pollset) {} + + static void shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_error *why) { + grpc_resource_user_shutdown(exec_ctx, + static_cast(ep)->ru_); + grpc_closure_sched(exec_ctx, static_cast(ep)->read_cb_, + why); + } + + static void destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { + grpc_resource_user_unref(exec_ctx, static_cast(ep)->ru_); + delete static_cast(ep); + } + + static grpc_resource_user *get_resource_user(grpc_endpoint *ep) { + return static_cast(ep)->ru_; + } + static char *get_peer(grpc_endpoint *ep) { return gpr_strdup("test"); } + static int get_fd(grpc_endpoint *ep) { return 0; } +}; + +class Fixture { + public: + Fixture(const grpc::ChannelArguments &args, bool client) { + grpc_channel_args c_args = args.c_channel_args(); + ep_ = new DummyEndpoint; + t_ = grpc_create_chttp2_transport(exec_ctx(), &c_args, ep_, client); + grpc_chttp2_transport_start_reading(exec_ctx(), t_, NULL); + FlushExecCtx(); + } + + void FlushExecCtx() { grpc_exec_ctx_flush(&exec_ctx_); } + + ~Fixture() { + grpc_transport_destroy(&exec_ctx_, t_); + grpc_exec_ctx_finish(&exec_ctx_); + } + + grpc_chttp2_transport *chttp2_transport() { + return reinterpret_cast(t_); + } + grpc_transport *transport() { return t_; } + grpc_exec_ctx *exec_ctx() { return &exec_ctx_; } + + void PushInput(grpc_slice slice) { ep_->PushInput(exec_ctx(), slice); } + + private: + DummyEndpoint *ep_; + grpc_exec_ctx exec_ctx_ = GRPC_EXEC_CTX_INIT; + grpc_transport *t_; +}; + +static void DoNothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} + +class Stream { + public: + Stream(Fixture *f) : f_(f) { + GRPC_STREAM_REF_INIT(&refcount_, 1, DoNothing, nullptr, "test_stream"); + stream_size_ = grpc_transport_stream_size(f->transport()); + stream_ = gpr_malloc(stream_size_); + arena_ = gpr_arena_create(4096); + } + + ~Stream() { + gpr_free(stream_); + gpr_arena_destroy(arena_); + } + + void Init(benchmark::State &state) { + memset(stream_, 0, stream_size_); + if ((state.iterations() & 0xffff) == 0) { + gpr_arena_destroy(arena_); + arena_ = gpr_arena_create(4096); + } + grpc_transport_init_stream(f_->exec_ctx(), f_->transport(), + static_cast(stream_), &refcount_, + NULL, arena_); + } + + void DestroyThen(grpc_closure *closure) { + grpc_transport_destroy_stream(f_->exec_ctx(), f_->transport(), + static_cast(stream_), closure); + } + + void Op(grpc_transport_stream_op *op) { + grpc_transport_perform_stream_op(f_->exec_ctx(), f_->transport(), + static_cast(stream_), op); + } + + grpc_chttp2_stream *chttp2_stream() { + return static_cast(stream_); + } + + private: + Fixture *f_; + grpc_stream_refcount refcount_; + gpr_arena *arena_; + size_t stream_size_; + void *stream_; +}; + +class Closure : public grpc_closure { + public: + virtual ~Closure() {} +}; + +template +std::unique_ptr MakeClosure( + F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { + struct C : public Closure { + C(const F &f, grpc_closure_scheduler *sched) : f_(f) { + grpc_closure_init(this, Execute, this, sched); + } + F f_; + static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + static_cast(arg)->f_(exec_ctx, error); + } + }; + return std::unique_ptr(new C(f, sched)); +} + +template +grpc_closure *MakeOnceClosure( + F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { + struct C : public grpc_closure { + C(const F &f) : f_(f) {} + F f_; + static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + static_cast(arg)->f_(exec_ctx, error); + delete static_cast(arg); + } + }; + auto *c = new C{f}; + return grpc_closure_init(c, C::Execute, c, sched); +} + +//////////////////////////////////////////////////////////////////////////////// +// Benchmarks +// + +static void BM_StreamCreateDestroy(benchmark::State &state) { + TrackCounters track_counters; + Fixture f(grpc::ChannelArguments(), true); + Stream s(&f); + std::unique_ptr next = + MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (!state.KeepRunning()) return; + s.Init(state); + s.DestroyThen(next.get()); + }); + grpc_closure_run(f.exec_ctx(), next.get(), GRPC_ERROR_NONE); + f.FlushExecCtx(); + track_counters.Finish(state); +} +BENCHMARK(BM_StreamCreateDestroy); + +class RepresentativeClientInitialMetadata { + public: + static std::vector GetElems(grpc_exec_ctx *exec_ctx) { + return { + GRPC_MDELEM_SCHEME_HTTP, GRPC_MDELEM_METHOD_POST, + grpc_mdelem_from_slices( + exec_ctx, GRPC_MDSTR_PATH, + grpc_slice_intern(grpc_slice_from_static_string("/foo/bar"))), + grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_AUTHORITY, + grpc_slice_intern(grpc_slice_from_static_string( + "foo.test.google.fr:1234"))), + GRPC_MDELEM_GRPC_ACCEPT_ENCODING_IDENTITY_COMMA_DEFLATE_COMMA_GZIP, + GRPC_MDELEM_TE_TRAILERS, + GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC, + grpc_mdelem_from_slices( + exec_ctx, GRPC_MDSTR_USER_AGENT, + grpc_slice_intern(grpc_slice_from_static_string( + "grpc-c/3.0.0-dev (linux; chttp2; green)")))}; + } +}; + +template +static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State &state) { + TrackCounters track_counters; + Fixture f(grpc::ChannelArguments(), true); + Stream s(&f); + grpc_transport_stream_op op; + std::unique_ptr start; + std::unique_ptr done; + + grpc_metadata_batch b; + grpc_metadata_batch_init(&b); + b.deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + std::vector elems = Metadata::GetElems(f.exec_ctx()); + std::vector storage(elems.size()); + for (size_t i = 0; i < elems.size(); i++) { + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "addmd", + grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i]))); + } + + f.FlushExecCtx(); + start = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (!state.KeepRunning()) return; + s.Init(state); + memset(&op, 0, sizeof(op)); + op.on_complete = done.get(); + op.send_initial_metadata = &b; + s.Op(&op); + }); + done = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + memset(&op, 0, sizeof(op)); + op.cancel_error = GRPC_ERROR_CANCELLED; + s.Op(&op); + s.DestroyThen(start.get()); + }); + grpc_closure_sched(f.exec_ctx(), start.get(), GRPC_ERROR_NONE); + f.FlushExecCtx(); + grpc_metadata_batch_destroy(f.exec_ctx(), &b); + track_counters.Finish(state); +} +BENCHMARK_TEMPLATE(BM_StreamCreateSendInitialMetadataDestroy, + RepresentativeClientInitialMetadata); + +static void BM_TransportEmptyOp(benchmark::State &state) { + TrackCounters track_counters; + Fixture f(grpc::ChannelArguments(), true); + Stream s(&f); + s.Init(state); + grpc_transport_stream_op op; + std::unique_ptr c = + MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (!state.KeepRunning()) return; + memset(&op, 0, sizeof(op)); + op.on_complete = c.get(); + s.Op(&op); + }); + grpc_closure_sched(f.exec_ctx(), c.get(), GRPC_ERROR_NONE); + f.FlushExecCtx(); + s.DestroyThen( + MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + f.FlushExecCtx(); + track_counters.Finish(state); +} +BENCHMARK(BM_TransportEmptyOp); + +static void BM_TransportStreamSend(benchmark::State &state) { + TrackCounters track_counters; + Fixture f(grpc::ChannelArguments(), true); + Stream s(&f); + s.Init(state); + grpc_transport_stream_op op; + grpc_slice_buffer_stream send_stream; + grpc_slice_buffer send_buffer; + grpc_slice_buffer_init(&send_buffer); + grpc_slice_buffer_add(&send_buffer, gpr_slice_malloc(state.range(0))); + memset(GRPC_SLICE_START_PTR(send_buffer.slices[0]), 0, + GRPC_SLICE_LENGTH(send_buffer.slices[0])); + + grpc_metadata_batch b; + grpc_metadata_batch_init(&b); + b.deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + std::vector elems = + RepresentativeClientInitialMetadata::GetElems(f.exec_ctx()); + std::vector storage(elems.size()); + for (size_t i = 0; i < elems.size(); i++) { + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "addmd", + grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i]))); + } + + std::unique_ptr c = + MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (!state.KeepRunning()) return; + // force outgoing window to be yuge + s.chttp2_stream()->outgoing_window_delta = 1024 * 1024 * 1024; + f.chttp2_transport()->outgoing_window = 1024 * 1024 * 1024; + grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0); + memset(&op, 0, sizeof(op)); + op.on_complete = c.get(); + op.send_message = &send_stream.base; + s.Op(&op); + }); + + memset(&op, 0, sizeof(op)); + op.send_initial_metadata = &b; + op.on_complete = c.get(); + s.Op(&op); + + f.FlushExecCtx(); + memset(&op, 0, sizeof(op)); + op.cancel_error = GRPC_ERROR_CANCELLED; + s.Op(&op); + s.DestroyThen( + MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + f.FlushExecCtx(); + track_counters.Finish(state); + grpc_metadata_batch_destroy(f.exec_ctx(), &b); + grpc_slice_buffer_destroy(&send_buffer); +} +BENCHMARK(BM_TransportStreamSend)->Range(0, 128 * 1024 * 1024); + +#define SLICE_FROM_BUFFER(s) grpc_slice_from_static_buffer(s, sizeof(s) - 1) + +static grpc_slice CreateIncomingDataSlice(size_t length, size_t frame_size) { + std::queue unframed; + + unframed.push(static_cast(0)); + unframed.push(static_cast(length >> 24)); + unframed.push(static_cast(length >> 16)); + unframed.push(static_cast(length >> 8)); + unframed.push(static_cast(length)); + for (size_t i = 0; i < length; i++) { + unframed.push('a'); + } + + std::vector framed; + while (unframed.size() > frame_size) { + // frame size + framed.push_back(static_cast(frame_size >> 16)); + framed.push_back(static_cast(frame_size >> 8)); + framed.push_back(static_cast(frame_size)); + // data frame + framed.push_back(0); + // no flags + framed.push_back(0); + // stream id + framed.push_back(0); + framed.push_back(0); + framed.push_back(0); + framed.push_back(1); + // frame data + for (size_t i = 0; i < frame_size; i++) { + framed.push_back(unframed.front()); + unframed.pop(); + } + } + + // frame size + framed.push_back(static_cast(unframed.size() >> 16)); + framed.push_back(static_cast(unframed.size() >> 8)); + framed.push_back(static_cast(unframed.size())); + // data frame + framed.push_back(0); + // no flags + framed.push_back(0); + // stream id + framed.push_back(0); + framed.push_back(0); + framed.push_back(0); + framed.push_back(1); + while (!unframed.empty()) { + framed.push_back(unframed.front()); + unframed.pop(); + } + + return grpc_slice_from_copied_buffer(framed.data(), framed.size()); +} + +static void BM_TransportStreamRecv(benchmark::State &state) { + TrackCounters track_counters; + Fixture f(grpc::ChannelArguments(), true); + Stream s(&f); + s.Init(state); + grpc_transport_stream_op op; + grpc_byte_stream *recv_stream; + grpc_slice incoming_data = CreateIncomingDataSlice(state.range(0), 16384); + + grpc_metadata_batch b; + grpc_metadata_batch_init(&b); + grpc_metadata_batch b_recv; + grpc_metadata_batch_init(&b_recv); + b.deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + std::vector elems = + RepresentativeClientInitialMetadata::GetElems(f.exec_ctx()); + std::vector storage(elems.size()); + for (size_t i = 0; i < elems.size(); i++) { + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "addmd", + grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i]))); + } + + std::unique_ptr do_nothing = + MakeClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {}); + + uint32_t received; + + std::unique_ptr drain_start; + std::unique_ptr drain; + std::unique_ptr drain_continue; + grpc_slice recv_slice; + + std::unique_ptr c = + MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (!state.KeepRunning()) return; + // force outgoing window to be yuge + s.chttp2_stream()->incoming_window_delta = 1024 * 1024 * 1024; + f.chttp2_transport()->incoming_window = 1024 * 1024 * 1024; + received = 0; + memset(&op, 0, sizeof(op)); + op.on_complete = do_nothing.get(); + op.recv_message = &recv_stream; + op.recv_message_ready = drain_start.get(); + s.Op(&op); + f.PushInput(grpc_slice_ref(incoming_data)); + }); + + drain_start = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (recv_stream == NULL) { + GPR_ASSERT(!state.KeepRunning()); + return; + } + grpc_closure_run(exec_ctx, drain.get(), GRPC_ERROR_NONE); + }); + + drain = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + do { + if (received == recv_stream->length) { + grpc_byte_stream_destroy(exec_ctx, recv_stream); + grpc_closure_sched(exec_ctx, c.get(), GRPC_ERROR_NONE); + return; + } + } while (grpc_byte_stream_next(exec_ctx, recv_stream, &recv_slice, + recv_stream->length - received, + drain_continue.get())); + }); + + drain_continue = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + received += GRPC_SLICE_LENGTH(recv_slice); + grpc_slice_unref_internal(exec_ctx, recv_slice); + grpc_closure_run(exec_ctx, drain.get(), GRPC_ERROR_NONE); + }); + + memset(&op, 0, sizeof(op)); + op.send_initial_metadata = &b; + op.recv_initial_metadata = &b_recv; + op.on_complete = c.get(); + s.Op(&op); + f.PushInput(SLICE_FROM_BUFFER( + "\x00\x00\x00\x04\x00\x00\x00\x00\x00" + // Generated using: + // tools/codegen/core/gen_header_frame.py < + // test/cpp/microbenchmarks/representative_server_initial_metadata.headers + "\x00\x00X\x01\x04\x00\x00\x00\x01" + "\x10\x07:status\x03" + "200" + "\x10\x0c" + "content-type\x10" + "application/grpc" + "\x10\x14grpc-accept-encoding\x15identity,deflate,gzip")); + + f.FlushExecCtx(); + memset(&op, 0, sizeof(op)); + op.cancel_error = GRPC_ERROR_CANCELLED; + s.Op(&op); + s.DestroyThen( + MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + f.FlushExecCtx(); + track_counters.Finish(state); + grpc_metadata_batch_destroy(f.exec_ctx(), &b); + grpc_metadata_batch_destroy(f.exec_ctx(), &b_recv); + grpc_slice_unref(incoming_data); +} +BENCHMARK(BM_TransportStreamRecv)->Range(0, 128 * 1024 * 1024); + +BENCHMARK_MAIN(); diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 9160b0d9d6..2e272e654f 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -2507,6 +2507,27 @@ "third_party": false, "type": "target" }, + { + "deps": [ + "benchmark", + "gpr", + "gpr_test_util", + "grpc", + "grpc++", + "grpc++_test_util", + "grpc_benchmark", + "grpc_test_util" + ], + "headers": [], + "is_filegroup": false, + "language": "c++", + "name": "bm_chttp2_transport", + "src": [ + "test/cpp/microbenchmarks/bm_chttp2_transport.cc" + ], + "third_party": false, + "type": "target" + }, { "deps": [ "benchmark", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 1af05162b4..2dc5198ca8 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -2695,6 +2695,28 @@ "posix" ] }, + { + "args": [ + "--benchmark_min_time=0" + ], + "ci_platforms": [ + "linux", + "mac", + "posix" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": false, + "language": "c++", + "name": "bm_chttp2_transport", + "platforms": [ + "linux", + "mac", + "posix" + ] + }, { "args": [ "--benchmark_min_time=0" @@ -117505,6 +117527,7 @@ "language": "c", "name": "hpack_parser_fuzzer_test_one_entry", "platforms": [ + "mac", "linux" ], "uses_polling": false -- cgit v1.2.3 From bf18428740ebf0f36b68fcf9b7d427538e074a31 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Tue, 21 Mar 2017 15:18:58 -0700 Subject: Introduce grpc_completion_queue_factory API Just the API and a bare-bone implementation --- CMakeLists.txt | 5 ++ Makefile | 5 ++ binding.gyp | 1 + build.yaml | 2 + config.m4 | 1 + gRPC-Core.podspec | 3 + grpc.def | 3 + grpc.gemspec | 2 + include/grpc/grpc.h | 64 ++++++++++++++++++ package.xml | 2 + src/core/lib/surface/completion_queue_factory.c | 75 ++++++++++++++++++++++ src/core/lib/surface/completion_queue_factory.h | 51 +++++++++++++++ src/python/grpcio/grpc_core_dependencies.py | 1 + src/ruby/ext/grpc/rb_grpc_imports.generated.c | 6 ++ src/ruby/ext/grpc/rb_grpc_imports.generated.h | 9 +++ tools/doxygen/Doxyfile.core.internal | 2 + tools/run_tests/generated/sources_and_headers.json | 3 + tools/run_tests/generated/tests.json | 1 + vsprojects/vcxproj/grpc/grpc.vcxproj | 3 + vsprojects/vcxproj/grpc/grpc.vcxproj.filters | 6 ++ .../vcxproj/grpc_test_util/grpc_test_util.vcxproj | 3 + .../grpc_test_util/grpc_test_util.vcxproj.filters | 6 ++ .../vcxproj/grpc_unsecure/grpc_unsecure.vcxproj | 3 + .../grpc_unsecure/grpc_unsecure.vcxproj.filters | 6 ++ 24 files changed, 263 insertions(+) create mode 100644 src/core/lib/surface/completion_queue_factory.c create mode 100644 src/core/lib/surface/completion_queue_factory.h (limited to 'include') diff --git a/CMakeLists.txt b/CMakeLists.txt index 851aeb8401..950b8b62e7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -965,6 +965,7 @@ add_library(grpc src/core/lib/surface/channel_ping.c src/core/lib/surface/channel_stack_type.c src/core/lib/surface/completion_queue.c + src/core/lib/surface/completion_queue_factory.c src/core/lib/surface/event_string.c src/core/lib/surface/lame_client.c src/core/lib/surface/metadata_array.c @@ -1277,6 +1278,7 @@ add_library(grpc_cronet src/core/lib/surface/channel_ping.c src/core/lib/surface/channel_stack_type.c src/core/lib/surface/completion_queue.c + src/core/lib/surface/completion_queue_factory.c src/core/lib/surface/event_string.c src/core/lib/surface/lame_client.c src/core/lib/surface/metadata_array.c @@ -1580,6 +1582,7 @@ add_library(grpc_test_util src/core/lib/surface/channel_ping.c src/core/lib/surface/channel_stack_type.c src/core/lib/surface/completion_queue.c + src/core/lib/surface/completion_queue_factory.c src/core/lib/surface/event_string.c src/core/lib/surface/lame_client.c src/core/lib/surface/metadata_array.c @@ -1829,6 +1832,7 @@ add_library(grpc_unsecure src/core/lib/surface/channel_ping.c src/core/lib/surface/channel_stack_type.c src/core/lib/surface/completion_queue.c + src/core/lib/surface/completion_queue_factory.c src/core/lib/surface/event_string.c src/core/lib/surface/lame_client.c src/core/lib/surface/metadata_array.c @@ -2439,6 +2443,7 @@ add_library(grpc++_cronet src/core/lib/surface/channel_ping.c src/core/lib/surface/channel_stack_type.c src/core/lib/surface/completion_queue.c + src/core/lib/surface/completion_queue_factory.c src/core/lib/surface/event_string.c src/core/lib/surface/lame_client.c src/core/lib/surface/metadata_array.c diff --git a/Makefile b/Makefile index d2104e973c..9a8a44b7d1 100644 --- a/Makefile +++ b/Makefile @@ -2857,6 +2857,7 @@ LIBGRPC_SRC = \ src/core/lib/surface/channel_ping.c \ src/core/lib/surface/channel_stack_type.c \ src/core/lib/surface/completion_queue.c \ + src/core/lib/surface/completion_queue_factory.c \ src/core/lib/surface/event_string.c \ src/core/lib/surface/lame_client.c \ src/core/lib/surface/metadata_array.c \ @@ -3172,6 +3173,7 @@ LIBGRPC_CRONET_SRC = \ src/core/lib/surface/channel_ping.c \ src/core/lib/surface/channel_stack_type.c \ src/core/lib/surface/completion_queue.c \ + src/core/lib/surface/completion_queue_factory.c \ src/core/lib/surface/event_string.c \ src/core/lib/surface/lame_client.c \ src/core/lib/surface/metadata_array.c \ @@ -3478,6 +3480,7 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/lib/surface/channel_ping.c \ src/core/lib/surface/channel_stack_type.c \ src/core/lib/surface/completion_queue.c \ + src/core/lib/surface/completion_queue_factory.c \ src/core/lib/surface/event_string.c \ src/core/lib/surface/lame_client.c \ src/core/lib/surface/metadata_array.c \ @@ -3707,6 +3710,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/surface/channel_ping.c \ src/core/lib/surface/channel_stack_type.c \ src/core/lib/surface/completion_queue.c \ + src/core/lib/surface/completion_queue_factory.c \ src/core/lib/surface/event_string.c \ src/core/lib/surface/lame_client.c \ src/core/lib/surface/metadata_array.c \ @@ -4319,6 +4323,7 @@ LIBGRPC++_CRONET_SRC = \ src/core/lib/surface/channel_ping.c \ src/core/lib/surface/channel_stack_type.c \ src/core/lib/surface/completion_queue.c \ + src/core/lib/surface/completion_queue_factory.c \ src/core/lib/surface/event_string.c \ src/core/lib/surface/lame_client.c \ src/core/lib/surface/metadata_array.c \ diff --git a/binding.gyp b/binding.gyp index f79374a446..957c343e4f 100644 --- a/binding.gyp +++ b/binding.gyp @@ -711,6 +711,7 @@ 'src/core/lib/surface/channel_ping.c', 'src/core/lib/surface/channel_stack_type.c', 'src/core/lib/surface/completion_queue.c', + 'src/core/lib/surface/completion_queue_factory.c', 'src/core/lib/surface/event_string.c', 'src/core/lib/surface/lame_client.c', 'src/core/lib/surface/metadata_array.c', diff --git a/build.yaml b/build.yaml index 80c9849ca4..8f339bdd08 100644 --- a/build.yaml +++ b/build.yaml @@ -261,6 +261,7 @@ filegroups: - src/core/lib/surface/channel_init.h - src/core/lib/surface/channel_stack_type.h - src/core/lib/surface/completion_queue.h + - src/core/lib/surface/completion_queue_factory.h - src/core/lib/surface/event_string.h - src/core/lib/surface/init.h - src/core/lib/surface/lame_client.h @@ -383,6 +384,7 @@ filegroups: - src/core/lib/surface/channel_ping.c - src/core/lib/surface/channel_stack_type.c - src/core/lib/surface/completion_queue.c + - src/core/lib/surface/completion_queue_factory.c - src/core/lib/surface/event_string.c - src/core/lib/surface/lame_client.c - src/core/lib/surface/metadata_array.c diff --git a/config.m4 b/config.m4 index 3194b26669..0ca8c499f9 100644 --- a/config.m4 +++ b/config.m4 @@ -184,6 +184,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/surface/channel_ping.c \ src/core/lib/surface/channel_stack_type.c \ src/core/lib/surface/completion_queue.c \ + src/core/lib/surface/completion_queue_factory.c \ src/core/lib/surface/event_string.c \ src/core/lib/surface/lame_client.c \ src/core/lib/surface/metadata_array.c \ diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 2444ffa57a..b5ff7a51c7 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -342,6 +342,7 @@ Pod::Spec.new do |s| 'src/core/lib/surface/channel_init.h', 'src/core/lib/surface/channel_stack_type.h', 'src/core/lib/surface/completion_queue.h', + 'src/core/lib/surface/completion_queue_factory.h', 'src/core/lib/surface/event_string.h', 'src/core/lib/surface/init.h', 'src/core/lib/surface/lame_client.h', @@ -554,6 +555,7 @@ Pod::Spec.new do |s| 'src/core/lib/surface/channel_ping.c', 'src/core/lib/surface/channel_stack_type.c', 'src/core/lib/surface/completion_queue.c', + 'src/core/lib/surface/completion_queue_factory.c', 'src/core/lib/surface/event_string.c', 'src/core/lib/surface/lame_client.c', 'src/core/lib/surface/metadata_array.c', @@ -782,6 +784,7 @@ Pod::Spec.new do |s| 'src/core/lib/surface/channel_init.h', 'src/core/lib/surface/channel_stack_type.h', 'src/core/lib/surface/completion_queue.h', + 'src/core/lib/surface/completion_queue_factory.h', 'src/core/lib/surface/event_string.h', 'src/core/lib/surface/init.h', 'src/core/lib/surface/lame_client.h', diff --git a/grpc.def b/grpc.def index 30d60b0d06..1589316a58 100644 --- a/grpc.def +++ b/grpc.def @@ -53,6 +53,9 @@ EXPORTS grpc_shutdown grpc_version_string grpc_g_stands_for + grpc_completion_queue_factory_lookup + grpc_completion_queue_create_for_next + grpc_completion_queue_create_for_pluck grpc_completion_queue_create grpc_completion_queue_next grpc_completion_queue_pluck diff --git a/grpc.gemspec b/grpc.gemspec index 81e8733052..d85cc5ebac 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -259,6 +259,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/surface/channel_init.h ) s.files += %w( src/core/lib/surface/channel_stack_type.h ) s.files += %w( src/core/lib/surface/completion_queue.h ) + s.files += %w( src/core/lib/surface/completion_queue_factory.h ) s.files += %w( src/core/lib/surface/event_string.h ) s.files += %w( src/core/lib/surface/init.h ) s.files += %w( src/core/lib/surface/lame_client.h ) @@ -471,6 +472,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/surface/channel_ping.c ) s.files += %w( src/core/lib/surface/channel_stack_type.c ) s.files += %w( src/core/lib/surface/completion_queue.c ) + s.files += %w( src/core/lib/surface/completion_queue_factory.c ) s.files += %w( src/core/lib/surface/event_string.c ) s.files += %w( src/core/lib/surface/lame_client.c ) s.files += %w( src/core/lib/surface/metadata_array.c ) diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 1b33d48c02..bcf64a6081 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -93,6 +93,70 @@ GRPCAPI const char *grpc_version_string(void); /** Return a string specifying what the 'g' in gRPC stands for */ GRPCAPI const char *grpc_g_stands_for(void); +/** Specifies the type of APIs to use to pop events from the completion queue */ +typedef enum { + /* Events are popped out by calling grpc_completion_queue_next() API ONLY */ + GRPC_CQ_NEXT = 0, + + /* Events are popped out by calling grpc_completion_queue_pluck() API ONLY */ + GRPC_CQ_PLUCK +} grpc_cq_completion_type; + +/** Completion queues internally MAY maintain a set of file descriptors in a + structure called 'pollset'. This enum specifies if a completion queue has an + associated pollset and any restrictions on the type of file descriptors that + can be present in the pollset. + + I/O progress can only be made when grpc_completion_queue_next() or + grpc_completion_queue_pluck() are called on the completion queue (unless the + grpc_cq_polling_type is GRPC_CQ_NON_POLLING) and hence it is very important + to actively call these APIs */ +typedef enum { + /** The completion queue will have an associated pollset and there is no + restriction on the type of file descriptors the pollset may contain */ + GRPC_CQ_DEFAULT_POLLING, + + /* Similar to GRPC_CQ_DEFAULT_POLLING except that the completion queues will + not contain any 'listening file descriptors' (i.e file descriptors used to + listen to incoming channels) */ + GRPC_CQ_NON_LISTENING, + + /* The completion queue will not have an associated pollset. Note that + grpc_completion_queue_next() or grpc_completion_queue_pluck() MUST still be + called to pop events from the completion queue; it is not required to call + them actively to make I/O progress */ + GRPC_CQ_NON_POLLING +} grpc_cq_polling_type; + +typedef struct grpc_completion_queue_attributes { + /* The version number of this structure. More fields might be added to this + structure in future. */ + int version; /* Current version is 1 */ + + grpc_cq_completion_type cq_type; + + grpc_cq_polling_type cq_polling_type; +} grpc_completion_queue_attributes; + +/** The completion queue factory structure is opaque to the callers of grpc */ +typedef struct grpc_completion_queue_factory grpc_completion_queue_factory; + +/** Returns the completion queue factory based on the attributes. MAY return a + NULL if no factory can be found */ +GRPCAPI const grpc_completion_queue_factory * +grpc_completion_queue_factory_lookup( + const grpc_completion_queue_attributes *attributes); + +/** Helper function to create a completion queue with grpc_cq_completion_type + of GRPC_CQ_NEXT and grpc_cq_polling_type of GRPC_CQ_DEFAULT_POLLING */ +GRPCAPI grpc_completion_queue *grpc_completion_queue_create_for_next( + void *reserved); + +/** Helper function to create a completion queue with grpc_cq_completion_type + of GRPC_CQ_PLUCK and grpc_cq_polling_type of GRPC_CQ_DEFAULT_POLLING */ +GRPCAPI grpc_completion_queue *grpc_completion_queue_create_for_pluck( + void *reserved); + /** Create a completion queue */ GRPCAPI grpc_completion_queue *grpc_completion_queue_create(void *reserved); diff --git a/package.xml b/package.xml index c66706cfde..a405c1f043 100644 --- a/package.xml +++ b/package.xml @@ -268,6 +268,7 @@ + @@ -480,6 +481,7 @@ + diff --git a/src/core/lib/surface/completion_queue_factory.c b/src/core/lib/surface/completion_queue_factory.c new file mode 100644 index 0000000000..879b456ed5 --- /dev/null +++ b/src/core/lib/surface/completion_queue_factory.c @@ -0,0 +1,75 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/surface/completion_queue_factory.h" +#include "src/core/lib/surface/completion_queue.h" + +#include + +/* TODO (sreek) - Currently this does not use the attributes arg. This will be + added in a future PR */ +static grpc_completion_queue* default_create( + const grpc_completion_queue_factory* factory, + const grpc_completion_queue_attributes* attributes) { + return grpc_completion_queue_create(NULL); +} + +static grpc_completion_queue_factory_vtable default_vtable = {default_create}; + +static const grpc_completion_queue_factory g_default_cq_factory = { + "Default Factory", NULL, &default_vtable}; + +const grpc_completion_queue_factory* grpc_completion_queue_factory_lookup( + const grpc_completion_queue_attributes* attributes) { + /* As we add more fields to grpc_completion_queue_attributes, we may have to + change this assert */ + GPR_ASSERT(attributes->version == 1); + + /* The default factory can handle version 1 of the attributes structure. We + may have to change this as more fields are added to the structure */ + return &g_default_cq_factory; +} + +grpc_completion_queue* grpc_completion_queue_create_for_next(void* reserved) { + GPR_ASSERT(!reserved); + grpc_completion_queue_attributes attr = {1, GRPC_CQ_NEXT, + GRPC_CQ_DEFAULT_POLLING}; + return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr); +} + +grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) { + GPR_ASSERT(!reserved); + grpc_completion_queue_attributes attr = {1, GRPC_CQ_NEXT, + GRPC_CQ_DEFAULT_POLLING}; + return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr); +} diff --git a/src/core/lib/surface/completion_queue_factory.h b/src/core/lib/surface/completion_queue_factory.h new file mode 100644 index 0000000000..57e90b5090 --- /dev/null +++ b/src/core/lib/surface/completion_queue_factory.h @@ -0,0 +1,51 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_FACTORY_H +#define GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_FACTORY_H + +#include +#include "src/core/lib/surface/completion_queue.h" + +typedef struct grpc_completion_queue_factory_vtable { + grpc_completion_queue* (*create)(const grpc_completion_queue_factory*, + const grpc_completion_queue_attributes*); +} grpc_completion_queue_factory_vtable; + +struct grpc_completion_queue_factory { + const char* name; + void* data; /* Factory specific data */ + grpc_completion_queue_factory_vtable* vtable; +}; + +#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_FACTORY_H */ diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 5fc748483a..a9469046d4 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -178,6 +178,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/surface/channel_ping.c', 'src/core/lib/surface/channel_stack_type.c', 'src/core/lib/surface/completion_queue.c', + 'src/core/lib/surface/completion_queue_factory.c', 'src/core/lib/surface/event_string.c', 'src/core/lib/surface/lame_client.c', 'src/core/lib/surface/metadata_array.c', diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 3ef6f0eb29..063f92114c 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -91,6 +91,9 @@ grpc_init_type grpc_init_import; grpc_shutdown_type grpc_shutdown_import; grpc_version_string_type grpc_version_string_import; grpc_g_stands_for_type grpc_g_stands_for_import; +grpc_completion_queue_factory_lookup_type grpc_completion_queue_factory_lookup_import; +grpc_completion_queue_create_for_next_type grpc_completion_queue_create_for_next_import; +grpc_completion_queue_create_for_pluck_type grpc_completion_queue_create_for_pluck_import; grpc_completion_queue_create_type grpc_completion_queue_create_import; grpc_completion_queue_next_type grpc_completion_queue_next_import; grpc_completion_queue_pluck_type grpc_completion_queue_pluck_import; @@ -385,6 +388,9 @@ void grpc_rb_load_imports(HMODULE library) { grpc_shutdown_import = (grpc_shutdown_type) GetProcAddress(library, "grpc_shutdown"); grpc_version_string_import = (grpc_version_string_type) GetProcAddress(library, "grpc_version_string"); grpc_g_stands_for_import = (grpc_g_stands_for_type) GetProcAddress(library, "grpc_g_stands_for"); + grpc_completion_queue_factory_lookup_import = (grpc_completion_queue_factory_lookup_type) GetProcAddress(library, "grpc_completion_queue_factory_lookup"); + grpc_completion_queue_create_for_next_import = (grpc_completion_queue_create_for_next_type) GetProcAddress(library, "grpc_completion_queue_create_for_next"); + grpc_completion_queue_create_for_pluck_import = (grpc_completion_queue_create_for_pluck_type) GetProcAddress(library, "grpc_completion_queue_create_for_pluck"); grpc_completion_queue_create_import = (grpc_completion_queue_create_type) GetProcAddress(library, "grpc_completion_queue_create"); grpc_completion_queue_next_import = (grpc_completion_queue_next_type) GetProcAddress(library, "grpc_completion_queue_next"); grpc_completion_queue_pluck_import = (grpc_completion_queue_pluck_type) GetProcAddress(library, "grpc_completion_queue_pluck"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index ef9845dfe0..f5dcd68a8e 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -224,6 +224,15 @@ extern grpc_version_string_type grpc_version_string_import; typedef const char *(*grpc_g_stands_for_type)(void); extern grpc_g_stands_for_type grpc_g_stands_for_import; #define grpc_g_stands_for grpc_g_stands_for_import +typedef const grpc_completion_queue_factory *(*grpc_completion_queue_factory_lookup_type)(const grpc_completion_queue_attributes *attributes); +extern grpc_completion_queue_factory_lookup_type grpc_completion_queue_factory_lookup_import; +#define grpc_completion_queue_factory_lookup grpc_completion_queue_factory_lookup_import +typedef grpc_completion_queue *(*grpc_completion_queue_create_for_next_type)(void *reserved); +extern grpc_completion_queue_create_for_next_type grpc_completion_queue_create_for_next_import; +#define grpc_completion_queue_create_for_next grpc_completion_queue_create_for_next_import +typedef grpc_completion_queue *(*grpc_completion_queue_create_for_pluck_type)(void *reserved); +extern grpc_completion_queue_create_for_pluck_type grpc_completion_queue_create_for_pluck_import; +#define grpc_completion_queue_create_for_pluck grpc_completion_queue_create_for_pluck_import typedef grpc_completion_queue *(*grpc_completion_queue_create_type)(void *reserved); extern grpc_completion_queue_create_type grpc_completion_queue_create_import; #define grpc_completion_queue_create grpc_completion_queue_create_import diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 8922363098..d9690a07ed 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1308,6 +1308,8 @@ src/core/lib/surface/channel_stack_type.c \ src/core/lib/surface/channel_stack_type.h \ src/core/lib/surface/completion_queue.c \ src/core/lib/surface/completion_queue.h \ +src/core/lib/surface/completion_queue_factory.c \ +src/core/lib/surface/completion_queue_factory.h \ src/core/lib/surface/event_string.c \ src/core/lib/surface/event_string.h \ src/core/lib/surface/init.c \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 9160b0d9d6..e6e9c7c933 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -7547,6 +7547,7 @@ "src/core/lib/surface/channel_init.h", "src/core/lib/surface/channel_stack_type.h", "src/core/lib/surface/completion_queue.h", + "src/core/lib/surface/completion_queue_factory.h", "src/core/lib/surface/event_string.h", "src/core/lib/surface/init.h", "src/core/lib/surface/lame_client.h", @@ -7771,6 +7772,8 @@ "src/core/lib/surface/channel_stack_type.h", "src/core/lib/surface/completion_queue.c", "src/core/lib/surface/completion_queue.h", + "src/core/lib/surface/completion_queue_factory.c", + "src/core/lib/surface/completion_queue_factory.h", "src/core/lib/surface/event_string.c", "src/core/lib/surface/event_string.h", "src/core/lib/surface/init.h", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 1af05162b4..8c708f09ee 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -117505,6 +117505,7 @@ "language": "c", "name": "hpack_parser_fuzzer_test_one_entry", "platforms": [ + "mac", "linux" ], "uses_polling": false diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index 5e3b027663..0123f3eb82 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -388,6 +388,7 @@ + @@ -705,6 +706,8 @@ + + diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index d75ca766c0..f54379f0ee 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -310,6 +310,9 @@ src\core\lib\surface + + src\core\lib\surface + src\core\lib\surface @@ -1046,6 +1049,9 @@ src\core\lib\surface + + src\core\lib\surface + src\core\lib\surface diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj index 62969e31ac..70a6d1806a 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj @@ -283,6 +283,7 @@ + @@ -548,6 +549,8 @@ + + diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters index 30088101f5..b4de56e564 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters @@ -367,6 +367,9 @@ src\core\lib\surface + + src\core\lib\surface + src\core\lib\surface @@ -830,6 +833,9 @@ src\core\lib\surface + + src\core\lib\surface + src\core\lib\surface diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index 11ac8bd4b2..8c2dcebfa2 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -378,6 +378,7 @@ + @@ -672,6 +673,8 @@ + + diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index 414e2a50b8..b8ad369a5f 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -313,6 +313,9 @@ src\core\lib\surface + + src\core\lib\surface + src\core\lib\surface @@ -956,6 +959,9 @@ src\core\lib\surface + + src\core\lib\surface + src\core\lib\surface -- cgit v1.2.3 From add6962d7af2fd3943c2f2d739bd13b3ce4b91d8 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Tue, 21 Mar 2017 16:12:39 -0700 Subject: Add version macro --- include/grpc/grpc.h | 4 +++- src/core/lib/surface/completion_queue_factory.c | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) (limited to 'include') diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index bcf64a6081..fa4fd6d49d 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -128,10 +128,12 @@ typedef enum { GRPC_CQ_NON_POLLING } grpc_cq_polling_type; + +#define GRPC_CQ_CURRENT_VERSION 1 typedef struct grpc_completion_queue_attributes { /* The version number of this structure. More fields might be added to this structure in future. */ - int version; /* Current version is 1 */ + int version; /* Set to GRPC_CQ_CURRENT_VERSION */ grpc_cq_completion_type cq_type; diff --git a/src/core/lib/surface/completion_queue_factory.c b/src/core/lib/surface/completion_queue_factory.c index 879b456ed5..138dfeff48 100644 --- a/src/core/lib/surface/completion_queue_factory.c +++ b/src/core/lib/surface/completion_queue_factory.c @@ -52,7 +52,9 @@ static const grpc_completion_queue_factory g_default_cq_factory = { const grpc_completion_queue_factory* grpc_completion_queue_factory_lookup( const grpc_completion_queue_attributes* attributes) { /* As we add more fields to grpc_completion_queue_attributes, we may have to - change this assert */ + change this assert to: + GPR_ASSERT (attributes->version >= 1 && + attributes->version <= GRPC_CQ_CURRENT_VERSION) */ GPR_ASSERT(attributes->version == 1); /* The default factory can handle version 1 of the attributes structure. We -- cgit v1.2.3 From 48ed124138d5a6c58710fd6d89d2568733987959 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Tue, 21 Mar 2017 16:43:38 -0700 Subject: minor enum change --- include/grpc/grpc.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include') diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index fa4fd6d49d..68c277cd08 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -96,7 +96,7 @@ GRPCAPI const char *grpc_g_stands_for(void); /** Specifies the type of APIs to use to pop events from the completion queue */ typedef enum { /* Events are popped out by calling grpc_completion_queue_next() API ONLY */ - GRPC_CQ_NEXT = 0, + GRPC_CQ_NEXT = 1, /* Events are popped out by calling grpc_completion_queue_pluck() API ONLY */ GRPC_CQ_PLUCK -- cgit v1.2.3 From 995ed1c3c6abd81f6b9dad71491da986e0e6f01e Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Wed, 22 Mar 2017 02:34:39 -0700 Subject: Fix bug --- include/grpc/grpc.h | 2 +- src/core/lib/surface/completion_queue_factory.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'include') diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 68c277cd08..fecb784570 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -135,7 +135,7 @@ typedef struct grpc_completion_queue_attributes { structure in future. */ int version; /* Set to GRPC_CQ_CURRENT_VERSION */ - grpc_cq_completion_type cq_type; + grpc_cq_completion_type cq_completion_type; grpc_cq_polling_type cq_polling_type; } grpc_completion_queue_attributes; diff --git a/src/core/lib/surface/completion_queue_factory.c b/src/core/lib/surface/completion_queue_factory.c index 138dfeff48..db67a5192b 100644 --- a/src/core/lib/surface/completion_queue_factory.c +++ b/src/core/lib/surface/completion_queue_factory.c @@ -71,7 +71,7 @@ grpc_completion_queue* grpc_completion_queue_create_for_next(void* reserved) { grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) { GPR_ASSERT(!reserved); - grpc_completion_queue_attributes attr = {1, GRPC_CQ_NEXT, + grpc_completion_queue_attributes attr = {1, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING}; return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr); } -- cgit v1.2.3 From 3a632c4a71da3c9a3ddfbefefdb160b53db1e0b8 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Wed, 22 Mar 2017 11:49:43 -0700 Subject: clang format fix --- include/grpc/grpc.h | 1 - 1 file changed, 1 deletion(-) (limited to 'include') diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index fecb784570..e088435d6c 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -128,7 +128,6 @@ typedef enum { GRPC_CQ_NON_POLLING } grpc_cq_polling_type; - #define GRPC_CQ_CURRENT_VERSION 1 typedef struct grpc_completion_queue_attributes { /* The version number of this structure. More fields might be added to this -- cgit v1.2.3 From c46b3ebeb925baf3d006f0a5c297dd4539e40797 Mon Sep 17 00:00:00 2001 From: Michael Warres Date: Wed, 22 Feb 2017 22:57:04 -0500 Subject: Add socket factory support to udp_server.c --- BUILD | 4 +- CMakeLists.txt | 5 + Makefile | 5 + binding.gyp | 1 + build.yaml | 2 + config.m4 | 1 + gRPC-Core.podspec | 3 + grpc.gemspec | 2 + include/grpc/impl/codegen/grpc_types.h | 5 + package.xml | 2 + src/core/lib/iomgr/socket_factory_posix.c | 110 +++++++++++++++++++++ src/core/lib/iomgr/socket_factory_posix.h | 90 +++++++++++++++++ src/core/lib/iomgr/socket_utils_common_posix.c | 18 +++- src/core/lib/iomgr/socket_utils_posix.h | 7 ++ src/core/lib/iomgr/udp_server.c | 49 +++++++-- src/core/lib/iomgr/udp_server.h | 2 +- src/python/grpcio/grpc_core_dependencies.py | 1 + test/core/iomgr/udp_server_test.c | 89 ++++++++++++++++- tools/doxygen/Doxyfile.core.internal | 2 + tools/run_tests/generated/sources_and_headers.json | 3 + vsprojects/vcxproj/grpc/grpc.vcxproj | 3 + vsprojects/vcxproj/grpc/grpc.vcxproj.filters | 6 ++ .../vcxproj/grpc_test_util/grpc_test_util.vcxproj | 3 + .../grpc_test_util/grpc_test_util.vcxproj.filters | 6 ++ .../vcxproj/grpc_unsecure/grpc_unsecure.vcxproj | 3 + .../grpc_unsecure/grpc_unsecure.vcxproj.filters | 6 ++ 26 files changed, 411 insertions(+), 17 deletions(-) create mode 100644 src/core/lib/iomgr/socket_factory_posix.c create mode 100644 src/core/lib/iomgr/socket_factory_posix.h (limited to 'include') diff --git a/BUILD b/BUILD index 053f581101..10d5c447ca 100644 --- a/BUILD +++ b/BUILD @@ -354,8 +354,8 @@ grpc_cc_library( "src/core/lib/support/wrap_memcpy.c", ], hdrs = [ - "src/core/lib/support/arena.h", "src/core/lib/profiling/timers.h", + "src/core/lib/support/arena.h", "src/core/lib/support/backoff.h", "src/core/lib/support/block_annotate.h", "src/core/lib/support/env.h", @@ -472,6 +472,7 @@ grpc_cc_library( "src/core/lib/iomgr/resolve_address_windows.c", "src/core/lib/iomgr/resource_quota.c", "src/core/lib/iomgr/sockaddr_utils.c", + "src/core/lib/iomgr/socket_factory_posix.c", "src/core/lib/iomgr/socket_mutator.c", "src/core/lib/iomgr/socket_utils_common_posix.c", "src/core/lib/iomgr/socket_utils_linux.c", @@ -597,6 +598,7 @@ grpc_cc_library( "src/core/lib/iomgr/sockaddr_posix.h", "src/core/lib/iomgr/sockaddr_utils.h", "src/core/lib/iomgr/sockaddr_windows.h", + "src/core/lib/iomgr/socket_factory_posix.h", "src/core/lib/iomgr/socket_mutator.h", "src/core/lib/iomgr/socket_utils.h", "src/core/lib/iomgr/socket_utils_posix.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 5a8bff5534..f73d76dbe6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -913,6 +913,7 @@ add_library(grpc src/core/lib/iomgr/resolve_address_windows.c src/core/lib/iomgr/resource_quota.c src/core/lib/iomgr/sockaddr_utils.c + src/core/lib/iomgr/socket_factory_posix.c src/core/lib/iomgr/socket_mutator.c src/core/lib/iomgr/socket_utils_common_posix.c src/core/lib/iomgr/socket_utils_linux.c @@ -1224,6 +1225,7 @@ add_library(grpc_cronet src/core/lib/iomgr/resolve_address_windows.c src/core/lib/iomgr/resource_quota.c src/core/lib/iomgr/sockaddr_utils.c + src/core/lib/iomgr/socket_factory_posix.c src/core/lib/iomgr/socket_mutator.c src/core/lib/iomgr/socket_utils_common_posix.c src/core/lib/iomgr/socket_utils_linux.c @@ -1526,6 +1528,7 @@ add_library(grpc_test_util src/core/lib/iomgr/resolve_address_windows.c src/core/lib/iomgr/resource_quota.c src/core/lib/iomgr/sockaddr_utils.c + src/core/lib/iomgr/socket_factory_posix.c src/core/lib/iomgr/socket_mutator.c src/core/lib/iomgr/socket_utils_common_posix.c src/core/lib/iomgr/socket_utils_linux.c @@ -1775,6 +1778,7 @@ add_library(grpc_unsecure src/core/lib/iomgr/resolve_address_windows.c src/core/lib/iomgr/resource_quota.c src/core/lib/iomgr/sockaddr_utils.c + src/core/lib/iomgr/socket_factory_posix.c src/core/lib/iomgr/socket_mutator.c src/core/lib/iomgr/socket_utils_common_posix.c src/core/lib/iomgr/socket_utils_linux.c @@ -2384,6 +2388,7 @@ add_library(grpc++_cronet src/core/lib/iomgr/resolve_address_windows.c src/core/lib/iomgr/resource_quota.c src/core/lib/iomgr/sockaddr_utils.c + src/core/lib/iomgr/socket_factory_posix.c src/core/lib/iomgr/socket_mutator.c src/core/lib/iomgr/socket_utils_common_posix.c src/core/lib/iomgr/socket_utils_linux.c diff --git a/Makefile b/Makefile index a1659cc8e5..9d3aee8a71 100644 --- a/Makefile +++ b/Makefile @@ -2804,6 +2804,7 @@ LIBGRPC_SRC = \ src/core/lib/iomgr/resolve_address_windows.c \ src/core/lib/iomgr/resource_quota.c \ src/core/lib/iomgr/sockaddr_utils.c \ + src/core/lib/iomgr/socket_factory_posix.c \ src/core/lib/iomgr/socket_mutator.c \ src/core/lib/iomgr/socket_utils_common_posix.c \ src/core/lib/iomgr/socket_utils_linux.c \ @@ -3118,6 +3119,7 @@ LIBGRPC_CRONET_SRC = \ src/core/lib/iomgr/resolve_address_windows.c \ src/core/lib/iomgr/resource_quota.c \ src/core/lib/iomgr/sockaddr_utils.c \ + src/core/lib/iomgr/socket_factory_posix.c \ src/core/lib/iomgr/socket_mutator.c \ src/core/lib/iomgr/socket_utils_common_posix.c \ src/core/lib/iomgr/socket_utils_linux.c \ @@ -3423,6 +3425,7 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/lib/iomgr/resolve_address_windows.c \ src/core/lib/iomgr/resource_quota.c \ src/core/lib/iomgr/sockaddr_utils.c \ + src/core/lib/iomgr/socket_factory_posix.c \ src/core/lib/iomgr/socket_mutator.c \ src/core/lib/iomgr/socket_utils_common_posix.c \ src/core/lib/iomgr/socket_utils_linux.c \ @@ -3652,6 +3655,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/iomgr/resolve_address_windows.c \ src/core/lib/iomgr/resource_quota.c \ src/core/lib/iomgr/sockaddr_utils.c \ + src/core/lib/iomgr/socket_factory_posix.c \ src/core/lib/iomgr/socket_mutator.c \ src/core/lib/iomgr/socket_utils_common_posix.c \ src/core/lib/iomgr/socket_utils_linux.c \ @@ -4263,6 +4267,7 @@ LIBGRPC++_CRONET_SRC = \ src/core/lib/iomgr/resolve_address_windows.c \ src/core/lib/iomgr/resource_quota.c \ src/core/lib/iomgr/sockaddr_utils.c \ + src/core/lib/iomgr/socket_factory_posix.c \ src/core/lib/iomgr/socket_mutator.c \ src/core/lib/iomgr/socket_utils_common_posix.c \ src/core/lib/iomgr/socket_utils_linux.c \ diff --git a/binding.gyp b/binding.gyp index 2ec57768ae..12ed7a8f3c 100644 --- a/binding.gyp +++ b/binding.gyp @@ -657,6 +657,7 @@ 'src/core/lib/iomgr/resolve_address_windows.c', 'src/core/lib/iomgr/resource_quota.c', 'src/core/lib/iomgr/sockaddr_utils.c', + 'src/core/lib/iomgr/socket_factory_posix.c', 'src/core/lib/iomgr/socket_mutator.c', 'src/core/lib/iomgr/socket_utils_common_posix.c', 'src/core/lib/iomgr/socket_utils_linux.c', diff --git a/build.yaml b/build.yaml index 72b15374c6..63f22fd2fd 100644 --- a/build.yaml +++ b/build.yaml @@ -223,6 +223,7 @@ filegroups: - src/core/lib/iomgr/sockaddr_posix.h - src/core/lib/iomgr/sockaddr_utils.h - src/core/lib/iomgr/sockaddr_windows.h + - src/core/lib/iomgr/socket_factory_posix.h - src/core/lib/iomgr/socket_mutator.h - src/core/lib/iomgr/socket_utils.h - src/core/lib/iomgr/socket_utils_posix.h @@ -329,6 +330,7 @@ filegroups: - src/core/lib/iomgr/resolve_address_windows.c - src/core/lib/iomgr/resource_quota.c - src/core/lib/iomgr/sockaddr_utils.c + - src/core/lib/iomgr/socket_factory_posix.c - src/core/lib/iomgr/socket_mutator.c - src/core/lib/iomgr/socket_utils_common_posix.c - src/core/lib/iomgr/socket_utils_linux.c diff --git a/config.m4 b/config.m4 index 2bf302b835..96ac5588c0 100644 --- a/config.m4 +++ b/config.m4 @@ -130,6 +130,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/iomgr/resolve_address_windows.c \ src/core/lib/iomgr/resource_quota.c \ src/core/lib/iomgr/sockaddr_utils.c \ + src/core/lib/iomgr/socket_factory_posix.c \ src/core/lib/iomgr/socket_mutator.c \ src/core/lib/iomgr/socket_utils_common_posix.c \ src/core/lib/iomgr/socket_utils_linux.c \ diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 6d1a59766e..1d091d0f7f 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -304,6 +304,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/sockaddr_posix.h', 'src/core/lib/iomgr/sockaddr_utils.h', 'src/core/lib/iomgr/sockaddr_windows.h', + 'src/core/lib/iomgr/socket_factory_posix.h', 'src/core/lib/iomgr/socket_mutator.h', 'src/core/lib/iomgr/socket_utils.h', 'src/core/lib/iomgr/socket_utils_posix.h', @@ -500,6 +501,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/resolve_address_windows.c', 'src/core/lib/iomgr/resource_quota.c', 'src/core/lib/iomgr/sockaddr_utils.c', + 'src/core/lib/iomgr/socket_factory_posix.c', 'src/core/lib/iomgr/socket_mutator.c', 'src/core/lib/iomgr/socket_utils_common_posix.c', 'src/core/lib/iomgr/socket_utils_linux.c', @@ -743,6 +745,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/sockaddr_posix.h', 'src/core/lib/iomgr/sockaddr_utils.h', 'src/core/lib/iomgr/sockaddr_windows.h', + 'src/core/lib/iomgr/socket_factory_posix.h', 'src/core/lib/iomgr/socket_mutator.h', 'src/core/lib/iomgr/socket_utils.h', 'src/core/lib/iomgr/socket_utils_posix.h', diff --git a/grpc.gemspec b/grpc.gemspec index 8074df266c..c7d9bfb433 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -221,6 +221,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/sockaddr_posix.h ) s.files += %w( src/core/lib/iomgr/sockaddr_utils.h ) s.files += %w( src/core/lib/iomgr/sockaddr_windows.h ) + s.files += %w( src/core/lib/iomgr/socket_factory_posix.h ) s.files += %w( src/core/lib/iomgr/socket_mutator.h ) s.files += %w( src/core/lib/iomgr/socket_utils.h ) s.files += %w( src/core/lib/iomgr/socket_utils_posix.h ) @@ -417,6 +418,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/resolve_address_windows.c ) s.files += %w( src/core/lib/iomgr/resource_quota.c ) s.files += %w( src/core/lib/iomgr/sockaddr_utils.c ) + s.files += %w( src/core/lib/iomgr/socket_factory_posix.c ) s.files += %w( src/core/lib/iomgr/socket_mutator.c ) s.files += %w( src/core/lib/iomgr/socket_utils_common_posix.c ) s.files += %w( src/core/lib/iomgr/socket_utils_linux.c ) diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index e5c731304c..887c176f1a 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -87,6 +87,9 @@ typedef struct grpc_call grpc_call; /** The Socket Mutator interface allows changes on socket options */ typedef struct grpc_socket_mutator grpc_socket_mutator; +/** The Socket Factory interface creates and binds sockets */ +typedef struct grpc_socket_factory grpc_socket_factory; + /** Type specifier for grpc_arg */ typedef enum { GRPC_ARG_STRING, @@ -240,6 +243,8 @@ typedef struct { #define GRPC_ARG_LB_POLICY_NAME "grpc.lb_policy_name" /** The grpc_socket_mutator instance that set the socket options. A pointer. */ #define GRPC_ARG_SOCKET_MUTATOR "grpc.socket_mutator" +/** The grpc_socket_factory instance to create and bind sockets. A pointer. */ +#define GRPC_ARG_SOCKET_FACTORY "grpc.socket_factory" /** If non-zero, Cronet transport will coalesce packets to fewer frames when * possible. */ #define GRPC_ARG_USE_CRONET_PACKET_COALESCING \ diff --git a/package.xml b/package.xml index f096869e4e..a47f1b9761 100644 --- a/package.xml +++ b/package.xml @@ -230,6 +230,7 @@ + @@ -426,6 +427,7 @@ + diff --git a/src/core/lib/iomgr/socket_factory_posix.c b/src/core/lib/iomgr/socket_factory_posix.c new file mode 100644 index 0000000000..1050a14c46 --- /dev/null +++ b/src/core/lib/iomgr/socket_factory_posix.c @@ -0,0 +1,110 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_POSIX_SOCKET + +#include "src/core/lib/iomgr/socket_factory_posix.h" + +#include +#include +#include + +void grpc_socket_factory_init(grpc_socket_factory *factory, + const grpc_socket_factory_vtable *vtable) { + factory->vtable = vtable; + gpr_ref_init(&factory->refcount, 1); +} + +int grpc_socket_factory_socket(grpc_socket_factory *factory, int domain, + int type, int protocol) { + return factory->vtable->socket(factory, domain, type, protocol); +} + +int grpc_socket_factory_bind(grpc_socket_factory *factory, int sockfd, + const grpc_resolved_address *addr) { + return factory->vtable->bind(factory, sockfd, addr); +} + +int grpc_socket_factory_compare(grpc_socket_factory *a, + grpc_socket_factory *b) { + int c = GPR_ICMP(a, b); + if (c != 0) { + grpc_socket_factory *sma = a; + grpc_socket_factory *smb = b; + c = GPR_ICMP(sma->vtable, smb->vtable); + if (c == 0) { + c = sma->vtable->compare(sma, smb); + } + } + return c; +} + +grpc_socket_factory *grpc_socket_factory_ref(grpc_socket_factory *factory) { + gpr_ref(&factory->refcount); + return factory; +} + +void grpc_socket_factory_unref(grpc_socket_factory *factory) { + if (gpr_unref(&factory->refcount)) { + factory->vtable->destroy(factory); + } +} + +static void *socket_factory_arg_copy(void *p) { + return grpc_socket_factory_ref(p); +} + +static void socket_factory_arg_destroy(grpc_exec_ctx *exec_ctx, void *p) { + grpc_socket_factory_unref(p); +} + +static int socket_factory_cmp(void *a, void *b) { + return grpc_socket_factory_compare((grpc_socket_factory *)a, + (grpc_socket_factory *)b); +} + +static const grpc_arg_pointer_vtable socket_factory_arg_vtable = { + socket_factory_arg_copy, socket_factory_arg_destroy, socket_factory_cmp}; + +grpc_arg grpc_socket_factory_to_arg(grpc_socket_factory *factory) { + grpc_arg arg; + arg.type = GRPC_ARG_POINTER; + arg.key = GRPC_ARG_SOCKET_FACTORY; + arg.value.pointer.vtable = &socket_factory_arg_vtable; + arg.value.pointer.p = factory; + return arg; +} + +#endif diff --git a/src/core/lib/iomgr/socket_factory_posix.h b/src/core/lib/iomgr/socket_factory_posix.h new file mode 100644 index 0000000000..2c63299030 --- /dev/null +++ b/src/core/lib/iomgr/socket_factory_posix.h @@ -0,0 +1,90 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_SOCKET_FACTORY_POSIX_H +#define GRPC_CORE_LIB_IOMGR_SOCKET_FACTORY_POSIX_H + +#include +#include +#include "src/core/lib/iomgr/resolve_address.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/** The virtual table of grpc_socket_factory */ +typedef struct { + /** Replacement for socket(2) */ + int (*socket)(grpc_socket_factory *factory, int domain, int type, + int protocol); + /** Replacement for bind(2) */ + int (*bind)(grpc_socket_factory *factory, int sockfd, + const grpc_resolved_address *addr); + /** Compare socket factory \a a and \a b */ + int (*compare)(grpc_socket_factory *a, grpc_socket_factory *b); + /** Destroys the socket factory instance */ + void (*destroy)(grpc_socket_factory *factory); +} grpc_socket_factory_vtable; + +/** The Socket Factory interface allows changes on socket options */ +struct grpc_socket_factory { + const grpc_socket_factory_vtable *vtable; + gpr_refcount refcount; +}; + +/** called by concrete implementations to initialize the base struct */ +void grpc_socket_factory_init(grpc_socket_factory *factory, + const grpc_socket_factory_vtable *vtable); + +/** Wrap \a factory as a grpc_arg */ +grpc_arg grpc_socket_factory_to_arg(grpc_socket_factory *factory); + +/** Perform the equivalent of a socket(2) operation using \a factory */ +int grpc_socket_factory_socket(grpc_socket_factory *factory, int domain, + int type, int protocol); + +/** Perform the equivalent of a bind(2) operation using \a factory */ +int grpc_socket_factory_bind(grpc_socket_factory *factory, int sockfd, + const grpc_resolved_address *addr); + +/** Compare if \a a and \a b are the same factory or have same settings */ +int grpc_socket_factory_compare(grpc_socket_factory *a, grpc_socket_factory *b); + +grpc_socket_factory *grpc_socket_factory_ref(grpc_socket_factory *factory); +void grpc_socket_factory_unref(grpc_socket_factory *factory); + +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_FACTORY_POSIX_H */ diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c index b69c924d4a..bbe642d0fb 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.c +++ b/src/core/lib/iomgr/socket_utils_common_posix.c @@ -278,11 +278,25 @@ static grpc_error *error_for_fd(int fd, const grpc_resolved_address *addr) { grpc_error *grpc_create_dualstack_socket( const grpc_resolved_address *resolved_addr, int type, int protocol, grpc_dualstack_mode *dsmode, int *newfd) { + return grpc_create_dualstack_socket_using_factory(NULL, resolved_addr, type, + protocol, dsmode, newfd); +} + +static int create_socket(grpc_socket_factory *factory, int domain, int type, + int protocol) { + return (factory != NULL) + ? grpc_socket_factory_socket(factory, domain, type, protocol) + : socket(domain, type, protocol); +} + +grpc_error *grpc_create_dualstack_socket_using_factory( + grpc_socket_factory *factory, const grpc_resolved_address *resolved_addr, + int type, int protocol, grpc_dualstack_mode *dsmode, int *newfd) { const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; int family = addr->sa_family; if (family == AF_INET6) { if (grpc_ipv6_loopback_available()) { - *newfd = socket(family, type, protocol); + *newfd = create_socket(factory, family, type, protocol); } else { *newfd = -1; errno = EAFNOSUPPORT; @@ -304,7 +318,7 @@ grpc_error *grpc_create_dualstack_socket( family = AF_INET; } *dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE; - *newfd = socket(family, type, protocol); + *newfd = create_socket(factory, family, type, protocol); return error_for_fd(*newfd, resolved_addr); } diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index e84d3781a1..2c2fc95ff9 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -41,6 +41,7 @@ #include #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/socket_factory_posix.h" #include "src/core/lib/iomgr/socket_mutator.h" /* a wrapper for accept or accept4 */ @@ -137,4 +138,10 @@ grpc_error *grpc_create_dualstack_socket(const grpc_resolved_address *addr, grpc_dualstack_mode *dsmode, int *newfd); +/* Same as grpc_create_dualstack_socket(), but use the given socket factory (if + non-null) to create the socket, rather than calling socket() directly. */ +grpc_error *grpc_create_dualstack_socket_using_factory( + grpc_socket_factory *factory, const grpc_resolved_address *addr, int type, + int protocol, grpc_dualstack_mode *dsmode, int *newfd); + #endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H */ diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 28f2bd9359..86ec8f667d 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -59,11 +59,13 @@ #include #include #include +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/socket_factory_posix.h" #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/support/string.h" @@ -89,6 +91,9 @@ struct grpc_udp_listener { struct grpc_udp_server { gpr_mu mu; + /* factory to use for creating and binding sockets, or NULL */ + grpc_socket_factory *socket_factory; + /* active port count: how many ports are actually still listening */ size_t active_ports; /* destroyed port count: how many ports are completely destroyed */ @@ -113,9 +118,24 @@ struct grpc_udp_server { void *user_data; }; -grpc_udp_server *grpc_udp_server_create(void) { +static grpc_socket_factory *get_socket_factory(const grpc_channel_args *args) { + if (args) { + const grpc_arg *arg = grpc_channel_args_find(args, GRPC_ARG_SOCKET_FACTORY); + if (arg) { + GPR_ASSERT(arg->type == GRPC_ARG_POINTER); + return arg->value.pointer.p; + } + } + return NULL; +} + +grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) { grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server)); gpr_mu_init(&s->mu); + s->socket_factory = get_socket_factory(args); + if (s->socket_factory) { + grpc_socket_factory_ref(s->socket_factory); + } s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; @@ -139,6 +159,10 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { gpr_free(sp); } + if (s->socket_factory) { + grpc_socket_factory_unref(s->socket_factory); + } + gpr_free(s); } @@ -215,8 +239,17 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, } } +static int bind_socket(grpc_socket_factory *socket_factory, int sockfd, + const grpc_resolved_address *addr) { + return (socket_factory != NULL) + ? grpc_socket_factory_bind(socket_factory, sockfd, addr) + : bind(sockfd, (struct sockaddr *)addr->addr, + (socklen_t)addr->len); +} + /* Prepare a recently-created socket for listening. */ -static int prepare_socket(int fd, const grpc_resolved_address *addr) { +static int prepare_socket(grpc_socket_factory *socket_factory, int fd, + const grpc_resolved_address *addr) { grpc_resolved_address sockname_temp; struct sockaddr *addr_ptr = (struct sockaddr *)addr->addr; /* Set send/receive socket buffers to 1 MB */ @@ -246,7 +279,7 @@ static int prepare_socket(int fd, const grpc_resolved_address *addr) { } GPR_ASSERT(addr->len < ~(socklen_t)0); - if (bind(fd, (struct sockaddr *)addr, (socklen_t)addr->len) < 0) { + if (bind_socket(socket_factory, fd, addr) < 0) { char *addr_str; grpc_sockaddr_to_string(&addr_str, addr, 0); gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno)); @@ -339,7 +372,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, char *addr_str; char *name; - port = prepare_socket(fd, addr); + port = prepare_socket(s->socket_factory, fd, addr); if (port >= 0) { grpc_sockaddr_to_string(&addr_str, addr, 1); gpr_asprintf(&name, "udp-server-listener:%s", addr_str); @@ -417,8 +450,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, /* Try listening on IPv6 first. */ addr = &wild6; // TODO(rjshade): Test and propagate the returned grpc_error*: - GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, - &dsmode, &fd)); + GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( + s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); allocated_port1 = add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { @@ -433,8 +466,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, } // TODO(rjshade): Test and propagate the returned grpc_error*: - GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, - &dsmode, &fd)); + GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( + s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); if (fd < 0) { gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); } diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 90842a47f0..9df3fe4d1f 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -58,7 +58,7 @@ typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, void *user_data); /* Create a server, initially not bound to any ports */ -grpc_udp_server *grpc_udp_server_create(void); +grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args); /* Start listening to bound ports. user_data is passed to callbacks. */ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server, diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index cb6ca34dc6..249d63ddd0 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -124,6 +124,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/resolve_address_windows.c', 'src/core/lib/iomgr/resource_quota.c', 'src/core/lib/iomgr/sockaddr_utils.c', + 'src/core/lib/iomgr/socket_factory_posix.c', 'src/core/lib/iomgr/socket_mutator.c', 'src/core/lib/iomgr/socket_utils_common_posix.c', 'src/core/lib/iomgr/socket_utils_linux.c', diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c index 396ec959cd..12d8406323 100644 --- a/test/core/iomgr/udp_server_test.c +++ b/test/core/iomgr/udp_server_test.c @@ -48,9 +48,12 @@ #include #include #include +#include +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr.h" +#include "src/core/lib/iomgr/socket_factory_posix.h" #include "test/core/util/test_config.h" #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", #x) @@ -94,16 +97,59 @@ static void on_fd_orphaned(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, g_number_of_orphan_calls++; } +struct test_socket_factory { + grpc_socket_factory base; + int number_of_socket_calls; + int number_of_bind_calls; +}; +typedef struct test_socket_factory test_socket_factory; + +static int test_socket_factory_socket(grpc_socket_factory *factory, int domain, + int type, int protocol) { + test_socket_factory *f = (test_socket_factory *)factory; + f->number_of_socket_calls++; + return socket(domain, type, protocol); +} + +static int test_socket_factory_bind(grpc_socket_factory *factory, int sockfd, + const grpc_resolved_address *addr) { + test_socket_factory *f = (test_socket_factory *)factory; + f->number_of_bind_calls++; + return bind(sockfd, (struct sockaddr *)addr->addr, (socklen_t)addr->len); +} + +static int test_socket_factory_compare(grpc_socket_factory *a, + grpc_socket_factory *b) { + return GPR_ICMP(a, b); +} + +static void test_socket_factory_destroy(grpc_socket_factory *factory) { + test_socket_factory *f = (test_socket_factory *)factory; + gpr_free(f); +} + +static const grpc_socket_factory_vtable test_socket_factory_vtable = { + test_socket_factory_socket, test_socket_factory_bind, + test_socket_factory_compare, test_socket_factory_destroy}; + +static test_socket_factory *test_socket_factory_create(void) { + test_socket_factory *factory = gpr_malloc(sizeof(test_socket_factory)); + grpc_socket_factory_init(&factory->base, &test_socket_factory_vtable); + factory->number_of_socket_calls = 0; + factory->number_of_bind_calls = 0; + return factory; +} + static void test_no_op(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_udp_server *s = grpc_udp_server_create(); + grpc_udp_server *s = grpc_udp_server_create(NULL); grpc_udp_server_destroy(&exec_ctx, s, NULL); grpc_exec_ctx_finish(&exec_ctx); } static void test_no_op_with_start(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_udp_server *s = grpc_udp_server_create(); + grpc_udp_server *s = grpc_udp_server_create(NULL); LOG_TEST("test_no_op_with_start"); grpc_udp_server_start(&exec_ctx, s, NULL, 0, NULL); grpc_udp_server_destroy(&exec_ctx, s, NULL); @@ -115,7 +161,7 @@ static void test_no_op_with_port(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_address resolved_addr; struct sockaddr_in *addr = (struct sockaddr_in *)resolved_addr.addr; - grpc_udp_server *s = grpc_udp_server_create(); + grpc_udp_server *s = grpc_udp_server_create(NULL); LOG_TEST("test_no_op_with_port"); memset(&resolved_addr, 0, sizeof(resolved_addr)); @@ -131,12 +177,44 @@ static void test_no_op_with_port(void) { GPR_ASSERT(g_number_of_orphan_calls == 1); } +static void test_no_op_with_port_and_socket_factory(void) { + g_number_of_orphan_calls = 0; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resolved_address resolved_addr; + struct sockaddr_in *addr = (struct sockaddr_in *)resolved_addr.addr; + + test_socket_factory *socket_factory = test_socket_factory_create(); + grpc_arg socket_factory_arg = + grpc_socket_factory_to_arg(&socket_factory->base); + grpc_channel_args *channel_args = + grpc_channel_args_copy_and_add(NULL, &socket_factory_arg, 1); + grpc_udp_server *s = grpc_udp_server_create(channel_args); + grpc_channel_args_destroy(&exec_ctx, channel_args); + + LOG_TEST("test_no_op_with_port_and_socket_factory"); + + memset(&resolved_addr, 0, sizeof(resolved_addr)); + resolved_addr.len = sizeof(struct sockaddr_in); + addr->sin_family = AF_INET; + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write, + on_fd_orphaned)); + GPR_ASSERT(socket_factory->number_of_socket_calls == 1); + GPR_ASSERT(socket_factory->number_of_bind_calls == 1); + + grpc_udp_server_destroy(&exec_ctx, s, NULL); + grpc_exec_ctx_finish(&exec_ctx); + grpc_socket_factory_unref(&socket_factory->base); + + /* The server had a single FD, which should have been orphaned. */ + GPR_ASSERT(g_number_of_orphan_calls == 1); +} + static void test_no_op_with_port_and_start(void) { g_number_of_orphan_calls = 0; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_address resolved_addr; struct sockaddr_in *addr = (struct sockaddr_in *)resolved_addr.addr; - grpc_udp_server *s = grpc_udp_server_create(); + grpc_udp_server *s = grpc_udp_server_create(NULL); LOG_TEST("test_no_op_with_port_and_start"); memset(&resolved_addr, 0, sizeof(resolved_addr)); @@ -160,7 +238,7 @@ static void test_receive(int number_of_clients) { grpc_resolved_address resolved_addr; struct sockaddr_storage *addr = (struct sockaddr_storage *)resolved_addr.addr; int clifd, svrfd; - grpc_udp_server *s = grpc_udp_server_create(); + grpc_udp_server *s = grpc_udp_server_create(NULL); int i; int number_of_reads_before; gpr_timespec deadline; @@ -243,6 +321,7 @@ int main(int argc, char **argv) { test_no_op(); test_no_op_with_start(); test_no_op_with_port(); + test_no_op_with_port_and_socket_factory(); test_no_op_with_port_and_start(); test_receive(1); test_receive(10); diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 351638389e..5eb74ea77d 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1111,6 +1111,8 @@ src/core/lib/iomgr/sockaddr_posix.h \ src/core/lib/iomgr/sockaddr_utils.c \ src/core/lib/iomgr/sockaddr_utils.h \ src/core/lib/iomgr/sockaddr_windows.h \ +src/core/lib/iomgr/socket_factory_posix.c \ +src/core/lib/iomgr/socket_factory_posix.h \ src/core/lib/iomgr/socket_mutator.c \ src/core/lib/iomgr/socket_mutator.h \ src/core/lib/iomgr/socket_utils.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 14ad133863..81dbd02008 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -7512,6 +7512,7 @@ "src/core/lib/iomgr/sockaddr_posix.h", "src/core/lib/iomgr/sockaddr_utils.h", "src/core/lib/iomgr/sockaddr_windows.h", + "src/core/lib/iomgr/socket_factory_posix.h", "src/core/lib/iomgr/socket_mutator.h", "src/core/lib/iomgr/socket_utils.h", "src/core/lib/iomgr/socket_utils_posix.h", @@ -7681,6 +7682,8 @@ "src/core/lib/iomgr/sockaddr_utils.c", "src/core/lib/iomgr/sockaddr_utils.h", "src/core/lib/iomgr/sockaddr_windows.h", + "src/core/lib/iomgr/socket_factory_posix.c", + "src/core/lib/iomgr/socket_factory_posix.h", "src/core/lib/iomgr/socket_mutator.c", "src/core/lib/iomgr/socket_mutator.h", "src/core/lib/iomgr/socket_utils.h", diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index afe72746e9..ccf0d4d87d 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -349,6 +349,7 @@ + @@ -595,6 +596,8 @@ + + diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index a7d32afbe1..e9d950a962 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -145,6 +145,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr @@ -926,6 +929,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj index 62969e31ac..734ce1b025 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj @@ -244,6 +244,7 @@ + @@ -438,6 +439,8 @@ + + diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters index 30088101f5..63eb9a0305 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters @@ -202,6 +202,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr @@ -713,6 +716,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index 46069134a1..a077f0ec3c 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -339,6 +339,7 @@ + @@ -562,6 +563,8 @@ + + diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index d658f4d574..e8b381cdc0 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -148,6 +148,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr @@ -836,6 +839,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr -- cgit v1.2.3