aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2017-04-07 13:44:34 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2017-04-07 13:44:34 -0700
commit7b9f97bfb88359d070f66c7f2cb132622046af5b (patch)
tree41fe278c1cb28a58bf417b3f645626118bd9cc2a
parente1bfff0bbedd5c0b973e5a08839a957183e006de (diff)
parentdd550c7316069973c8729bfeca23d4f9a60fd2ca (diff)
Merge branch 'master' into cq_create_api_changes
-rw-r--r--BUILD16
-rw-r--r--CMakeLists.txt46
-rw-r--r--Makefile49
-rw-r--r--build.yaml21
-rw-r--r--cmake/msvc_static_runtime.cmake2
-rw-r--r--doc/PROTOCOL-WEB.md3
-rwxr-xr-xgrpc.gemspec2
-rw-r--r--include/grpc/impl/codegen/grpc_types.h8
-rw-r--r--package.json2
-rw-r--r--src/core/ext/filters/max_age/max_age_filter.c3
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c7
-rw-r--r--src/core/lib/iomgr/endpoint_pair.h5
-rw-r--r--src/core/lib/iomgr/endpoint_pair_posix.c17
-rw-r--r--src/core/lib/iomgr/endpoint_pair_uv.c5
-rw-r--r--src/core/lib/iomgr/endpoint_pair_windows.c15
-rw-r--r--src/core/lib/iomgr/error.c35
-rw-r--r--src/core/lib/iomgr/ev_posix.c6
-rw-r--r--src/core/lib/iomgr/ev_posix.h3
-rw-r--r--src/core/lib/iomgr/resource_quota.c9
-rw-r--r--src/core/lib/iomgr/resource_quota.h2
-rw-r--r--src/core/lib/iomgr/tcp_client.h4
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c24
-rw-r--r--src/core/lib/iomgr/tcp_client_windows.c21
-rw-r--r--src/core/lib/iomgr/tcp_posix.c114
-rw-r--r--src/core/lib/iomgr/tcp_posix.h7
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c22
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix.h3
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c25
-rw-r--r--src/core/lib/iomgr/tcp_windows.c14
-rw-r--r--src/core/lib/iomgr/tcp_windows.h4
-rw-r--r--src/core/lib/slice/slice_buffer.c30
-rw-r--r--src/core/lib/surface/call.c135
-rw-r--r--src/node/index.js115
-rw-r--r--src/node/interop/interop_server.js2
-rw-r--r--src/node/performance/benchmark_server.js2
-rw-r--r--src/node/performance/worker.js4
-rw-r--r--src/node/src/client.js22
-rw-r--r--src/node/src/common.js117
-rw-r--r--src/node/src/protobuf_js_5_common.js181
-rw-r--r--src/node/src/protobuf_js_6_common.js170
-rw-r--r--src/node/src/server.js22
-rw-r--r--src/node/stress/metrics_server.js2
-rw-r--r--src/node/test/common_test.js107
-rw-r--r--src/node/test/credentials_test.js2
-rw-r--r--src/node/test/surface_test.js115
-rw-r--r--src/node/test/test_messages.proto17
-rw-r--r--src/python/grpcio/grpc/_server.py4
-rwxr-xr-xsrc/ruby/end2end/channel_closing_driver.rb2
-rwxr-xr-xsrc/ruby/end2end/channel_state_driver.rb2
-rwxr-xr-xsrc/ruby/end2end/end2end_common.rb5
-rwxr-xr-xsrc/ruby/end2end/killed_client_thread_client.rb58
-rwxr-xr-xsrc/ruby/end2end/killed_client_thread_driver.rb114
-rwxr-xr-xsrc/ruby/end2end/sig_handling_driver.rb2
-rwxr-xr-xsrc/ruby/end2end/sig_int_during_channel_watch_driver.rb2
-rw-r--r--src/ruby/ext/grpc/rb_call.c18
-rw-r--r--templates/grpc.gemspec.template2
-rw-r--r--templates/package.json.template2
-rw-r--r--test/core/bad_client/bad_client.c5
-rw-r--r--test/core/end2end/fixtures/h2_sockpair+trace.c4
-rw-r--r--test/core/end2end/fixtures/h2_sockpair.c4
-rw-r--r--test/core/end2end/fixtures/h2_sockpair_1byte.c14
-rw-r--r--test/core/end2end/fuzzers/server_fuzzer_corpus/clusterfuzz-testcase-6312731374256128bin0 -> 26793 bytes
-rw-r--r--test/core/iomgr/endpoint_pair_test.c10
-rw-r--r--test/core/iomgr/error_test.c30
-rw-r--r--test/core/iomgr/fd_conservation_posix_test.c2
-rw-r--r--test/core/iomgr/tcp_posix_test.c55
-rw-r--r--test/core/security/secure_endpoint_test.c10
-rw-r--r--test/cpp/microbenchmarks/BUILD23
-rw-r--r--test/cpp/microbenchmarks/bm_cq_multiple_threads.cc142
-rw-r--r--test/cpp/microbenchmarks/fullstack_fixtures.h4
-rw-r--r--test/distrib/csharp/run_distrib_test.bat2
-rwxr-xr-xtest/distrib/csharp/run_distrib_test.sh2
-rwxr-xr-xtest/distrib/csharp/run_distrib_test_dotnetcli.sh2
-rw-r--r--tools/run_tests/generated/sources_and_headers.json21
-rw-r--r--tools/run_tests/generated/tests.json45
-rw-r--r--tools/run_tests/helper_scripts/pre_build_csharp.bat2
-rwxr-xr-xtools/run_tests/helper_scripts/run_ruby_end2end_tests.sh1
77 files changed, 1606 insertions, 520 deletions
diff --git a/BUILD b/BUILD
index c4632fc0a2..1991c86743 100644
--- a/BUILD
+++ b/BUILD
@@ -41,7 +41,7 @@ g_stands_for = "green"
core_version = "3.0.0-dev"
-version = "1.2.0"
+version = "1.3.0-dev"
grpc_cc_library(
name = "gpr",
@@ -67,6 +67,7 @@ grpc_cc_library(
"grpc_lb_policy_pick_first",
"grpc_lb_policy_round_robin",
"grpc_load_reporting",
+ "grpc_max_age_filter",
"grpc_resolver_dns_ares",
"grpc_resolver_dns_native",
"grpc_resolver_sockaddr",
@@ -75,7 +76,6 @@ grpc_cc_library(
"grpc_transport_chttp2_client_secure",
"grpc_transport_chttp2_server_insecure",
"grpc_transport_chttp2_server_secure",
- "grpc_max_age_filter",
],
)
@@ -109,11 +109,11 @@ grpc_cc_library(
"grpc_lb_policy_pick_first",
"grpc_lb_policy_round_robin",
"grpc_load_reporting",
+ "grpc_max_age_filter",
"grpc_resolver_dns_native",
"grpc_resolver_sockaddr",
"grpc_transport_chttp2_client_insecure",
"grpc_transport_chttp2_server_insecure",
- "grpc_max_age_filter",
],
)
@@ -177,8 +177,6 @@ grpc_cc_library(
],
hdrs = [
"src/compiler/config.h",
- "src/compiler/schema_interface.h",
- "src/compiler/protobuf_plugin.h",
"src/compiler/cpp_generator.h",
"src/compiler/cpp_generator_helpers.h",
"src/compiler/csharp_generator.h",
@@ -190,6 +188,7 @@ grpc_cc_library(
"src/compiler/objective_c_generator_helpers.h",
"src/compiler/php_generator.h",
"src/compiler/php_generator_helpers.h",
+ "src/compiler/protobuf_plugin.h",
"src/compiler/python_generator.h",
"src/compiler/python_generator_helpers.h",
"src/compiler/python_private_generator.h",
@@ -197,6 +196,7 @@ grpc_cc_library(
"src/compiler/ruby_generator_helpers-inl.h",
"src/compiler/ruby_generator_map-inl.h",
"src/compiler/ruby_generator_string-inl.h",
+ "src/compiler/schema_interface.h",
],
external_deps = [
"protobuf_clib",
@@ -884,14 +884,14 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h",
"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h",
],
+ external_deps = [
+ "cares",
+ ],
language = "c",
deps = [
"grpc_base",
"grpc_client_channel",
],
- external_deps = [
- "cares",
- ],
)
grpc_cc_library(
diff --git a/CMakeLists.txt b/CMakeLists.txt
index acf47e5bfe..96ece1fd60 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -626,6 +626,9 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx bm_cq)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
+add_dependencies(buildtests_cxx bm_cq_multiple_threads)
+endif()
+if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx bm_error)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@@ -8979,6 +8982,49 @@ endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
+add_executable(bm_cq_multiple_threads
+ test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
+ third_party/googletest/src/gtest-all.cc
+)
+
+
+target_include_directories(bm_cq_multiple_threads
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
+ PRIVATE ${BORINGSSL_ROOT_DIR}/include
+ PRIVATE ${PROTOBUF_ROOT_DIR}/src
+ PRIVATE ${BENCHMARK_ROOT_DIR}/include
+ PRIVATE ${ZLIB_ROOT_DIR}
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib
+ PRIVATE ${CARES_BUILD_INCLUDE_DIR}
+ PRIVATE ${CARES_INCLUDE_DIR}
+ PRIVATE ${CARES_PLATFORM_INCLUDE_DIR}
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include
+ PRIVATE third_party/googletest/include
+ PRIVATE third_party/googletest
+ PRIVATE ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(bm_cq_multiple_threads
+ ${_gRPC_PROTOBUF_LIBRARIES}
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc_benchmark
+ benchmark
+ grpc++_test_util
+ grpc_test_util
+ grpc++
+ grpc
+ gpr_test_util
+ gpr
+ ${_gRPC_GFLAGS_LIBRARIES}
+)
+
+endif()
+endif (gRPC_BUILD_TESTS)
+if (gRPC_BUILD_TESTS)
+if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
+
add_executable(bm_error
test/cpp/microbenchmarks/bm_error.cc
third_party/googletest/src/gtest-all.cc
diff --git a/Makefile b/Makefile
index 13f4fcec23..3fbe587ad8 100644
--- a/Makefile
+++ b/Makefile
@@ -1105,6 +1105,7 @@ bm_chttp2_hpack: $(BINDIR)/$(CONFIG)/bm_chttp2_hpack
bm_chttp2_transport: $(BINDIR)/$(CONFIG)/bm_chttp2_transport
bm_closure: $(BINDIR)/$(CONFIG)/bm_closure
bm_cq: $(BINDIR)/$(CONFIG)/bm_cq
+bm_cq_multiple_threads: $(BINDIR)/$(CONFIG)/bm_cq_multiple_threads
bm_error: $(BINDIR)/$(CONFIG)/bm_error
bm_fullstack_streaming_ping_pong: $(BINDIR)/$(CONFIG)/bm_fullstack_streaming_ping_pong
bm_fullstack_streaming_pump: $(BINDIR)/$(CONFIG)/bm_fullstack_streaming_pump
@@ -1530,6 +1531,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/bm_chttp2_transport \
$(BINDIR)/$(CONFIG)/bm_closure \
$(BINDIR)/$(CONFIG)/bm_cq \
+ $(BINDIR)/$(CONFIG)/bm_cq_multiple_threads \
$(BINDIR)/$(CONFIG)/bm_error \
$(BINDIR)/$(CONFIG)/bm_fullstack_streaming_ping_pong \
$(BINDIR)/$(CONFIG)/bm_fullstack_streaming_pump \
@@ -1648,6 +1650,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/bm_chttp2_transport \
$(BINDIR)/$(CONFIG)/bm_closure \
$(BINDIR)/$(CONFIG)/bm_cq \
+ $(BINDIR)/$(CONFIG)/bm_cq_multiple_threads \
$(BINDIR)/$(CONFIG)/bm_error \
$(BINDIR)/$(CONFIG)/bm_fullstack_streaming_ping_pong \
$(BINDIR)/$(CONFIG)/bm_fullstack_streaming_pump \
@@ -1997,6 +2000,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/bm_closure || ( echo test bm_closure failed ; exit 1 )
$(E) "[RUN] Testing bm_cq"
$(Q) $(BINDIR)/$(CONFIG)/bm_cq || ( echo test bm_cq failed ; exit 1 )
+ $(E) "[RUN] Testing bm_cq_multiple_threads"
+ $(Q) $(BINDIR)/$(CONFIG)/bm_cq_multiple_threads || ( echo test bm_cq_multiple_threads failed ; exit 1 )
$(E) "[RUN] Testing bm_error"
$(Q) $(BINDIR)/$(CONFIG)/bm_error || ( echo test bm_error failed ; exit 1 )
$(E) "[RUN] Testing bm_fullstack_streaming_ping_pong"
@@ -13360,6 +13365,50 @@ endif
endif
+BM_CQ_MULTIPLE_THREADS_SRC = \
+ test/cpp/microbenchmarks/bm_cq_multiple_threads.cc \
+
+BM_CQ_MULTIPLE_THREADS_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BM_CQ_MULTIPLE_THREADS_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/bm_cq_multiple_threads: openssl_dep_error
+
+else
+
+
+
+
+ifeq ($(NO_PROTOBUF),true)
+
+# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+.
+
+$(BINDIR)/$(CONFIG)/bm_cq_multiple_threads: protobuf_dep_error
+
+else
+
+$(BINDIR)/$(CONFIG)/bm_cq_multiple_threads: $(PROTOBUF_DEP) $(BM_CQ_MULTIPLE_THREADS_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_benchmark.a $(LIBDIR)/$(CONFIG)/libbenchmark.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LDXX) $(LDFLAGS) $(BM_CQ_MULTIPLE_THREADS_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_benchmark.a $(LIBDIR)/$(CONFIG)/libbenchmark.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/bm_cq_multiple_threads
+
+endif
+
+endif
+
+$(BM_CQ_MULTIPLE_THREADS_OBJS): CPPFLAGS += -Ithird_party/benchmark/include -DHAVE_POSIX_REGEX
+$(OBJDIR)/$(CONFIG)/test/cpp/microbenchmarks/bm_cq_multiple_threads.o: $(LIBDIR)/$(CONFIG)/libgrpc_benchmark.a $(LIBDIR)/$(CONFIG)/libbenchmark.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_bm_cq_multiple_threads: $(BM_CQ_MULTIPLE_THREADS_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(BM_CQ_MULTIPLE_THREADS_OBJS:.o=.dep)
+endif
+endif
+
+
BM_ERROR_SRC = \
test/cpp/microbenchmarks/bm_error.cc \
diff --git a/build.yaml b/build.yaml
index df3da9553a..58816a3423 100644
--- a/build.yaml
+++ b/build.yaml
@@ -3217,6 +3217,27 @@ targets:
- mac
- linux
- posix
+- name: bm_cq_multiple_threads
+ build: test
+ language: c++
+ src:
+ - test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
+ deps:
+ - grpc_benchmark
+ - benchmark
+ - grpc++_test_util
+ - grpc_test_util
+ - grpc++
+ - grpc
+ - gpr_test_util
+ - gpr
+ args:
+ - --benchmark_min_time=0
+ defaults: benchmark
+ platforms:
+ - mac
+ - linux
+ - posix
- name: bm_error
build: test
language: c++
diff --git a/cmake/msvc_static_runtime.cmake b/cmake/msvc_static_runtime.cmake
index 5a31ab3d24..fc6d1d62d3 100644
--- a/cmake/msvc_static_runtime.cmake
+++ b/cmake/msvc_static_runtime.cmake
@@ -3,6 +3,8 @@ option(gRPC_MSVC_STATIC_RUNTIME "Link with static msvc runtime libraries" OFF)
if(gRPC_MSVC_STATIC_RUNTIME)
# switch from dynamic to static linking of msvcrt
foreach(flag_var
+ CMAKE_C_FLAGS CMAKE_C_FLAGS_DEBUG CMAKE_C_FLAGS_RELEASE
+ CMAKE_C_FLAGS_MINSIZEREL CMAKE_C_FLAGS_RELWITHDEBINFO
CMAKE_CXX_FLAGS CMAKE_CXX_FLAGS_DEBUG CMAKE_CXX_FLAGS_RELEASE
CMAKE_CXX_FLAGS_MINSIZEREL CMAKE_CXX_FLAGS_RELWITHDEBINFO)
diff --git a/doc/PROTOCOL-WEB.md b/doc/PROTOCOL-WEB.md
index 5f01af3627..6bb280894a 100644
--- a/doc/PROTOCOL-WEB.md
+++ b/doc/PROTOCOL-WEB.md
@@ -83,7 +83,8 @@ in the body.
User Agent
-* U-A: grpc-web-javascript
+* Do NOT use User-Agent header (which is to be set by browsers, by default)
+* Use X-User-Agent: grpc-web-javascript/0.1 (follow the same format as specified in [gRPC over HTTP2](http://www.grpc.io/docs/guides/wire.html))
---
diff --git a/grpc.gemspec b/grpc.gemspec
index a3a5870761..6a204685a0 100755
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -24,7 +24,7 @@ Gem::Specification.new do |s|
s.files += Dir.glob('include/grpc/**/*')
s.test_files = Dir.glob('src/ruby/spec/**/*')
s.bindir = 'src/ruby/bin'
- s.require_paths = %w( src/ruby/bin src/ruby/lib src/ruby/pb )
+ s.require_paths = %w( src/ruby/lib src/ruby/bin src/ruby/pb )
s.platform = Gem::Platform::RUBY
s.add_dependency 'google-protobuf', '~> 3.1'
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index f11bbc4968..4383691a83 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -271,6 +271,14 @@ typedef struct {
* possible. */
#define GRPC_ARG_USE_CRONET_PACKET_COALESCING \
"grpc.use_cronet_packet_coalescing"
+/* Channel arg (integer) setting how large a slice to try and read from the wire
+each time recvmsg (or equivalent) is called */
+#define GRPC_ARG_TCP_READ_CHUNK_SIZE "grpc.experimental.tcp_read_chunk_size"
+#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192
+#define GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE \
+ "grpc.experimental.tcp_min_read_chunk_size"
+#define GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE \
+ "grpc.experimental.tcp_max_read_chunk_size"
/** \} */
/** Result of a grpc call. If the caller satisfies the prerequisites of a
diff --git a/package.json b/package.json
index 666e819fb5..7f242326d7 100644
--- a/package.json
+++ b/package.json
@@ -34,7 +34,7 @@
"lodash": "^4.15.0",
"nan": "^2.0.0",
"node-pre-gyp": "^0.6.0",
- "protobufjs": "^5.0.0",
+ "protobufjs": "^6.7.0",
"cares": "^1.1.5"
},
"devDependencies": {
diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c
index f858220c01..a045f0a421 100644
--- a/src/core/ext/filters/max_age/max_age_filter.c
+++ b/src/core/ext/filters/max_age/max_age_filter.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/channel/message_size_filter.h"
+#include "src/core/ext/filters/max_age/max_age_filter.h"
#include <limits.h>
#include <string.h>
@@ -41,7 +41,6 @@
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/http2_errors.h"
-#include "src/core/lib/transport/service_config.h"
#define DEFAULT_MAX_CONNECTION_AGE_MS INT_MAX
#define DEFAULT_MAX_CONNECTION_AGE_GRACE_MS INT_MAX
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
index f46e849932..6ab176e8ad 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
@@ -57,12 +57,9 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
char *name;
gpr_asprintf(&name, "fd:%d", fd);
- grpc_resource_quota *resource_quota = grpc_resource_quota_from_channel_args(
- grpc_server_get_channel_args(server));
grpc_endpoint *server_endpoint =
- grpc_tcp_create(grpc_fd_create(fd, name), resource_quota,
- GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name);
- grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
+ grpc_tcp_create(&exec_ctx, grpc_fd_create(fd, name),
+ grpc_server_get_channel_args(server), name);
gpr_free(name);
diff --git a/src/core/lib/iomgr/endpoint_pair.h b/src/core/lib/iomgr/endpoint_pair.h
index f9de0c715e..6407a6ad3f 100644
--- a/src/core/lib/iomgr/endpoint_pair.h
+++ b/src/core/lib/iomgr/endpoint_pair.h
@@ -41,8 +41,7 @@ typedef struct {
grpc_endpoint *server;
} grpc_endpoint_pair;
-grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
- const char *name, grpc_resource_quota *resource_quota,
- size_t read_slice_size);
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
+ grpc_channel_args *args);
#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */
diff --git a/src/core/lib/iomgr/endpoint_pair_posix.c b/src/core/lib/iomgr/endpoint_pair_posix.c
index b9ff969e81..5542a372d8 100644
--- a/src/core/lib/iomgr/endpoint_pair_posix.c
+++ b/src/core/lib/iomgr/endpoint_pair_posix.c
@@ -62,22 +62,25 @@ static void create_sockets(int sv[2]) {
GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[1]) == GRPC_ERROR_NONE);
}
-grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
- const char *name, grpc_resource_quota *resource_quota,
- size_t read_slice_size) {
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
+ grpc_channel_args *args) {
int sv[2];
grpc_endpoint_pair p;
char *final_name;
create_sockets(sv);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
gpr_asprintf(&final_name, "%s:client", name);
- p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), resource_quota,
- read_slice_size, "socketpair-server");
+ p.client = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], final_name), args,
+ "socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
- p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), resource_quota,
- read_slice_size, "socketpair-client");
+ p.server = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[0], final_name), args,
+ "socketpair-client");
gpr_free(final_name);
+
+ grpc_exec_ctx_finish(&exec_ctx);
return p;
}
diff --git a/src/core/lib/iomgr/endpoint_pair_uv.c b/src/core/lib/iomgr/endpoint_pair_uv.c
index ff24894c6d..9718eb0523 100644
--- a/src/core/lib/iomgr/endpoint_pair_uv.c
+++ b/src/core/lib/iomgr/endpoint_pair_uv.c
@@ -41,9 +41,8 @@
#include "src/core/lib/iomgr/endpoint_pair.h"
-grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
- const char *name, grpc_resource_quota *resource_quota,
- size_t read_slice_size) {
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
+ grpc_channel_args *args) {
grpc_endpoint_pair endpoint_pair;
// TODO(mlumish): implement this properly under libuv
GPR_ASSERT(false &&
diff --git a/src/core/lib/iomgr/endpoint_pair_windows.c b/src/core/lib/iomgr/endpoint_pair_windows.c
index 93f71b745c..25d6264dfb 100644
--- a/src/core/lib/iomgr/endpoint_pair_windows.c
+++ b/src/core/lib/iomgr/endpoint_pair_windows.c
@@ -83,15 +83,18 @@ static void create_sockets(SOCKET sv[2]) {
}
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
- const char *name, grpc_resource_quota *resource_quota,
- size_t read_slice_size) {
+ const char *name, grpc_channel_args *channel_args) {
SOCKET sv[2];
grpc_endpoint_pair p;
create_sockets(sv);
- p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"),
- resource_quota, "endpoint:server");
- p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"),
- resource_quota, "endpoint:client");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ p.client = grpc_tcp_create(&exec_ctx,
+ grpc_winsocket_create(sv[1], "endpoint:client"),
+ channel_args, "endpoint:server");
+ p.server = grpc_tcp_create(&exec_ctx,
+ grpc_winsocket_create(sv[0], "endpoint:server"),
+ channel_args, "endpoint:client");
+ grpc_exec_ctx_finish(&exec_ctx);
return p;
}
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c
index 1dbb64e8f3..fbbca6b493 100644
--- a/src/core/lib/iomgr/error.c
+++ b/src/core/lib/iomgr/error.c
@@ -212,7 +212,11 @@ static uint8_t get_placement(grpc_error **err, size_t size) {
GPR_ASSERT(*err);
uint8_t slots = (uint8_t)(size / sizeof(intptr_t));
if ((*err)->arena_size + slots > (*err)->arena_capacity) {
- (*err)->arena_capacity = (uint8_t)(3 * (*err)->arena_capacity / 2);
+ (*err)->arena_capacity =
+ (uint8_t)GPR_MIN(UINT8_MAX - 1, (3 * (*err)->arena_capacity / 2));
+ if ((*err)->arena_size + slots > (*err)->arena_capacity) {
+ return UINT8_MAX;
+ }
*err = gpr_realloc(
*err, sizeof(grpc_error) + (*err)->arena_capacity * sizeof(intptr_t));
}
@@ -223,10 +227,14 @@ static uint8_t get_placement(grpc_error **err, size_t size) {
static void internal_set_int(grpc_error **err, grpc_error_ints which,
intptr_t value) {
- // GPR_ASSERT((*err)->ints[which] == UINT8_MAX); // TODO, enforce this
uint8_t slot = (*err)->ints[which];
if (slot == UINT8_MAX) {
slot = get_placement(err, sizeof(value));
+ if (slot == UINT8_MAX) {
+ gpr_log(GPR_ERROR, "Error %p is full, dropping int {\"%s\":%" PRIiPTR "}",
+ *err, error_int_name(which), value);
+ return;
+ }
}
(*err)->ints[which] = slot;
(*err)->arena[slot] = value;
@@ -234,10 +242,16 @@ static void internal_set_int(grpc_error **err, grpc_error_ints which,
static void internal_set_str(grpc_error **err, grpc_error_strs which,
grpc_slice value) {
- // GPR_ASSERT((*err)->strs[which] == UINT8_MAX); // TODO, enforce this
uint8_t slot = (*err)->strs[which];
if (slot == UINT8_MAX) {
slot = get_placement(err, sizeof(value));
+ if (slot == UINT8_MAX) {
+ const char *str = grpc_slice_to_c_string(value);
+ gpr_log(GPR_ERROR, "Error %p is full, dropping string {\"%s\":\"%s\"}",
+ *err, error_str_name(which), str);
+ gpr_free((void *)str);
+ return;
+ }
} else {
unref_slice(*(grpc_slice *)((*err)->arena + slot));
}
@@ -245,12 +259,19 @@ static void internal_set_str(grpc_error **err, grpc_error_strs which,
memcpy((*err)->arena + slot, &value, sizeof(value));
}
+static char *fmt_time(gpr_timespec tm);
static void internal_set_time(grpc_error **err, grpc_error_times which,
gpr_timespec value) {
- // GPR_ASSERT((*err)->times[which] == UINT8_MAX); // TODO, enforce this
uint8_t slot = (*err)->times[which];
if (slot == UINT8_MAX) {
slot = get_placement(err, sizeof(value));
+ if (slot == UINT8_MAX) {
+ const char *time_str = fmt_time(value);
+ gpr_log(GPR_ERROR, "Error %p is full, dropping \"%s\":\"%s\"}", *err,
+ error_time_name(which), time_str);
+ gpr_free((void *)time_str);
+ return;
+ }
}
(*err)->times[which] = slot;
memcpy((*err)->arena + slot, &value, sizeof(value));
@@ -259,6 +280,12 @@ static void internal_set_time(grpc_error **err, grpc_error_times which,
static void internal_add_error(grpc_error **err, grpc_error *new) {
grpc_linked_error new_last = {new, UINT8_MAX};
uint8_t slot = get_placement(err, sizeof(grpc_linked_error));
+ if (slot == UINT8_MAX) {
+ gpr_log(GPR_ERROR, "Error %p is full, dropping error %p = %s", *err, new,
+ grpc_error_string(new));
+ GRPC_ERROR_UNREF(new);
+ return;
+ }
if ((*err)->first_err == UINT8_MAX) {
GPR_ASSERT((*err)->last_err == UINT8_MAX);
(*err)->last_err = slot;
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index b5be5504b9..13409a4de8 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -111,6 +111,12 @@ static void try_engine(const char *engine) {
}
}
+/* This should be used for testing purposes ONLY */
+void grpc_set_event_engine_test_only(
+ const grpc_event_engine_vtable *ev_engine) {
+ g_event_engine = ev_engine;
+}
+
/* Call this only after calling grpc_event_engine_init() */
const char *grpc_get_poll_strategy_name() { return g_poll_strategy_name; }
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 1a9e5c115a..becc4d359e 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -183,4 +183,7 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
+/* This should be used for testing purposes ONLY */
+void grpc_set_event_engine_test_only(const grpc_event_engine_vtable *);
+
#endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index 8dcd80d001..c3ee878651 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -142,6 +142,8 @@ struct grpc_resource_quota {
/* Amount of free memory in the resource quota */
int64_t free_pool;
+ gpr_atm last_size;
+
/* Has rq_step been scheduled to occur? */
bool step_scheduled;
/* Are we currently reclaiming memory */
@@ -581,6 +583,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) {
resource_quota->combiner = grpc_combiner_create(NULL);
resource_quota->free_pool = INT64_MAX;
resource_quota->size = INT64_MAX;
+ gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
resource_quota->step_scheduled = false;
resource_quota->reclaiming = false;
gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
@@ -643,11 +646,17 @@ void grpc_resource_quota_resize(grpc_resource_quota *resource_quota,
rq_resize_args *a = gpr_malloc(sizeof(*a));
a->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
a->size = (int64_t)size;
+ gpr_atm_no_barrier_store(&resource_quota->last_size,
+ (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
grpc_closure_init(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
grpc_closure_sched(&exec_ctx, &a->closure, GRPC_ERROR_NONE);
grpc_exec_ctx_finish(&exec_ctx);
}
+size_t grpc_resource_quota_peek_size(grpc_resource_quota *resource_quota) {
+ return (size_t)gpr_atm_no_barrier_load(&resource_quota->last_size);
+}
+
/*******************************************************************************
* grpc_resource_user channel args api
*/
diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h
index b9f62cbf83..6f99be0d51 100644
--- a/src/core/lib/iomgr/resource_quota.h
+++ b/src/core/lib/iomgr/resource_quota.h
@@ -90,6 +90,8 @@ grpc_resource_quota *grpc_resource_quota_from_channel_args(
double grpc_resource_quota_get_memory_pressure(
grpc_resource_quota *resource_quota);
+size_t grpc_resource_quota_peek_size(grpc_resource_quota *resource_quota);
+
typedef struct grpc_resource_user grpc_resource_user;
grpc_resource_user *grpc_resource_user_create(
diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h
index 0485661316..bc367bdfa5 100644
--- a/src/core/lib/iomgr/tcp_client.h
+++ b/src/core/lib/iomgr/tcp_client.h
@@ -40,10 +40,6 @@
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolve_address.h"
-/* Channel arg (integer) setting how large a slice to try and read from the wire
- each time recvmsg (or equivalent) is called */
-#define GRPC_ARG_TCP_READ_CHUNK_SIZE "grpc.experimental.tcp_read_chunk_size"
-
/* Asynchronously connect to an address (specified as (addr, len)), and call
cb with arg and the completed connection when done (or call cb with arg and
NULL on failure).
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index a108b10da6..a2692707d9 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -137,29 +137,7 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
grpc_endpoint *grpc_tcp_client_create_from_fd(
grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args,
const char *addr_str) {
- size_t tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE;
- grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
- if (channel_args != NULL) {
- for (size_t i = 0; i < channel_args->num_args; i++) {
- if (0 ==
- strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) {
- grpc_integer_options options = {(int)tcp_read_chunk_size, 1,
- 8 * 1024 * 1024};
- tcp_read_chunk_size = (size_t)grpc_channel_arg_get_integer(
- &channel_args->args[i], options);
- } else if (0 ==
- strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
- grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
- resource_quota = grpc_resource_quota_ref_internal(
- channel_args->args[i].value.pointer.p);
- }
- }
- }
-
- grpc_endpoint *ep =
- grpc_tcp_create(fd, resource_quota, tcp_read_chunk_size, addr_str);
- grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
- return ep;
+ return grpc_tcp_create(exec_ctx, fd, channel_args, addr_str);
}
static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c
index a356564766..d6baca50ba 100644
--- a/src/core/lib/iomgr/tcp_client_windows.c
+++ b/src/core/lib/iomgr/tcp_client_windows.c
@@ -63,7 +63,7 @@ typedef struct {
int refs;
grpc_closure on_connect;
grpc_endpoint **endpoint;
- grpc_resource_quota *resource_quota;
+ grpc_channel_args *channel_args;
} async_connect;
static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx,
@@ -72,7 +72,7 @@ static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx,
int done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
if (done) {
- grpc_resource_quota_unref_internal(exec_ctx, ac->resource_quota);
+ grpc_channel_args_destroy(exec_ctx, ac->channel_args);
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_name);
gpr_free(ac);
@@ -119,7 +119,8 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
if (!wsa_success) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
} else {
- *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name);
+ *ep =
+ grpc_tcp_create(exec_ctx, socket, ac->channel_args, ac->addr_name);
socket = NULL;
}
} else {
@@ -152,17 +153,6 @@ static void tcp_client_connect_impl(
grpc_winsocket_callback_info *info;
grpc_error *error = GRPC_ERROR_NONE;
- grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
- if (channel_args != NULL) {
- for (size_t i = 0; i < channel_args->num_args; i++) {
- if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
- grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
- resource_quota = grpc_resource_quota_ref_internal(
- channel_args->args[i].value.pointer.p);
- }
- }
- }
-
*endpoint = NULL;
/* Use dualstack sockets where available. */
@@ -225,7 +215,7 @@ static void tcp_client_connect_impl(
ac->refs = 2;
ac->addr_name = grpc_sockaddr_to_uri(addr);
ac->endpoint = endpoint;
- ac->resource_quota = resource_quota;
+ ac->channel_args = grpc_channel_args_copy(channel_args);
grpc_closure_init(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx);
grpc_closure_init(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx);
@@ -247,7 +237,6 @@ failure:
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
}
- grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
grpc_closure_sched(exec_ctx, on_done, final_error);
}
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 4d7cf3ff51..5f4b38de2b 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -52,7 +52,9 @@
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/profiling/timers.h"
@@ -80,10 +82,14 @@ typedef struct {
int fd;
bool finished_edge;
msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */
- size_t slice_size;
+ double target_length;
+ double bytes_read_this_round;
gpr_refcount refcount;
gpr_atm shutdown_count;
+ int min_read_chunk_size;
+ int max_read_chunk_size;
+
/* garbage after the last read */
grpc_slice_buffer last_read_buffer;
@@ -108,6 +114,42 @@ typedef struct {
grpc_resource_user_slice_allocator slice_allocator;
} grpc_tcp;
+static void add_to_estimate(grpc_tcp *tcp, size_t bytes) {
+ tcp->bytes_read_this_round += (double)bytes;
+}
+
+static void finish_estimate(grpc_tcp *tcp) {
+ /* If we read >80% of the target buffer in one read loop, increase the size
+ of the target buffer to either the amount read, or twice its previous
+ value */
+ if (tcp->bytes_read_this_round > tcp->target_length * 0.8) {
+ tcp->target_length =
+ GPR_MAX(2 * tcp->target_length, tcp->bytes_read_this_round);
+ } else {
+ tcp->target_length =
+ 0.99 * tcp->target_length + 0.01 * tcp->bytes_read_this_round;
+ }
+ tcp->bytes_read_this_round = 0;
+}
+
+static size_t get_target_read_size(grpc_tcp *tcp) {
+ grpc_resource_quota *rq = grpc_resource_user_quota(tcp->resource_user);
+ double pressure = grpc_resource_quota_get_memory_pressure(rq);
+ double target =
+ tcp->target_length * (pressure > 0.8 ? (1.0 - pressure) / 0.2 : 1.0);
+ size_t sz = (((size_t)GPR_CLAMP(target, tcp->min_read_chunk_size,
+ tcp->max_read_chunk_size)) +
+ 255) &
+ ~(size_t)255;
+ /* don't use more than 1/16th of the overall resource quota for a single read
+ * alloc */
+ size_t rqmax = grpc_resource_quota_peek_size(rq);
+ if (sz > rqmax / 16 && rqmax > 1024) {
+ sz = rqmax / 16;
+ }
+ return sz;
+}
+
static grpc_error *tcp_annotate_error(grpc_error *src_error, grpc_tcp *tcp) {
return grpc_error_set_str(
grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd),
@@ -232,9 +274,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
/* NB: After calling call_read_cb a parallel call of the read handler may
* be running. */
if (errno == EAGAIN) {
- if (tcp->iov_size > 1) {
- tcp->iov_size /= 2;
- }
+ finish_estimate(tcp);
/* We've consumed the edge, request a new one */
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
@@ -253,14 +293,13 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
+ add_to_estimate(tcp, (size_t)read_bytes);
GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
if ((size_t)read_bytes < tcp->incoming_buffer->length) {
grpc_slice_buffer_trim_end(
tcp->incoming_buffer,
tcp->incoming_buffer->length - (size_t)read_bytes,
&tcp->last_read_buffer);
- } else if (tcp->iov_size < MAX_READ_IOVEC) {
- ++tcp->iov_size;
}
GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
call_read_cb(exec_ctx, tcp, GRPC_ERROR_NONE);
@@ -285,11 +324,11 @@ static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp,
}
static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
- if (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
- grpc_resource_user_alloc_slices(
- exec_ctx, &tcp->slice_allocator, tcp->slice_size,
- (size_t)tcp->iov_size - tcp->incoming_buffer->count,
- tcp->incoming_buffer);
+ size_t target_read_size = get_target_read_size(tcp);
+ if (tcp->incoming_buffer->length < target_read_size &&
+ tcp->incoming_buffer->count < MAX_READ_IOVEC) {
+ grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator,
+ target_read_size, 1, tcp->incoming_buffer);
} else {
tcp_do_read(exec_ctx, tcp);
}
@@ -540,9 +579,50 @@ static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_get_peer,
tcp_get_fd};
-grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
- grpc_resource_quota *resource_quota,
- size_t slice_size, const char *peer_string) {
+#define MAX_CHUNK_SIZE 32 * 1024 * 1024
+
+grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd,
+ const grpc_channel_args *channel_args,
+ const char *peer_string) {
+ int tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE;
+ int tcp_max_read_chunk_size = 4 * 1024 * 1024;
+ int tcp_min_read_chunk_size = 256;
+ grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
+ if (channel_args != NULL) {
+ for (size_t i = 0; i < channel_args->num_args; i++) {
+ if (0 ==
+ strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) {
+ grpc_integer_options options = {(int)tcp_read_chunk_size, 1,
+ MAX_CHUNK_SIZE};
+ tcp_read_chunk_size =
+ grpc_channel_arg_get_integer(&channel_args->args[i], options);
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE)) {
+ grpc_integer_options options = {(int)tcp_read_chunk_size, 1,
+ MAX_CHUNK_SIZE};
+ tcp_min_read_chunk_size =
+ grpc_channel_arg_get_integer(&channel_args->args[i], options);
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE)) {
+ grpc_integer_options options = {(int)tcp_read_chunk_size, 1,
+ MAX_CHUNK_SIZE};
+ tcp_max_read_chunk_size =
+ grpc_channel_arg_get_integer(&channel_args->args[i], options);
+ } else if (0 ==
+ strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
+ resource_quota = grpc_resource_quota_ref_internal(
+ channel_args->args[i].value.pointer.p);
+ }
+ }
+ }
+
+ if (tcp_min_read_chunk_size > tcp_max_read_chunk_size) {
+ tcp_min_read_chunk_size = tcp_max_read_chunk_size;
+ }
+ tcp_read_chunk_size = GPR_CLAMP(tcp_read_chunk_size, tcp_min_read_chunk_size,
+ tcp_max_read_chunk_size);
+
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
tcp->peer_string = gpr_strdup(peer_string);
@@ -552,7 +632,10 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
tcp->release_fd_cb = NULL;
tcp->release_fd = NULL;
tcp->incoming_buffer = NULL;
- tcp->slice_size = slice_size;
+ tcp->target_length = (double)tcp_read_chunk_size;
+ tcp->min_read_chunk_size = tcp_min_read_chunk_size;
+ tcp->max_read_chunk_size = tcp_max_read_chunk_size;
+ tcp->bytes_read_this_round = 0;
tcp->iov_size = 1;
tcp->finished_edge = true;
/* paired with unref in grpc_tcp_destroy */
@@ -569,6 +652,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
&tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp);
/* Tell network status tracker about new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
return &tcp->base;
}
diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h
index 1c0d13f96e..1ad5788331 100644
--- a/src/core/lib/iomgr/tcp_posix.h
+++ b/src/core/lib/iomgr/tcp_posix.h
@@ -47,14 +47,13 @@
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/ev_posix.h"
-#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192
-
extern int grpc_tcp_trace;
/* Create a tcp endpoint given a file desciptor and a read slice size.
Takes ownership of fd. */
-grpc_endpoint *grpc_tcp_create(grpc_fd *fd, grpc_resource_quota *resource_quota,
- size_t read_slice_size, const char *peer_string);
+grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ const grpc_channel_args *args,
+ const char *peer_string);
/* Return the tcp endpoint's fd, or -1 if this is not available. Does not
release the fd.
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index d6a017cf7f..e66ffc9b1c 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -59,6 +59,7 @@
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -90,7 +91,6 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s = gpr_zalloc(sizeof(grpc_tcp_server));
s->so_reuseport = has_so_reuseport;
- s->resource_quota = grpc_resource_quota_create(NULL);
s->expand_wildcard_addrs = false;
for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
@@ -98,27 +98,14 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
s->so_reuseport =
has_so_reuseport && (args->args[i].value.integer != 0);
} else {
- grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(GRPC_ARG_ALLOW_REUSEPORT
" must be an integer");
}
- } else if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
- if (args->args[i].type == GRPC_ARG_POINTER) {
- grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
- s->resource_quota =
- grpc_resource_quota_ref_internal(args->args[i].value.pointer.p);
- } else {
- grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
- gpr_free(s);
- return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool");
- }
} else if (0 == strcmp(GRPC_ARG_EXPAND_WILDCARD_ADDRS, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_INTEGER) {
s->expand_wildcard_addrs = (args->args[i].value.integer != 0);
} else {
- grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
GRPC_ARG_EXPAND_WILDCARD_ADDRS " must be an integer");
@@ -138,6 +125,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
s->head = NULL;
s->tail = NULL;
s->nports = 0;
+ s->channel_args = grpc_channel_args_copy(args);
gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
*server = s;
return GRPC_ERROR_NONE;
@@ -158,8 +146,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
s->head = sp->next;
gpr_free(sp);
}
-
- grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
+ grpc_channel_args_destroy(exec_ctx, s->channel_args);
gpr_free(s);
}
@@ -286,8 +273,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
- grpc_tcp_create(fdobj, sp->server->resource_quota,
- GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
+ grpc_tcp_create(exec_ctx, fdobj, sp->server->channel_args, addr_str),
read_notifier_pollset, acceptor);
gpr_free(name);
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h
index f5dc8532f9..c15e2e1493 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix.h
+++ b/src/core/lib/iomgr/tcp_server_utils_posix.h
@@ -103,7 +103,8 @@ struct grpc_tcp_server {
/* next pollset to assign a channel to */
gpr_atm next_pollset_to_assign;
- grpc_resource_quota *resource_quota;
+ /* channel args for this server */
+ grpc_channel_args *channel_args;
};
/* If successful, add a listener to \a s for \a addr, set \a dsmode for the
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index 12ce7d3fdd..4c17f08918 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -46,6 +46,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/iocp_windows.h"
#include "src/core/lib/iomgr/pollset_windows.h"
#include "src/core/lib/iomgr/resolve_address.h"
@@ -102,7 +103,7 @@ struct grpc_tcp_server {
/* shutdown callback */
grpc_closure *shutdown_complete;
- grpc_resource_quota *resource_quota;
+ grpc_channel_args *channel_args;
};
/* Public function. Allocates the proper data structures to hold a
@@ -112,21 +113,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
const grpc_channel_args *args,
grpc_tcp_server **server) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
- s->resource_quota = grpc_resource_quota_create(NULL);
- for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
- if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
- if (args->args[i].type == GRPC_ARG_POINTER) {
- grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
- s->resource_quota =
- grpc_resource_quota_ref_internal(args->args[i].value.pointer.p);
- } else {
- grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
- gpr_free(s);
- return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool");
- }
- }
- }
+ s->channel_args = grpc_channel_args_copy(args);
gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu);
s->active_ports = 0;
@@ -155,7 +142,7 @@ static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg,
grpc_winsocket_destroy(sp->socket);
gpr_free(sp);
}
- grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
+ grpc_channel_args_destroy(exec_ctx, s->channel_args);
gpr_free(s);
}
@@ -383,8 +370,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
gpr_free(utf8_message);
}
gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string);
- ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name),
- sp->server->resource_quota, peer_name_string);
+ ep = grpc_tcp_create(exec_ctx, grpc_winsocket_create(sock, fd_name),
+ sp->server->channel_args, peer_name_string);
gpr_free(fd_name);
gpr_free(peer_name_string);
} else {
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 9134883226..f74aa68793 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -430,9 +430,19 @@ static grpc_endpoint_vtable vtable = {win_read,
win_get_peer,
win_get_fd};
-grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
- grpc_resource_quota *resource_quota,
+grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
+ grpc_channel_args *channel_args,
char *peer_string) {
+ grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
+ if (channel_args != NULL) {
+ for (size_t i = 0; i < channel_args->num_args; i++) {
+ if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
+ resource_quota = grpc_resource_quota_ref_internal(
+ channel_args->args[i].value.pointer.p);
+ }
+ }
+ }
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
memset(tcp, 0, sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
diff --git a/src/core/lib/iomgr/tcp_windows.h b/src/core/lib/iomgr/tcp_windows.h
index 4402de1c38..abafdb22d2 100644
--- a/src/core/lib/iomgr/tcp_windows.h
+++ b/src/core/lib/iomgr/tcp_windows.h
@@ -50,8 +50,8 @@
/* Create a tcp endpoint given a winsock handle.
* Takes ownership of the handle.
*/
-grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
- grpc_resource_quota *resource_quota,
+grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
+ grpc_channel_args *channel_args,
char *peer_string);
grpc_error *grpc_tcp_prepare_socket(SOCKET sock);
diff --git a/src/core/lib/slice/slice_buffer.c b/src/core/lib/slice/slice_buffer.c
index 9176dc8a42..c96b9c3b28 100644
--- a/src/core/lib/slice/slice_buffer.c
+++ b/src/core/lib/slice/slice_buffer.c
@@ -46,27 +46,29 @@
#define GROW(x) (3 * (x) / 2)
static void maybe_embiggen(grpc_slice_buffer *sb) {
- if (sb->base_slices != sb->slices) {
- memmove(sb->base_slices, sb->slices, sb->count * sizeof(grpc_slice));
- sb->slices = sb->base_slices;
- }
-
/* How far away from sb->base_slices is sb->slices pointer */
size_t slice_offset = (size_t)(sb->slices - sb->base_slices);
size_t slice_count = sb->count + slice_offset;
if (slice_count == sb->capacity) {
- sb->capacity = GROW(sb->capacity);
- GPR_ASSERT(sb->capacity > slice_count);
- if (sb->base_slices == sb->inlined) {
- sb->base_slices = gpr_malloc(sb->capacity * sizeof(grpc_slice));
- memcpy(sb->base_slices, sb->inlined, slice_count * sizeof(grpc_slice));
+ if (sb->base_slices != sb->slices) {
+ /* Make room by moving elements if there's still space unused */
+ memmove(sb->base_slices, sb->slices, sb->count * sizeof(grpc_slice));
+ sb->slices = sb->base_slices;
} else {
- sb->base_slices =
- gpr_realloc(sb->base_slices, sb->capacity * sizeof(grpc_slice));
- }
+ /* Allocate more memory if no more space is available */
+ sb->capacity = GROW(sb->capacity);
+ GPR_ASSERT(sb->capacity > slice_count);
+ if (sb->base_slices == sb->inlined) {
+ sb->base_slices = gpr_malloc(sb->capacity * sizeof(grpc_slice));
+ memcpy(sb->base_slices, sb->inlined, slice_count * sizeof(grpc_slice));
+ } else {
+ sb->base_slices =
+ gpr_realloc(sb->base_slices, sb->capacity * sizeof(grpc_slice));
+ }
- sb->slices = sb->base_slices + slice_offset;
+ sb->slices = sb->base_slices + slice_offset;
+ }
}
}
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index c37ead2318..97d50a91be 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -145,16 +145,28 @@ typedef struct batch_control {
grpc_transport_stream_op_batch op;
} batch_control;
+typedef struct {
+ gpr_mu child_list_mu;
+ grpc_call *first_child;
+} parent_call;
+
+typedef struct {
+ grpc_call *parent;
+ /** siblings: children of the same parent form a list, and this list is
+ protected under
+ parent->mu */
+ grpc_call *sibling_next;
+ grpc_call *sibling_prev;
+} child_call;
+
struct grpc_call {
gpr_arena *arena;
grpc_completion_queue *cq;
grpc_polling_entity pollent;
grpc_channel *channel;
- grpc_call *parent;
- grpc_call *first_child;
gpr_timespec start_time;
- /* protects first_child, and child next/prev links */
- gpr_mu child_list_mu;
+ /* parent_call* */ gpr_atm parent_call_atm;
+ child_call *child_call;
/* client or server call */
bool is_client;
@@ -206,12 +218,6 @@ struct grpc_call {
int send_extra_metadata_count;
gpr_timespec send_deadline;
- /** siblings: children of the same parent form a list, and this list is
- protected under
- parent->mu */
- grpc_call *sibling_next;
- grpc_call *sibling_prev;
-
grpc_slice_buffer_stream sending_stream;
grpc_byte_stream *receiving_stream;
@@ -276,6 +282,23 @@ static void add_init_error(grpc_error **composite, grpc_error *new) {
*composite = grpc_error_add_child(*composite, new);
}
+static parent_call *get_or_create_parent_call(grpc_call *call) {
+ parent_call *p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
+ if (p == NULL) {
+ p = gpr_arena_alloc(call->arena, sizeof(*p));
+ gpr_mu_init(&p->child_list_mu);
+ if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm)NULL, (gpr_atm)p)) {
+ gpr_mu_destroy(&p->child_list_mu);
+ p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
+ }
+ }
+ return p;
+}
+
+static parent_call *get_parent_call(grpc_call *call) {
+ return (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
+}
+
grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
const grpc_call_create_args *args,
grpc_call **out_call) {
@@ -291,10 +314,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
sizeof(grpc_call) + channel_stack->call_stack_size);
call->arena = arena;
*out_call = call;
- gpr_mu_init(&call->child_list_mu);
call->channel = args->channel;
call->cq = args->cq;
- call->parent = args->parent_call;
call->start_time = gpr_now(GPR_CLOCK_MONOTONIC);
/* Always support no compression */
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
@@ -326,11 +347,17 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC);
if (args->parent_call != NULL) {
+ child_call *cc = call->child_call =
+ gpr_arena_alloc(arena, sizeof(child_call));
+ call->child_call->parent = args->parent_call;
+
GRPC_CALL_INTERNAL_REF(args->parent_call, "child");
GPR_ASSERT(call->is_client);
GPR_ASSERT(!args->parent_call->is_client);
- gpr_mu_lock(&args->parent_call->child_list_mu);
+ parent_call *pc = get_or_create_parent_call(args->parent_call);
+
+ gpr_mu_lock(&pc->child_list_mu);
if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
send_deadline = gpr_time_min(
@@ -364,17 +391,17 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
}
}
- if (args->parent_call->first_child == NULL) {
- args->parent_call->first_child = call;
- call->sibling_next = call->sibling_prev = call;
+ if (pc->first_child == NULL) {
+ pc->first_child = call;
+ cc->sibling_next = cc->sibling_prev = call;
} else {
- call->sibling_next = args->parent_call->first_child;
- call->sibling_prev = args->parent_call->first_child->sibling_prev;
- call->sibling_next->sibling_prev = call->sibling_prev->sibling_next =
- call;
+ cc->sibling_next = pc->first_child;
+ cc->sibling_prev = pc->first_child->child_call->sibling_prev;
+ cc->sibling_next->child_call->sibling_prev =
+ cc->sibling_prev->child_call->sibling_next = call;
}
- gpr_mu_unlock(&args->parent_call->child_list_mu);
+ gpr_mu_unlock(&pc->child_list_mu);
}
call->send_deadline = send_deadline;
@@ -469,7 +496,10 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
if (c->receiving_stream != NULL) {
grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
}
- gpr_mu_destroy(&c->child_list_mu);
+ parent_call *pc = get_parent_call(c);
+ if (pc != NULL) {
+ gpr_mu_destroy(&pc->child_list_mu);
+ }
for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md);
}
@@ -499,31 +529,31 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
}
void grpc_call_destroy(grpc_call *c) {
- int cancel;
- grpc_call *parent = c->parent;
+ child_call *cc = c->child_call;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_TIMER_BEGIN("grpc_call_destroy", 0);
GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c));
- if (parent) {
- gpr_mu_lock(&parent->child_list_mu);
- if (c == parent->first_child) {
- parent->first_child = c->sibling_next;
- if (c == parent->first_child) {
- parent->first_child = NULL;
+ if (cc) {
+ parent_call *pc = get_parent_call(cc->parent);
+ gpr_mu_lock(&pc->child_list_mu);
+ if (c == pc->first_child) {
+ pc->first_child = cc->sibling_next;
+ if (c == pc->first_child) {
+ pc->first_child = NULL;
}
}
- c->sibling_prev->sibling_next = c->sibling_next;
- c->sibling_next->sibling_prev = c->sibling_prev;
- gpr_mu_unlock(&parent->child_list_mu);
- GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child");
+ cc->sibling_prev->child_call->sibling_next = cc->sibling_next;
+ cc->sibling_next->child_call->sibling_prev = cc->sibling_prev;
+ gpr_mu_unlock(&pc->child_list_mu);
+ GRPC_CALL_INTERNAL_UNREF(&exec_ctx, cc->parent, "child");
}
GPR_ASSERT(!c->destroy_called);
c->destroy_called = 1;
- cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) &&
- !gpr_atm_acq_load(&c->received_final_op_atm);
+ bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
+ gpr_atm_acq_load(&c->received_final_op_atm) == 0;
if (cancel) {
cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE,
GRPC_ERROR_CANCELLED);
@@ -1079,7 +1109,6 @@ static grpc_error *consolidate_batch_errors(batch_control *bctl) {
static void post_batch_completion(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
- grpc_call *child_call;
grpc_call *next_child_call;
grpc_call *call = bctl->call;
grpc_error *error = consolidate_batch_errors(bctl);
@@ -1104,21 +1133,25 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
/* propagate cancellation to any interested children */
gpr_atm_rel_store(&call->received_final_op_atm, 1);
- gpr_mu_lock(&call->child_list_mu);
- child_call = call->first_child;
- if (child_call != NULL) {
- do {
- next_child_call = child_call->sibling_next;
- if (child_call->cancellation_is_inherited) {
- GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
- cancel_with_error(exec_ctx, child_call, STATUS_FROM_API_OVERRIDE,
- GRPC_ERROR_CANCELLED);
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel");
- }
- child_call = next_child_call;
- } while (child_call != call->first_child);
+ parent_call *pc = get_parent_call(call);
+ if (pc != NULL) {
+ grpc_call *child;
+ gpr_mu_lock(&pc->child_list_mu);
+ child = pc->first_child;
+ if (child != NULL) {
+ do {
+ next_child_call = child->child_call->sibling_next;
+ if (child->cancellation_is_inherited) {
+ GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
+ cancel_with_error(exec_ctx, child, STATUS_FROM_API_OVERRIDE,
+ GRPC_ERROR_CANCELLED);
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, child, "propagate_cancel");
+ }
+ child = next_child_call;
+ } while (child != pc->first_child);
+ }
+ gpr_mu_unlock(&pc->child_list_mu);
}
- gpr_mu_unlock(&call->child_list_mu);
if (call->is_client) {
get_final_status(call, set_status_value_directly,
diff --git a/src/node/index.js b/src/node/index.js
index a294aad8ee..071bfd7927 100644
--- a/src/node/index.js
+++ b/src/node/index.js
@@ -52,42 +52,89 @@ var Metadata = require('./src/metadata.js');
var grpc = require('./src/grpc_extension');
+var protobuf_js_5_common = require('./src/protobuf_js_5_common');
+var protobuf_js_6_common = require('./src/protobuf_js_6_common');
+
grpc.setDefaultRootsPem(fs.readFileSync(SSL_ROOTS_PATH, 'ascii'));
/**
- * Load a gRPC object from an existing ProtoBuf.Reflect object.
- * @param {ProtoBuf.Reflect.Namespace} value The ProtoBuf object to load.
- * @param {Object=} options Options to apply to the loaded object
+ * Load a ProtoBuf.js object as a gRPC object. The options object can provide
+ * the following options:
+ * - binaryAsBase64: deserialize bytes values as base64 strings instead of
+ * Buffers. Defaults to false
+ * - longsAsStrings: deserialize long values as strings instead of objects.
+ * Defaults to true
+ * - enumsAsStrings: deserialize enum values as strings instead of numbers.
+ * Defaults to true
+ * - deprecatedArgumentOrder: Use the beta method argument order for client
+ * methods, with optional arguments after the callback. Defaults to false.
+ * This option is only a temporary stopgap measure to smooth an API breakage.
+ * It is deprecated, and new code should not use it.
+ * - protobufjsVersion: Available values are 5, 6, and 'detect'. 5 and 6
+ * respectively indicate that an object from the corresponding version of
+ * ProtoBuf.js is provided in the value argument. If the option is 'detect',
+ * gRPC will guess what the version is based on the structure of the value.
+ * Defaults to 'detect'.
+ * @param {Object} value The ProtoBuf.js reflection object to load
+ * @param {Object=} options Options to apply to the loaded file
* @return {Object<string, *>} The resulting gRPC object
*/
exports.loadObject = function loadObject(value, options) {
- var result = {};
- if (value.className === 'Namespace') {
- _.each(value.children, function(child) {
- result[child.name] = loadObject(child, options);
- });
- return result;
- } else if (value.className === 'Service') {
- return client.makeProtobufClientConstructor(value, options);
- } else if (value.className === 'Message' || value.className === 'Enum') {
- return value.build();
+ options = _.defaults(options, common.defaultGrpcOptions);
+ options = _.defaults(options, {'protobufjsVersion': 'detect'});
+ var protobufjsVersion;
+ if (options.protobufjsVersion === 'detect') {
+ if (protobuf_js_6_common.isProbablyProtobufJs6(value)) {
+ protobufjsVersion = 6;
+ } else if (protobuf_js_5_common.isProbablyProtobufJs5(value)) {
+ protobufjsVersion = 5;
+ } else {
+ var error_message = 'Could not detect ProtoBuf.js version. Please ' +
+ 'specify the version number with the "protobufjs_version" option';
+ throw new Error(error_message);
+ }
} else {
- return value;
+ protobufjsVersion = options.protobufjsVersion;
+ }
+ switch (protobufjsVersion) {
+ case 6: return protobuf_js_6_common.loadObject(value, options);
+ case 5:
+ var deprecation_message = 'Calling grpc.loadObject with an object ' +
+ 'generated by ProtoBuf.js 5 is deprecated. Please upgrade to ' +
+ 'ProtoBuf.js 6.';
+ common.log(grpc.logVerbosity.INFO, deprecation_message);
+ return protobuf_js_5_common.loadObject(value, options);
+ default:
+ throw new Error('Unrecognized protobufjsVersion', protobufjsVersion);
}
};
var loadObject = exports.loadObject;
+function applyProtoRoot(filename, root) {
+ if (_.isString(filename)) {
+ return filename;
+ }
+ filename.root = path.resolve(filename.root) + '/';
+ root.resolvePath = function(originPath, importPath, alreadyNormalized) {
+ return ProtoBuf.util.path.resolve(filename.root,
+ importPath,
+ alreadyNormalized);
+ };
+ return filename.file;
+}
+
/**
* Load a gRPC object from a .proto file. The options object can provide the
* following options:
- * - convertFieldsToCamelCase: Loads this file with that option on protobuf.js
- * set as specified. See
- * https://github.com/dcodeIO/protobuf.js/wiki/Advanced-options for details
+ * - convertFieldsToCamelCase: Load this file with field names in camel case
+ * instead of their original case
* - binaryAsBase64: deserialize bytes values as base64 strings instead of
* Buffers. Defaults to false
* - longsAsStrings: deserialize long values as strings instead of objects.
* Defaults to true
+ * - enumsAsStrings: deserialize enum values as strings instead of numbers.
+ * Defaults to true
* - deprecatedArgumentOrder: Use the beta method argument order for client
* methods, with optional arguments after the callback. Defaults to false.
* This option is only a temporary stopgap measure to smooth an API breakage.
@@ -99,29 +146,17 @@ var loadObject = exports.loadObject;
* @return {Object<string, *>} The resulting gRPC object
*/
exports.load = function load(filename, format, options) {
- if (!format) {
- format = 'proto';
- }
- var convertFieldsToCamelCaseOriginal = ProtoBuf.convertFieldsToCamelCase;
- if(options && options.hasOwnProperty('convertFieldsToCamelCase')) {
- ProtoBuf.convertFieldsToCamelCase = options.convertFieldsToCamelCase;
- }
- var builder;
- try {
- switch(format) {
- case 'proto':
- builder = ProtoBuf.loadProtoFile(filename);
- break;
- case 'json':
- builder = ProtoBuf.loadJsonFile(filename);
- break;
- default:
- throw new Error('Unrecognized format "' + format + '"');
- }
- } finally {
- ProtoBuf.convertFieldsToCamelCase = convertFieldsToCamelCaseOriginal;
- }
- return loadObject(builder.ns, options);
+ /* Note: format is currently unused, because the API for loading a proto
+ file or a JSON file is identical in Protobuf.js 6. In the future, there is
+ still the possibility of adding other formats that would be loaded
+ differently */
+ options = _.defaults(options, common.defaultGrpcOptions);
+ options.protobufjs_version = 6;
+ var root = new ProtoBuf.Root();
+ var parse_options = {keepCase: !options.convertFieldsToCamelCase};
+ return loadObject(root.loadSync(applyProtoRoot(filename, root),
+ parse_options),
+ options);
};
var log_template = _.template(
diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js
index 05f52a1083..83b8a7c1ec 100644
--- a/src/node/interop/interop_server.js
+++ b/src/node/interop/interop_server.js
@@ -228,7 +228,7 @@ function getServer(port, tls) {
server_creds = grpc.ServerCredentials.createInsecure();
}
var server = new grpc.Server(options);
- server.addProtoService(testProto.TestService.service, {
+ server.addService(testProto.TestService.service, {
emptyCall: handleEmpty,
unaryCall: handleUnary,
streamingOutputCall: handleStreamingOutput,
diff --git a/src/node/performance/benchmark_server.js b/src/node/performance/benchmark_server.js
index 7158af775a..ea85029d98 100644
--- a/src/node/performance/benchmark_server.js
+++ b/src/node/performance/benchmark_server.js
@@ -140,7 +140,7 @@ function BenchmarkServer(host, port, tls, generic, response_size) {
streamingCall: makeStreamingGenericCall(response_size)
});
} else {
- server.addProtoService(serviceProto.BenchmarkService.service, {
+ server.addService(serviceProto.BenchmarkService.service, {
unaryCall: unaryCall,
streamingCall: streamingCall
});
diff --git a/src/node/performance/worker.js b/src/node/performance/worker.js
index 030bf7d7ba..90a9b7d59c 100644
--- a/src/node/performance/worker.js
+++ b/src/node/performance/worker.js
@@ -44,8 +44,8 @@ var serviceProto = grpc.load({
function runServer(port, benchmark_impl) {
var server_creds = grpc.ServerCredentials.createInsecure();
var server = new grpc.Server();
- server.addProtoService(serviceProto.WorkerService.service,
- new WorkerServiceImpl(benchmark_impl, server));
+ server.addService(serviceProto.WorkerService.service,
+ new WorkerServiceImpl(benchmark_impl, server));
var address = '0.0.0.0:' + port;
server.bind(address, server_creds);
server.start();
diff --git a/src/node/src/client.js b/src/node/src/client.js
index 44081a3a6c..1aaf35c16c 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -780,6 +780,8 @@ exports.makeClientConstructor = function(methods, serviceName,
_.assign(Client.prototype[name], attrs);
});
+ Client.service = methods;
+
return Client;
};
@@ -823,26 +825,6 @@ exports.waitForClientReady = function(client, deadline, callback) {
};
/**
- * Creates a constructor for clients for the given service
- * @param {ProtoBuf.Reflect.Service} service The service to generate a client
- * for
- * @param {Object=} options Options to apply to the client
- * @return {function(string, Object)} New client constructor
- */
-exports.makeProtobufClientConstructor = function(service, options) {
- var method_attrs = common.getProtobufServiceAttrs(service, options);
- if (!options) {
- options = {deprecatedArgumentOrder: false};
- }
- var Client = exports.makeClientConstructor(
- method_attrs, common.fullyQualifiedName(service),
- options);
- Client.service = service;
- Client.service.grpc_options = options;
- return Client;
-};
-
-/**
* Map of status code names to status codes
*/
exports.status = grpc.status;
diff --git a/src/node/src/common.js b/src/node/src/common.js
index a0fe4480ea..757969dbdd 100644
--- a/src/node/src/common.js
+++ b/src/node/src/common.js
@@ -42,74 +42,6 @@
var _ = require('lodash');
/**
- * Get a function that deserializes a specific type of protobuf.
- * @param {function()} cls The constructor of the message type to deserialize
- * @param {bool=} binaryAsBase64 Deserialize bytes fields as base64 strings
- * instead of Buffers. Defaults to false
- * @param {bool=} longsAsStrings Deserialize long values as strings instead of
- * objects. Defaults to true
- * @return {function(Buffer):cls} The deserialization function
- */
-exports.deserializeCls = function deserializeCls(cls, binaryAsBase64,
- longsAsStrings) {
- if (binaryAsBase64 === undefined || binaryAsBase64 === null) {
- binaryAsBase64 = false;
- }
- if (longsAsStrings === undefined || longsAsStrings === null) {
- longsAsStrings = true;
- }
- /**
- * Deserialize a buffer to a message object
- * @param {Buffer} arg_buf The buffer to deserialize
- * @return {cls} The resulting object
- */
- return function deserialize(arg_buf) {
- // Convert to a native object with binary fields as Buffers (first argument)
- // and longs as strings (second argument)
- return cls.decode(arg_buf).toRaw(binaryAsBase64, longsAsStrings);
- };
-};
-
-var deserializeCls = exports.deserializeCls;
-
-/**
- * Get a function that serializes objects to a buffer by protobuf class.
- * @param {function()} Cls The constructor of the message type to serialize
- * @return {function(Cls):Buffer} The serialization function
- */
-exports.serializeCls = function serializeCls(Cls) {
- /**
- * Serialize an object to a Buffer
- * @param {Object} arg The object to serialize
- * @return {Buffer} The serialized object
- */
- return function serialize(arg) {
- return new Buffer(new Cls(arg).encode().toBuffer());
- };
-};
-
-var serializeCls = exports.serializeCls;
-
-/**
- * Get the fully qualified (dotted) name of a ProtoBuf.Reflect value.
- * @param {ProtoBuf.Reflect.Namespace} value The value to get the name of
- * @return {string} The fully qualified name of the value
- */
-exports.fullyQualifiedName = function fullyQualifiedName(value) {
- if (value === null || value === undefined) {
- return '';
- }
- var name = value.name;
- var parent_name = fullyQualifiedName(value.parent);
- if (parent_name !== '') {
- name = parent_name + '.' + name;
- }
- return name;
-};
-
-var fullyQualifiedName = exports.fullyQualifiedName;
-
-/**
* Wrap a function to pass null-like values through without calling it. If no
* function is given, just uses the identity;
* @param {?function} func The function to wrap
@@ -128,44 +60,6 @@ exports.wrapIgnoreNull = function wrapIgnoreNull(func) {
};
/**
- * Return a map from method names to method attributes for the service.
- * @param {ProtoBuf.Reflect.Service} service The service to get attributes for
- * @param {Object=} options Options to apply to these attributes
- * @return {Object} The attributes map
- */
-exports.getProtobufServiceAttrs = function getProtobufServiceAttrs(service,
- options) {
- var prefix = '/' + fullyQualifiedName(service) + '/';
- var binaryAsBase64, longsAsStrings;
- if (options) {
- binaryAsBase64 = options.binaryAsBase64;
- longsAsStrings = options.longsAsStrings;
- }
- /* This slightly awkward construction is used to make sure we only use
- lodash@3.10.1-compatible functions. A previous version used
- _.fromPairs, which would be cleaner, but was introduced in lodash
- version 4 */
- return _.zipObject(_.map(service.children, function(method) {
- return _.camelCase(method.name);
- }), _.map(service.children, function(method) {
- return {
- originalName: method.name,
- path: prefix + method.name,
- requestStream: method.requestStream,
- responseStream: method.responseStream,
- requestType: method.resolvedRequestType,
- responseType: method.resolvedResponseType,
- requestSerialize: serializeCls(method.resolvedRequestType.build()),
- requestDeserialize: deserializeCls(method.resolvedRequestType.build(),
- binaryAsBase64, longsAsStrings),
- responseSerialize: serializeCls(method.resolvedResponseType.build()),
- responseDeserialize: deserializeCls(method.resolvedResponseType.build(),
- binaryAsBase64, longsAsStrings)
- };
- }));
-};
-
-/**
* The logger object for the gRPC module. Defaults to console.
*/
exports.logger = console;
@@ -185,3 +79,14 @@ exports.log = function log(severity, message) {
exports.logger.error(message);
}
};
+
+/**
+ * Default options for loading proto files into gRPC
+ */
+exports.defaultGrpcOptions = {
+ convertFieldsToCamelCase: false,
+ binaryAsBase64: false,
+ longsAsStrings: true,
+ enumsAsStrings: true,
+ deprecatedArgumentOrder: false
+};
diff --git a/src/node/src/protobuf_js_5_common.js b/src/node/src/protobuf_js_5_common.js
new file mode 100644
index 0000000000..62cf2f4aca
--- /dev/null
+++ b/src/node/src/protobuf_js_5_common.js
@@ -0,0 +1,181 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+'use strict';
+
+var _ = require('lodash');
+var client = require('./client');
+
+/**
+ * Get a function that deserializes a specific type of protobuf.
+ * @param {function()} cls The constructor of the message type to deserialize
+ * @param {bool=} binaryAsBase64 Deserialize bytes fields as base64 strings
+ * instead of Buffers. Defaults to false
+ * @param {bool=} longsAsStrings Deserialize long values as strings instead of
+ * objects. Defaults to true
+ * @return {function(Buffer):cls} The deserialization function
+ */
+exports.deserializeCls = function deserializeCls(cls, binaryAsBase64,
+ longsAsStrings) {
+ /**
+ * Deserialize a buffer to a message object
+ * @param {Buffer} arg_buf The buffer to deserialize
+ * @return {cls} The resulting object
+ */
+ return function deserialize(arg_buf) {
+ // Convert to a native object with binary fields as Buffers (first argument)
+ // and longs as strings (second argument)
+ return cls.decode(arg_buf).toRaw(binaryAsBase64, longsAsStrings);
+ };
+};
+
+var deserializeCls = exports.deserializeCls;
+
+/**
+ * Get a function that serializes objects to a buffer by protobuf class.
+ * @param {function()} Cls The constructor of the message type to serialize
+ * @return {function(Cls):Buffer} The serialization function
+ */
+exports.serializeCls = function serializeCls(Cls) {
+ /**
+ * Serialize an object to a Buffer
+ * @param {Object} arg The object to serialize
+ * @return {Buffer} The serialized object
+ */
+ return function serialize(arg) {
+ return new Buffer(new Cls(arg).encode().toBuffer());
+ };
+};
+
+var serializeCls = exports.serializeCls;
+
+/**
+ * Get the fully qualified (dotted) name of a ProtoBuf.Reflect value.
+ * @param {ProtoBuf.Reflect.Namespace} value The value to get the name of
+ * @return {string} The fully qualified name of the value
+ */
+exports.fullyQualifiedName = function fullyQualifiedName(value) {
+ if (value === null || value === undefined) {
+ return '';
+ }
+ var name = value.name;
+ var parent_name = fullyQualifiedName(value.parent);
+ if (parent_name !== '') {
+ name = parent_name + '.' + name;
+ }
+ return name;
+};
+
+var fullyQualifiedName = exports.fullyQualifiedName;
+
+/**
+ * Return a map from method names to method attributes for the service.
+ * @param {ProtoBuf.Reflect.Service} service The service to get attributes for
+ * @param {Object=} options Options to apply to these attributes
+ * @return {Object} The attributes map
+ */
+exports.getProtobufServiceAttrs = function getProtobufServiceAttrs(service,
+ options) {
+ var prefix = '/' + fullyQualifiedName(service) + '/';
+ var binaryAsBase64, longsAsStrings;
+ if (options) {
+ binaryAsBase64 = options.binaryAsBase64;
+ longsAsStrings = options.longsAsStrings;
+ }
+ /* This slightly awkward construction is used to make sure we only use
+ lodash@3.10.1-compatible functions. A previous version used
+ _.fromPairs, which would be cleaner, but was introduced in lodash
+ version 4 */
+ return _.zipObject(_.map(service.children, function(method) {
+ return _.camelCase(method.name);
+ }), _.map(service.children, function(method) {
+ return {
+ originalName: method.name,
+ path: prefix + method.name,
+ requestStream: method.requestStream,
+ responseStream: method.responseStream,
+ requestType: method.resolvedRequestType,
+ responseType: method.resolvedResponseType,
+ requestSerialize: serializeCls(method.resolvedRequestType.build()),
+ requestDeserialize: deserializeCls(method.resolvedRequestType.build(),
+ binaryAsBase64, longsAsStrings),
+ responseSerialize: serializeCls(method.resolvedResponseType.build()),
+ responseDeserialize: deserializeCls(method.resolvedResponseType.build(),
+ binaryAsBase64, longsAsStrings)
+ };
+ }));
+};
+
+var getProtobufServiceAttrs = exports.getProtobufServiceAttrs;
+
+/**
+ * Load a gRPC object from an existing ProtoBuf.Reflect object.
+ * @param {ProtoBuf.Reflect.Namespace} value The ProtoBuf object to load.
+ * @param {Object=} options Options to apply to the loaded object
+ * @return {Object<string, *>} The resulting gRPC object
+ */
+exports.loadObject = function loadObject(value, options) {
+ var result = {};
+ if (!value) {
+ return value;
+ }
+ if (value.hasOwnProperty('ns')) {
+ return loadObject(value.ns, options);
+ }
+ if (value.className === 'Namespace') {
+ _.each(value.children, function(child) {
+ result[child.name] = loadObject(child, options);
+ });
+ return result;
+ } else if (value.className === 'Service') {
+ return client.makeClientConstructor(getProtobufServiceAttrs(value, options),
+ options);
+ } else if (value.className === 'Message' || value.className === 'Enum') {
+ return value.build();
+ } else {
+ return value;
+ }
+};
+
+/**
+ * The primary purpose of this method is to distinguish between reflection
+ * objects from different versions of ProtoBuf.js. This is just a heuristic,
+ * checking for properties that are (currently) specific to this version of
+ * ProtoBuf.js
+ * @param {Object} obj The object to check
+ * @return {boolean} Whether the object appears to be a Protobuf.js 5
+ * ReflectionObject
+ */
+exports.isProbablyProtobufJs5 = function isProbablyProtobufJs5(obj) {
+ return _.isArray(obj.children) && (typeof obj.build === 'function');
+};
diff --git a/src/node/src/protobuf_js_6_common.js b/src/node/src/protobuf_js_6_common.js
new file mode 100644
index 0000000000..00f11f2736
--- /dev/null
+++ b/src/node/src/protobuf_js_6_common.js
@@ -0,0 +1,170 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+'use strict';
+
+var _ = require('lodash');
+var client = require('./client');
+
+/**
+ * Get a function that deserializes a specific type of protobuf.
+ * @param {function()} cls The constructor of the message type to deserialize
+ * @param {bool=} binaryAsBase64 Deserialize bytes fields as base64 strings
+ * instead of Buffers. Defaults to false
+ * @param {bool=} longsAsStrings Deserialize long values as strings instead of
+ * objects. Defaults to true
+ * @return {function(Buffer):cls} The deserialization function
+ */
+exports.deserializeCls = function deserializeCls(cls, options) {
+ var conversion_options = {
+ defaults: true,
+ bytes: options.binaryAsBase64 ? String : Buffer,
+ longs: options.longsAsStrings ? String : null,
+ enums: options.enumsAsStrings ? String : null,
+ oneofs: true
+ };
+ /**
+ * Deserialize a buffer to a message object
+ * @param {Buffer} arg_buf The buffer to deserialize
+ * @return {cls} The resulting object
+ */
+ return function deserialize(arg_buf) {
+ return cls.decode(arg_buf).toObject(conversion_options);
+ };
+};
+
+var deserializeCls = exports.deserializeCls;
+
+/**
+ * Get a function that serializes objects to a buffer by protobuf class.
+ * @param {function()} Cls The constructor of the message type to serialize
+ * @return {function(Cls):Buffer} The serialization function
+ */
+exports.serializeCls = function serializeCls(cls) {
+ /**
+ * Serialize an object to a Buffer
+ * @param {Object} arg The object to serialize
+ * @return {Buffer} The serialized object
+ */
+ return function serialize(arg) {
+ var message = cls.fromObject(arg);
+ return cls.encode(message).finish();
+ };
+};
+
+var serializeCls = exports.serializeCls;
+
+/**
+ * Get the fully qualified (dotted) name of a ProtoBuf.Reflect value.
+ * @param {ProtoBuf.ReflectionObject} value The value to get the name of
+ * @return {string} The fully qualified name of the value
+ */
+exports.fullyQualifiedName = function fullyQualifiedName(value) {
+ if (value === null || value === undefined) {
+ return '';
+ }
+ var name = value.name;
+ var parent_fqn = fullyQualifiedName(value.parent);
+ if (parent_fqn !== '') {
+ name = parent_fqn + '.' + name;
+ }
+ return name;
+};
+
+var fullyQualifiedName = exports.fullyQualifiedName;
+
+/**
+ * Return a map from method names to method attributes for the service.
+ * @param {ProtoBuf.Service} service The service to get attributes for
+ * @param {Object=} options Options to apply to these attributes
+ * @return {Object} The attributes map
+ */
+exports.getProtobufServiceAttrs = function getProtobufServiceAttrs(service,
+ options) {
+ var prefix = '/' + fullyQualifiedName(service) + '/';
+ service.resolveAll();
+ return _.zipObject(_.map(service.methods, function(method) {
+ return _.camelCase(method.name);
+ }), _.map(service.methods, function(method) {
+ return {
+ originalName: method.name,
+ path: prefix + method.name,
+ requestStream: !!method.requestStream,
+ responseStream: !!method.responseStream,
+ requestType: method.resolvedRequestType,
+ responseType: method.resolvedResponseType,
+ requestSerialize: serializeCls(method.resolvedRequestType),
+ requestDeserialize: deserializeCls(method.resolvedRequestType, options),
+ responseSerialize: serializeCls(method.resolvedResponseType),
+ responseDeserialize: deserializeCls(method.resolvedResponseType, options)
+ };
+ }));
+};
+
+var getProtobufServiceAttrs = exports.getProtobufServiceAttrs;
+
+exports.loadObject = function loadObject(value, options) {
+ var result = {};
+ if (!value) {
+ return value;
+ }
+ if (value.hasOwnProperty('methods')) {
+ // It's a service object
+ var service_attrs = getProtobufServiceAttrs(value, options);
+ return client.makeClientConstructor(service_attrs);
+ }
+
+ if (value.hasOwnProperty('nested')) {
+ // It's a namespace or root object
+ _.each(value.nested, function(nested, name) {
+ result[name] = loadObject(nested, options);
+ });
+ return result;
+ }
+
+ // Otherwise, it's not something we need to change
+ return value;
+};
+
+/**
+ * The primary purpose of this method is to distinguish between reflection
+ * objects from different versions of ProtoBuf.js. This is just a heuristic,
+ * checking for properties that are (currently) specific to this version of
+ * ProtoBuf.js
+ * @param {Object} obj The object to check
+ * @return {boolean} Whether the object appears to be a Protobuf.js 6
+ * ReflectionObject
+ */
+exports.isProbablyProtobufJs6 = function isProbablyProtobufJs6(obj) {
+ return (typeof obj.root === 'object') && (typeof obj.resolve === 'function');
+};
diff --git a/src/node/src/server.js b/src/node/src/server.js
index bdb4a56203..3450abed08 100644
--- a/src/node/src/server.js
+++ b/src/node/src/server.js
@@ -781,17 +781,31 @@ Server.prototype.addService = function(service, implementation) {
/**
* Add a proto service to the server, with a corresponding implementation
+ * @deprecated Use grpc.load and Server#addService instead
* @param {Protobuf.Reflect.Service} service The proto service descriptor
* @param {Object<String, function>} implementation Map of method names to
* method implementation for the provided service.
*/
Server.prototype.addProtoService = function(service, implementation) {
var options;
- if (service.grpc_options) {
- options = service.grpc_options;
+ var protobuf_js_5_common = require('./protobuf_js_5_common');
+ var protobuf_js_6_common = require('./protobuf_js_6_common');
+ common.log(grpc.logVerbosity.INFO,
+ 'Server#addProtoService is deprecated. Use addService instead');
+ if (protobuf_js_5_common.isProbablyProtobufJs5(service)) {
+ options = _.defaults(service.grpc_options, common.defaultGrpcOptions);
+ this.addService(
+ protobuf_js_5_common.getProtobufServiceAttrs(service, options),
+ implementation);
+ } else if (protobuf_js_6_common.isProbablyProtobufJs6(service)) {
+ options = _.defaults(service.grpc_options, common.defaultGrpcOptions);
+ this.addService(
+ protobuf_js_6_common.getProtobufServiceAttrs(service, options),
+ implementation);
+ } else {
+ // We assume that this is a service attributes object
+ this.addService(service, implementation);
}
- this.addService(common.getProtobufServiceAttrs(service, options),
- implementation);
};
/**
diff --git a/src/node/stress/metrics_server.js b/src/node/stress/metrics_server.js
index 3ab4b4c82d..b3f939e8f3 100644
--- a/src/node/stress/metrics_server.js
+++ b/src/node/stress/metrics_server.js
@@ -63,7 +63,7 @@ function getAllGauges(call) {
function MetricsServer(port) {
var server = new grpc.Server();
- server.addProtoService(metrics.MetricsService.service, {
+ server.addService(metrics.MetricsService.service, {
getGauge: _.bind(getGauge, this),
getAllGauges: _.bind(getAllGauges, this)
});
diff --git a/src/node/test/common_test.js b/src/node/test/common_test.js
index c57b7388f6..e1ce864f97 100644
--- a/src/node/test/common_test.js
+++ b/src/node/test/common_test.js
@@ -34,17 +34,26 @@
'use strict';
var assert = require('assert');
+var _ = require('lodash');
-var common = require('../src/common.js');
+var common = require('../src/common');
+var protobuf_js_6_common = require('../src/protobuf_js_6_common');
+
+var serializeCls = protobuf_js_6_common.serializeCls;
+var deserializeCls = protobuf_js_6_common.deserializeCls;
var ProtoBuf = require('protobufjs');
-var messages_proto = ProtoBuf.loadProtoFile(
- __dirname + '/test_messages.proto').build();
+var messages_proto = new ProtoBuf.Root();
+messages_proto = messages_proto.loadSync(
+ __dirname + '/test_messages.proto', {keepCase: true}).resolveAll();
+
+var default_options = common.defaultGrpcOptions;
describe('Proto message long int serialize and deserialize', function() {
- var longSerialize = common.serializeCls(messages_proto.LongValues);
- var longDeserialize = common.deserializeCls(messages_proto.LongValues);
+ var longSerialize = serializeCls(messages_proto.LongValues);
+ var longDeserialize = deserializeCls(messages_proto.LongValues,
+ default_options);
var pos_value = '314159265358979';
var neg_value = '-27182818284590';
it('should preserve positive int64 values', function() {
@@ -88,8 +97,9 @@ describe('Proto message long int serialize and deserialize', function() {
neg_value);
});
it('should deserialize as a number with the right option set', function() {
- var longNumDeserialize = common.deserializeCls(messages_proto.LongValues,
- false, false);
+ var num_options = _.defaults({longsAsStrings: false}, default_options);
+ var longNumDeserialize = deserializeCls(messages_proto.LongValues,
+ num_options);
var serialized = longSerialize({int_64: pos_value});
assert.strictEqual(typeof longDeserialize(serialized).int_64, 'string');
/* With the longsAsStrings option disabled, long values are represented as
@@ -98,11 +108,12 @@ describe('Proto message long int serialize and deserialize', function() {
});
});
describe('Proto message bytes serialize and deserialize', function() {
- var sequenceSerialize = common.serializeCls(messages_proto.SequenceValues);
- var sequenceDeserialize = common.deserializeCls(
- messages_proto.SequenceValues);
- var sequenceBase64Deserialize = common.deserializeCls(
- messages_proto.SequenceValues, true);
+ var sequenceSerialize = serializeCls(messages_proto.SequenceValues);
+ var sequenceDeserialize = deserializeCls(
+ messages_proto.SequenceValues, default_options);
+ var b64_options = _.defaults({binaryAsBase64: true}, default_options);
+ var sequenceBase64Deserialize = deserializeCls(
+ messages_proto.SequenceValues, b64_options);
var buffer_val = new Buffer([0x69, 0xb7]);
var base64_val = 'abc=';
it('should preserve a buffer', function() {
@@ -120,19 +131,73 @@ describe('Proto message bytes serialize and deserialize', function() {
var deserialized = sequenceBase64Deserialize(serialized);
assert.strictEqual(deserialized.bytes_field, base64_val);
});
- /* The next two tests are specific tests to verify that issue
- * https://github.com/grpc/grpc/issues/5174 has been fixed. They are skipped
- * because they will not pass until a protobuf.js release has been published
- * with a fix for https://github.com/dcodeIO/protobuf.js/issues/390 */
- it.skip('should serialize a repeated field as packed by default', function() {
- var expected_serialize = new Buffer([0x12, 0x01, 0x01, 0x0a]);
+ it('should serialize a repeated field as packed by default', function() {
+ var expected_serialize = new Buffer([0x12, 0x01, 0x0a]);
var serialized = sequenceSerialize({repeated_field: [10]});
assert.strictEqual(expected_serialize.compare(serialized), 0);
});
- it.skip('should deserialize packed or unpacked repeated', function() {
- var serialized = new Buffer([0x12, 0x01, 0x01, 0x0a]);
+ it('should deserialize packed or unpacked repeated', function() {
+ var expectedDeserialize = {
+ bytes_field: new Buffer(''),
+ repeated_field: [10]
+ };
+ var packedSerialized = new Buffer([0x12, 0x01, 0x0a]);
+ var unpackedSerialized = new Buffer([0x10, 0x0a]);
+ var packedDeserialized;
+ var unpackedDeserialized;
assert.doesNotThrow(function() {
- sequenceDeserialize(serialized);
+ packedDeserialized = sequenceDeserialize(packedSerialized);
});
+ assert.doesNotThrow(function() {
+ unpackedDeserialized = sequenceDeserialize(unpackedSerialized);
+ });
+ assert.deepEqual(packedDeserialized, expectedDeserialize);
+ assert.deepEqual(unpackedDeserialized, expectedDeserialize);
+ });
+});
+describe('Proto message oneof serialize and deserialize', function() {
+ var oneofSerialize = serializeCls(messages_proto.OneOfValues);
+ var oneofDeserialize = deserializeCls(
+ messages_proto.OneOfValues, default_options);
+ it('Should have idempotent round trips', function() {
+ var test_message = {oneof_choice: 'int_choice', int_choice: 5};
+ var serialized1 = oneofSerialize(test_message);
+ var deserialized1 = oneofDeserialize(serialized1);
+ assert.equal(deserialized1.int_choice, 5);
+ var serialized2 = oneofSerialize(deserialized1);
+ var deserialized2 = oneofDeserialize(serialized2);
+ assert.deepEqual(deserialized1, deserialized2);
+ });
+ it('Should emit a property indicating which field was chosen', function() {
+ var test_message1 = {oneof_choice: 'int_choice', int_choice: 5};
+ var serialized1 = oneofSerialize(test_message1);
+ var deserialized1 = oneofDeserialize(serialized1);
+ assert.equal(deserialized1.oneof_choice, 'int_choice');
+ var test_message2 = {oneof_choice: 'string_choice', string_choice: 'abc'};
+ var serialized2 = oneofSerialize(test_message2);
+ var deserialized2 = oneofDeserialize(serialized2);
+ assert.equal(deserialized2.oneof_choice, 'string_choice');
+ });
+});
+describe('Proto message enum serialize and deserialize', function() {
+ var enumSerialize = serializeCls(messages_proto.EnumValues);
+ var enumDeserialize = deserializeCls(
+ messages_proto.EnumValues, default_options);
+ var enumIntOptions = _.defaults({enumsAsStrings: false}, default_options);
+ var enumIntDeserialize = deserializeCls(
+ messages_proto.EnumValues, enumIntOptions);
+ it('Should accept both names and numbers', function() {
+ var nameSerialized = enumSerialize({enum_value: 'ONE'});
+ var numberSerialized = enumSerialize({enum_value: 1});
+ assert.strictEqual(messages_proto.TestEnum.ONE, 1);
+ assert.deepEqual(enumDeserialize(nameSerialized),
+ enumDeserialize(numberSerialized));
+ });
+ it('Should deserialize as a string the enumsAsStrings option', function() {
+ var serialized = enumSerialize({enum_value: 'TWO'});
+ var nameDeserialized = enumDeserialize(serialized);
+ var numberDeserialized = enumIntDeserialize(serialized);
+ assert.deepEqual(nameDeserialized, {enum_value: 'TWO'});
+ assert.deepEqual(numberDeserialized, {enum_value: 2});
});
});
diff --git a/src/node/test/credentials_test.js b/src/node/test/credentials_test.js
index 305843f665..b66b4bf5ea 100644
--- a/src/node/test/credentials_test.js
+++ b/src/node/test/credentials_test.js
@@ -228,7 +228,7 @@ describe('client credentials', function() {
before(function() {
var proto = grpc.load(__dirname + '/test_service.proto');
server = new grpc.Server();
- server.addProtoService(proto.TestService.service, {
+ server.addService(proto.TestService.service, {
unary: function(call, cb) {
call.sendMetadata(call.metadata);
cb(null, {});
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index 1d739562a6..783028fa99 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -34,19 +34,22 @@
'use strict';
var assert = require('assert');
+var _ = require('lodash');
var surface_client = require('../src/client.js');
+var common = require('../src/common');
var ProtoBuf = require('protobufjs');
var grpc = require('..');
-var math_proto = ProtoBuf.loadProtoFile(__dirname +
- '/../../proto/math/math.proto');
+var math_proto = new ProtoBuf.Root();
+math_proto = math_proto.loadSync(__dirname +
+ '/../../proto/math/math.proto', {keepCase: true});
var mathService = math_proto.lookup('math.Math');
-
-var _ = require('lodash');
+var mathServiceAttrs = grpc.loadObject(
+ mathService, common.defaultGrpcOptions).service;
/**
* This is used for testing functions with multiple asynchronous calls that
@@ -87,11 +90,6 @@ describe('File loader', function() {
grpc.load(__dirname + '/test_service.json', 'json');
});
});
- it('Should fail to load a file with an unknown format', function() {
- assert.throws(function() {
- grpc.load(__dirname + '/test_service.proto', 'fake_format');
- });
- });
});
describe('surface Server', function() {
var server;
@@ -132,15 +130,40 @@ describe('Server.prototype.addProtoService', function() {
afterEach(function() {
server.forceShutdown();
});
- it('Should succeed with a single service', function() {
+ it('Should succeed with a single proto service', function() {
assert.doesNotThrow(function() {
server.addProtoService(mathService, dummyImpls);
});
});
+ it('Should succeed with a single service attributes object', function() {
+ assert.doesNotThrow(function() {
+ server.addProtoService(mathServiceAttrs, dummyImpls);
+ });
+ });
+});
+describe('Server.prototype.addService', function() {
+ var server;
+ var dummyImpls = {
+ 'div': function() {},
+ 'divMany': function() {},
+ 'fib': function() {},
+ 'sum': function() {}
+ };
+ beforeEach(function() {
+ server = new grpc.Server();
+ });
+ afterEach(function() {
+ server.forceShutdown();
+ });
+ it('Should succeed with a single service', function() {
+ assert.doesNotThrow(function() {
+ server.addService(mathServiceAttrs, dummyImpls);
+ });
+ });
it('Should fail with conflicting method names', function() {
- server.addProtoService(mathService, dummyImpls);
+ server.addService(mathServiceAttrs, dummyImpls);
assert.throws(function() {
- server.addProtoService(mathService, dummyImpls);
+ server.addService(mathServiceAttrs, dummyImpls);
});
});
it('Should allow method names as originally written', function() {
@@ -172,15 +195,15 @@ describe('Server.prototype.addProtoService', function() {
it('Should fail if the server has been started', function() {
server.start();
assert.throws(function() {
- server.addProtoService(mathService, dummyImpls);
+ server.addService(mathServiceAttrs, dummyImpls);
});
});
describe('Default handlers', function() {
var client;
beforeEach(function() {
- server.addProtoService(mathService, {});
+ server.addService(mathServiceAttrs, {});
var port = server.bind('localhost:0', server_insecure_creds);
- var Client = surface_client.makeProtobufClientConstructor(mathService);
+ var Client = grpc.loadObject(mathService);
client = new Client('localhost:' + port,
grpc.credentials.createInsecure());
server.start();
@@ -252,7 +275,7 @@ describe('waitForClientReady', function() {
server = new grpc.Server();
port = server.bind('localhost:0', grpc.ServerCredentials.createInsecure());
server.start();
- Client = surface_client.makeProtobufClientConstructor(mathService);
+ Client = grpc.loadObject(mathService);
});
beforeEach(function() {
client = new Client('localhost:' + port, grpc.credentials.createInsecure());
@@ -309,16 +332,18 @@ describe('Echo service', function() {
var server;
var client;
before(function() {
- var test_proto = ProtoBuf.loadProtoFile(__dirname + '/echo_service.proto');
+ var test_proto = new ProtoBuf.Root();
+ test_proto = test_proto.loadSync(__dirname + '/echo_service.proto',
+ {keepCase: true});
var echo_service = test_proto.lookup('EchoService');
+ var Client = grpc.loadObject(echo_service);
server = new grpc.Server();
- server.addProtoService(echo_service, {
+ server.addService(Client.service, {
echo: function(call, callback) {
callback(null, call.request);
}
});
var port = server.bind('localhost:0', server_insecure_creds);
- var Client = surface_client.makeProtobufClientConstructor(echo_service);
client = new Client('localhost:' + port, grpc.credentials.createInsecure());
server.start();
});
@@ -432,10 +457,13 @@ describe('Echo metadata', function() {
var server;
var metadata;
before(function() {
- var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
+ var test_proto = new ProtoBuf.Root();
+ test_proto = test_proto.loadSync(__dirname + '/test_service.proto',
+ {keepCase: true});
var test_service = test_proto.lookup('TestService');
+ var Client = grpc.loadObject(test_service);
server = new grpc.Server();
- server.addProtoService(test_service, {
+ server.addService(Client.service, {
unary: function(call, cb) {
call.sendMetadata(call.metadata);
cb(null, {});
@@ -460,7 +488,6 @@ describe('Echo metadata', function() {
}
});
var port = server.bind('localhost:0', server_insecure_creds);
- var Client = surface_client.makeProtobufClientConstructor(test_service);
client = new Client('localhost:' + port, grpc.credentials.createInsecure());
server.start();
metadata = new grpc.Metadata();
@@ -533,7 +560,9 @@ describe('Client malformed response handling', function() {
var client;
var badArg = new Buffer([0xFF]);
before(function() {
- var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
+ var test_proto = new ProtoBuf.Root();
+ test_proto = test_proto.loadSync(__dirname + '/test_service.proto',
+ {keepCase: true});
var test_service = test_proto.lookup('TestService');
var malformed_test_service = {
unary: {
@@ -591,7 +620,7 @@ describe('Client malformed response handling', function() {
}
});
var port = server.bind('localhost:0', server_insecure_creds);
- var Client = surface_client.makeProtobufClientConstructor(test_service);
+ var Client = grpc.loadObject(test_service);
client = new Client('localhost:' + port, grpc.credentials.createInsecure());
server.start();
});
@@ -640,7 +669,9 @@ describe('Server serialization failure handling', function() {
var client;
var server;
before(function() {
- var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
+ var test_proto = new ProtoBuf.Root();
+ test_proto = test_proto.loadSync(__dirname + '/test_service.proto',
+ {keepCase: true});
var test_service = test_proto.lookup('TestService');
var malformed_test_service = {
unary: {
@@ -698,7 +729,7 @@ describe('Server serialization failure handling', function() {
}
});
var port = server.bind('localhost:0', server_insecure_creds);
- var Client = surface_client.makeProtobufClientConstructor(test_service);
+ var Client = grpc.loadObject(test_service);
client = new Client('localhost:' + port, grpc.credentials.createInsecure());
server.start();
});
@@ -747,12 +778,15 @@ describe('Other conditions', function() {
var server;
var port;
before(function() {
- var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
+ var test_proto = new ProtoBuf.Root();
+ test_proto = test_proto.loadSync(__dirname + '/test_service.proto',
+ {keepCase: true});
test_service = test_proto.lookup('TestService');
+ Client = grpc.loadObject(test_service);
server = new grpc.Server();
var trailer_metadata = new grpc.Metadata();
trailer_metadata.add('trailer-present', 'yes');
- server.addProtoService(test_service, {
+ server.addService(Client.service, {
unary: function(call, cb) {
var req = call.request;
if (req.error) {
@@ -812,7 +846,6 @@ describe('Other conditions', function() {
}
});
port = server.bind('localhost:0', server_insecure_creds);
- Client = surface_client.makeProtobufClientConstructor(test_service);
client = new Client('localhost:' + port, grpc.credentials.createInsecure());
server.start();
});
@@ -1093,17 +1126,19 @@ describe('Call propagation', function() {
var client;
var server;
before(function() {
- var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
+ var test_proto = new ProtoBuf.Root();
+ test_proto = test_proto.loadSync(__dirname + '/test_service.proto',
+ {keepCase: true});
test_service = test_proto.lookup('TestService');
server = new grpc.Server();
- server.addProtoService(test_service, {
+ Client = grpc.loadObject(test_service);
+ server.addService(Client.service, {
unary: function(call) {},
clientStream: function(stream) {},
serverStream: function(stream) {},
bidiStream: function(stream) {}
});
var port = server.bind('localhost:0', server_insecure_creds);
- Client = surface_client.makeProtobufClientConstructor(test_service);
client = new Client('localhost:' + port, grpc.credentials.createInsecure());
server.start();
});
@@ -1138,7 +1173,7 @@ describe('Call propagation', function() {
});
call.cancel();
};
- proxy.addProtoService(test_service, proxy_impl);
+ proxy.addService(Client.service, proxy_impl);
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
@@ -1160,7 +1195,7 @@ describe('Call propagation', function() {
});
call.cancel();
};
- proxy.addProtoService(test_service, proxy_impl);
+ proxy.addService(Client.service, proxy_impl);
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
@@ -1180,7 +1215,7 @@ describe('Call propagation', function() {
});
call.cancel();
};
- proxy.addProtoService(test_service, proxy_impl);
+ proxy.addService(Client.service, proxy_impl);
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
@@ -1204,7 +1239,7 @@ describe('Call propagation', function() {
});
call.cancel();
};
- proxy.addProtoService(test_service, proxy_impl);
+ proxy.addService(Client.service, proxy_impl);
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
@@ -1235,7 +1270,7 @@ describe('Call propagation', function() {
}
});
};
- proxy.addProtoService(test_service, proxy_impl);
+ proxy.addService(Client.service, proxy_impl);
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
@@ -1259,7 +1294,7 @@ describe('Call propagation', function() {
done();
});
};
- proxy.addProtoService(test_service, proxy_impl);
+ proxy.addService(Client.service, proxy_impl);
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
@@ -1279,14 +1314,14 @@ describe('Cancelling surface client', function() {
var server;
before(function() {
server = new grpc.Server();
- server.addProtoService(mathService, {
+ server.addService(mathServiceAttrs, {
'div': function(stream) {},
'divMany': function(stream) {},
'fib': function(stream) {},
'sum': function(stream) {}
});
var port = server.bind('localhost:0', server_insecure_creds);
- var Client = surface_client.makeProtobufClientConstructor(mathService);
+ var Client = surface_client.makeClientConstructor(mathServiceAttrs);
client = new Client('localhost:' + port, grpc.credentials.createInsecure());
server.start();
});
diff --git a/src/node/test/test_messages.proto b/src/node/test/test_messages.proto
index a1a6a32833..ae70f6e152 100644
--- a/src/node/test/test_messages.proto
+++ b/src/node/test/test_messages.proto
@@ -41,3 +41,20 @@ message SequenceValues {
bytes bytes_field = 1;
repeated int32 repeated_field = 2;
}
+
+message OneOfValues {
+ oneof oneof_choice {
+ int32 int_choice = 1;
+ string string_choice = 2;
+ }
+}
+
+enum TestEnum {
+ ZERO = 0;
+ ONE = 1;
+ TWO = 2;
+}
+
+message EnumValues {
+ TestEnum enum_value = 1;
+} \ No newline at end of file
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index 47838c2c98..f29c44a4cf 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -705,6 +705,10 @@ def _serve(state):
state.rpc_states.remove(rpc_state)
if _stop_serving(state):
return
+ # We want to force the deletion of the previous event
+ # ~before~ we poll again; if the event has a reference
+ # to a shutdown Call object, this can induce spinlock.
+ event = None
def _stop(state, grace):
diff --git a/src/ruby/end2end/channel_closing_driver.rb b/src/ruby/end2end/channel_closing_driver.rb
index 43e2fe8cbb..d3e5373b0b 100755
--- a/src/ruby/end2end/channel_closing_driver.rb
+++ b/src/ruby/end2end/channel_closing_driver.rb
@@ -36,7 +36,7 @@ require_relative './end2end_common'
def main
STDERR.puts 'start server'
- server_runner = ServerRunner.new
+ server_runner = ServerRunner.new(EchoServerImpl)
server_port = server_runner.run
sleep 1
diff --git a/src/ruby/end2end/channel_state_driver.rb b/src/ruby/end2end/channel_state_driver.rb
index c3184bf939..80fb62899e 100755
--- a/src/ruby/end2end/channel_state_driver.rb
+++ b/src/ruby/end2end/channel_state_driver.rb
@@ -35,7 +35,7 @@ require_relative './end2end_common'
def main
STDERR.puts 'start server'
- server_runner = ServerRunner.new
+ server_runner = ServerRunner.new(EchoServerImpl)
server_port = server_runner.run
sleep 1
diff --git a/src/ruby/end2end/end2end_common.rb b/src/ruby/end2end/end2end_common.rb
index 9534bb2078..1c87ceddf1 100755
--- a/src/ruby/end2end/end2end_common.rb
+++ b/src/ruby/end2end/end2end_common.rb
@@ -55,13 +55,14 @@ end
# ServerRunner starts an "echo server" that test clients can make calls to
class ServerRunner
- def initialize
+ def initialize(service_impl)
+ @service_impl = service_impl
end
def run
@srv = GRPC::RpcServer.new
port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
- @srv.handle(EchoServerImpl)
+ @srv.handle(@service_impl)
@thd = Thread.new do
@srv.run
diff --git a/src/ruby/end2end/killed_client_thread_client.rb b/src/ruby/end2end/killed_client_thread_client.rb
new file mode 100755
index 0000000000..d5a7db7d58
--- /dev/null
+++ b/src/ruby/end2end/killed_client_thread_client.rb
@@ -0,0 +1,58 @@
+#!/usr/bin/env ruby
+
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+# Attempt to reproduce
+# https://github.com/GoogleCloudPlatform/google-cloud-ruby/issues/1327
+
+require_relative './end2end_common'
+
+def main
+ server_port = ''
+ OptionParser.new do |opts|
+ opts.on('--client_control_port=P', String) do
+ STDERR.puts 'client control port not used'
+ end
+ opts.on('--server_port=P', String) do |p|
+ server_port = p
+ end
+ end.parse!
+
+ thd = Thread.new do
+ stub = Echo::EchoServer::Stub.new("localhost:#{server_port}",
+ :this_channel_is_insecure)
+ stub.echo(Echo::EchoRequest.new(request: 'hello'))
+ fail 'the clients rpc in this test shouldnt complete. ' \
+ 'expecting SIGINT to happen in the middle of the call'
+ end
+ thd.join
+end
+
+main
diff --git a/src/ruby/end2end/killed_client_thread_driver.rb b/src/ruby/end2end/killed_client_thread_driver.rb
new file mode 100755
index 0000000000..f76d3e1746
--- /dev/null
+++ b/src/ruby/end2end/killed_client_thread_driver.rb
@@ -0,0 +1,114 @@
+#!/usr/bin/env ruby
+
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+require_relative './end2end_common'
+
+# Service that sleeps for a long time upon receiving an 'echo request'
+# Also, this notifies @call_started_cv once it has received a request.
+class SleepingEchoServerImpl < Echo::EchoServer::Service
+ def initialize(call_started, call_started_mu, call_started_cv)
+ @call_started = call_started
+ @call_started_mu = call_started_mu
+ @call_started_cv = call_started_cv
+ end
+
+ def echo(echo_req, _)
+ @call_started_mu.synchronize do
+ @call_started.set_true
+ @call_started_cv.signal
+ end
+ sleep 1000
+ Echo::EchoReply.new(response: echo_req.request)
+ end
+end
+
+# Mutable boolean
+class BoolHolder
+ attr_reader :val
+
+ def init
+ @val = false
+ end
+
+ def set_true
+ @val = true
+ end
+end
+
+def main
+ STDERR.puts 'start server'
+
+ call_started = BoolHolder.new
+ call_started_mu = Mutex.new
+ call_started_cv = ConditionVariable.new
+
+ service_impl = SleepingEchoServerImpl.new(call_started,
+ call_started_mu,
+ call_started_cv)
+ server_runner = ServerRunner.new(service_impl)
+ server_port = server_runner.run
+
+ STDERR.puts 'start client'
+ _, client_pid = start_client('killed_client_thread_client.rb',
+ server_port)
+
+ call_started_mu.synchronize do
+ call_started_cv.wait(call_started_mu) until call_started.val
+ end
+
+ # SIGINT the child process now that it's
+ # in the middle of an RPC (happening on a non-main thread)
+ Process.kill('SIGINT', client_pid)
+ STDERR.puts 'sent shutdown'
+
+ begin
+ Timeout.timeout(10) do
+ Process.wait(client_pid)
+ end
+ rescue Timeout::Error
+ STDERR.puts "timeout wait for client pid #{client_pid}"
+ Process.kill('SIGKILL', client_pid)
+ Process.wait(client_pid)
+ STDERR.puts 'killed client child'
+ raise 'Timed out waiting for client process. ' \
+ 'It likely hangs when killed while in the middle of an rpc'
+ end
+
+ client_exit_code = $CHILD_STATUS
+ if client_exit_code.termsig != 2 # SIGINT
+ fail 'expected client exit from SIGINT ' \
+ "but got child status: #{client_exit_code}"
+ end
+
+ server_runner.stop
+end
+
+main
diff --git a/src/ruby/end2end/sig_handling_driver.rb b/src/ruby/end2end/sig_handling_driver.rb
index c5d46e074c..6691464dc6 100755
--- a/src/ruby/end2end/sig_handling_driver.rb
+++ b/src/ruby/end2end/sig_handling_driver.rb
@@ -36,7 +36,7 @@ require_relative './end2end_common'
def main
STDERR.puts 'start server'
- server_runner = ServerRunner.new
+ server_runner = ServerRunner.new(EchoServerImpl)
server_port = server_runner.run
sleep 1
diff --git a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb
index 84d039bf19..670cda0919 100755
--- a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb
+++ b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb
@@ -36,7 +36,7 @@ require_relative './end2end_common'
def main
STDERR.puts 'start server'
- server_runner = ServerRunner.new
+ server_runner = ServerRunner.new(EchoServerImpl)
server_port = server_runner.run
sleep 1
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 82d340b254..344cb941ff 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -784,7 +784,7 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
Only one operation of each type can be active at once in any given
batch */
static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) {
- run_batch_stack st;
+ run_batch_stack *st = NULL;
grpc_rb_call *call = NULL;
grpc_event ev;
grpc_call_error err;
@@ -792,6 +792,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) {
VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
unsigned write_flag = 0;
void *tag = (void*)&st;
+
if (RTYPEDDATA_DATA(self) == NULL) {
rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call");
return Qnil;
@@ -806,14 +807,16 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) {
if (rb_write_flag != Qnil) {
write_flag = NUM2UINT(rb_write_flag);
}
- grpc_run_batch_stack_init(&st, write_flag);
- grpc_run_batch_stack_fill_ops(&st, ops_hash);
+ st = gpr_malloc(sizeof(run_batch_stack));
+ grpc_run_batch_stack_init(st, write_flag);
+ grpc_run_batch_stack_fill_ops(st, ops_hash);
/* call grpc_call_start_batch, then wait for it to complete using
* pluck_event */
- err = grpc_call_start_batch(call->wrapped, st.ops, st.op_num, tag, NULL);
+ err = grpc_call_start_batch(call->wrapped, st->ops, st->op_num, tag, NULL);
if (err != GRPC_CALL_OK) {
- grpc_run_batch_stack_cleanup(&st);
+ grpc_run_batch_stack_cleanup(st);
+ gpr_free(st);
rb_raise(grpc_rb_eCallError,
"grpc_call_start_batch failed with %s (code=%d)",
grpc_call_error_detail_of(err), err);
@@ -826,8 +829,9 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) {
}
/* Build and return the BatchResult struct result,
if there is an error, it's reflected in the status */
- result = grpc_run_batch_stack_build_result(&st);
- grpc_run_batch_stack_cleanup(&st);
+ result = grpc_run_batch_stack_build_result(st);
+ grpc_run_batch_stack_cleanup(st);
+ gpr_free(st);
return result;
}
diff --git a/templates/grpc.gemspec.template b/templates/grpc.gemspec.template
index 462ea52614..80ce643d80 100644
--- a/templates/grpc.gemspec.template
+++ b/templates/grpc.gemspec.template
@@ -26,7 +26,7 @@
s.files += Dir.glob('include/grpc/**/*')
s.test_files = Dir.glob('src/ruby/spec/**/*')
s.bindir = 'src/ruby/bin'
- s.require_paths = %w( src/ruby/bin src/ruby/lib src/ruby/pb )
+ s.require_paths = %w( src/ruby/lib src/ruby/bin src/ruby/pb )
s.platform = Gem::Platform::RUBY
s.add_dependency 'google-protobuf', '~> 3.1'
diff --git a/templates/package.json.template b/templates/package.json.template
index d093883cf5..b69fd28d2a 100644
--- a/templates/package.json.template
+++ b/templates/package.json.template
@@ -36,7 +36,7 @@
"lodash": "^4.15.0",
"nan": "^2.0.0",
"node-pre-gyp": "^0.6.0",
- "protobufjs": "^5.0.0",
+ "protobufjs": "^6.7.0",
"cares": "^1.1.5"
},
"devDependencies": {
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c
index 7352c6894b..4389fa9f62 100644
--- a/test/core/bad_client/bad_client.c
+++ b/test/core/bad_client/bad_client.c
@@ -118,10 +118,7 @@ void grpc_run_bad_client_test(
grpc_init();
/* Create endpoints */
- grpc_resource_quota *resource_quota =
- grpc_resource_quota_create("bad_client_test");
- sfd = grpc_iomgr_create_endpoint_pair("fixture", resource_quota, 65536);
- grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
+ sfd = grpc_iomgr_create_endpoint_pair("fixture", NULL);
/* Create server, completion events */
a.server = grpc_server_create(NULL, NULL);
diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.c b/test/core/end2end/fixtures/h2_sockpair+trace.c
index c4f7672a9c..50ad025796 100644
--- a/test/core/end2end/fixtures/h2_sockpair+trace.c
+++ b/test/core/end2end/fixtures/h2_sockpair+trace.c
@@ -97,9 +97,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
f.cq = grpc_completion_queue_create_for_next(NULL);
f.shutdown_cq = grpc_completion_queue_create_for_pluck(NULL);
- grpc_resource_quota *resource_quota = grpc_resource_quota_create("fixture");
- *sfd = grpc_iomgr_create_endpoint_pair("fixture", resource_quota, 65536);
- grpc_resource_quota_unref(resource_quota);
+ *sfd = grpc_iomgr_create_endpoint_pair("fixture", NULL);
return f;
}
diff --git a/test/core/end2end/fixtures/h2_sockpair.c b/test/core/end2end/fixtures/h2_sockpair.c
index 3d9ffc2ab8..72a997acd9 100644
--- a/test/core/end2end/fixtures/h2_sockpair.c
+++ b/test/core/end2end/fixtures/h2_sockpair.c
@@ -91,9 +91,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
f.cq = grpc_completion_queue_create_for_next(NULL);
f.shutdown_cq = grpc_completion_queue_create_for_pluck(NULL);
- grpc_resource_quota *resource_quota = grpc_resource_quota_create("fixture");
- *sfd = grpc_iomgr_create_endpoint_pair("fixture", resource_quota, 65536);
- grpc_resource_quota_unref(resource_quota);
+ *sfd = grpc_iomgr_create_endpoint_pair("fixture", NULL);
return f;
}
diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.c b/test/core/end2end/fixtures/h2_sockpair_1byte.c
index 13e7f94734..7dc6beab3b 100644
--- a/test/core/end2end/fixtures/h2_sockpair_1byte.c
+++ b/test/core/end2end/fixtures/h2_sockpair_1byte.c
@@ -91,9 +91,17 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
f.cq = grpc_completion_queue_create_for_next(NULL);
f.shutdown_cq = grpc_completion_queue_create_for_pluck(NULL);
- grpc_resource_quota *resource_quota = grpc_resource_quota_create("fixture");
- *sfd = grpc_iomgr_create_endpoint_pair("fixture", resource_quota, 1);
- grpc_resource_quota_unref(resource_quota);
+ grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE,
+ .type = GRPC_ARG_INTEGER,
+ .value.integer = 1},
+ {.key = GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE,
+ .type = GRPC_ARG_INTEGER,
+ .value.integer = 1},
+ {.key = GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE,
+ .type = GRPC_ARG_INTEGER,
+ .value.integer = 1}};
+ grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a};
+ *sfd = grpc_iomgr_create_endpoint_pair("fixture", &args);
return f;
}
diff --git a/test/core/end2end/fuzzers/server_fuzzer_corpus/clusterfuzz-testcase-6312731374256128 b/test/core/end2end/fuzzers/server_fuzzer_corpus/clusterfuzz-testcase-6312731374256128
new file mode 100644
index 0000000000..4c6eb601ae
--- /dev/null
+++ b/test/core/end2end/fuzzers/server_fuzzer_corpus/clusterfuzz-testcase-6312731374256128
Binary files differ
diff --git a/test/core/iomgr/endpoint_pair_test.c b/test/core/iomgr/endpoint_pair_test.c
index 4b98ef257e..c8a60776b9 100644
--- a/test/core/iomgr/endpoint_pair_test.c
+++ b/test/core/iomgr/endpoint_pair_test.c
@@ -49,11 +49,11 @@ static grpc_endpoint_test_fixture create_fixture_endpoint_pair(
size_t slice_size) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_endpoint_test_fixture f;
- grpc_resource_quota *resource_quota =
- grpc_resource_quota_create("endpoint_pair_test");
- grpc_endpoint_pair p =
- grpc_iomgr_create_endpoint_pair("test", resource_quota, slice_size);
- grpc_resource_quota_unref(resource_quota);
+ grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE,
+ .type = GRPC_ARG_INTEGER,
+ .value.integer = (int)slice_size}};
+ grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a};
+ grpc_endpoint_pair p = grpc_iomgr_create_endpoint_pair("test", &args);
f.client_ep = p.client;
f.server_ep = p.server;
diff --git a/test/core/iomgr/error_test.c b/test/core/iomgr/error_test.c
index 5c60a4ddb8..607dbeea3e 100644
--- a/test/core/iomgr/error_test.c
+++ b/test/core/iomgr/error_test.c
@@ -182,8 +182,6 @@ static void print_error_string_reference() {
grpc_error* parent =
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Parent", children, 2);
- gpr_log(GPR_DEBUG, "%s", grpc_error_string(parent));
-
for (size_t i = 0; i < 2; ++i) {
GRPC_ERROR_UNREF(children[i]);
}
@@ -216,6 +214,33 @@ static void test_special() {
GRPC_ERROR_UNREF(error);
}
+static void test_overflow() {
+ grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Overflow");
+
+ for (size_t i = 0; i < 150; ++i) {
+ error = grpc_error_add_child(error,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Child"));
+ }
+
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_HTTP2_ERROR, 5);
+ error =
+ grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
+ grpc_slice_from_static_string("message for child 2"));
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, 5);
+
+ intptr_t i;
+ GPR_ASSERT(grpc_error_get_int(error, GRPC_ERROR_INT_HTTP2_ERROR, &i));
+ GPR_ASSERT(i == 5);
+ GPR_ASSERT(!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &i));
+
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_HTTP2_ERROR, 10);
+ GPR_ASSERT(grpc_error_get_int(error, GRPC_ERROR_INT_HTTP2_ERROR, &i));
+ GPR_ASSERT(i == 10);
+
+ GRPC_ERROR_UNREF(error);
+ ;
+}
+
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
grpc_init();
@@ -228,6 +253,7 @@ int main(int argc, char** argv) {
test_create_referencing();
test_create_referencing_many();
test_special();
+ test_overflow();
grpc_shutdown();
return 0;
diff --git a/test/core/iomgr/fd_conservation_posix_test.c b/test/core/iomgr/fd_conservation_posix_test.c
index 3dffa02c3c..6ac322bb01 100644
--- a/test/core/iomgr/fd_conservation_posix_test.c
+++ b/test/core/iomgr/fd_conservation_posix_test.c
@@ -57,7 +57,7 @@ int main(int argc, char **argv) {
for (i = 0; i < 100; i++) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- p = grpc_iomgr_create_endpoint_pair("test", resource_quota, 1);
+ p = grpc_iomgr_create_endpoint_pair("test", NULL);
grpc_endpoint_destroy(&exec_ctx, p.client);
grpc_endpoint_destroy(&exec_ctx, p.server);
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 5a55be888f..2c53a003d2 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -183,10 +183,12 @@ static void read_test(size_t num_bytes, size_t slice_size) {
create_sockets(sv);
- grpc_resource_quota *resource_quota = grpc_resource_quota_create("read_test");
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), resource_quota,
- slice_size, "test");
- grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
+ grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE,
+ .type = GRPC_ARG_INTEGER,
+ .value.integer = (int)slice_size}};
+ grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a};
+ ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "read_test"), &args,
+ "test");
grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
written_bytes = fill_socket_partial(sv[0], num_bytes);
@@ -233,11 +235,12 @@ static void large_read_test(size_t slice_size) {
create_sockets(sv);
- grpc_resource_quota *resource_quota =
- grpc_resource_quota_create("large_read_test");
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), resource_quota,
- slice_size, "test");
- grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
+ grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE,
+ .type = GRPC_ARG_INTEGER,
+ .value.integer = (int)slice_size}};
+ grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a};
+ ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "large_read_test"),
+ &args, "test");
grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
written_bytes = fill_socket(sv[0]);
@@ -372,11 +375,12 @@ static void write_test(size_t num_bytes, size_t slice_size) {
create_sockets(sv);
- grpc_resource_quota *resource_quota =
- grpc_resource_quota_create("write_test");
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), resource_quota,
- GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test");
- grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
+ grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE,
+ .type = GRPC_ARG_INTEGER,
+ .value.integer = (int)slice_size}};
+ grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a};
+ ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "write_test"), &args,
+ "test");
grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
state.ep = ep;
@@ -441,12 +445,13 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
create_sockets(sv);
- grpc_resource_quota *resource_quota =
- grpc_resource_quota_create("release_fd_test");
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), resource_quota,
- slice_size, "test");
+ grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE,
+ .type = GRPC_ARG_INTEGER,
+ .value.integer = (int)slice_size}};
+ grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a};
+ ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "read_test"), &args,
+ "test");
GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0);
- grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
written_bytes = fill_socket_partial(sv[0], num_bytes);
@@ -534,10 +539,14 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
create_sockets(sv);
grpc_resource_quota *resource_quota =
grpc_resource_quota_create("tcp_posix_test_socketpair");
- f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"),
- resource_quota, slice_size, "test");
- f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"),
- resource_quota, slice_size, "test");
+ grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE,
+ .type = GRPC_ARG_INTEGER,
+ .value.integer = (int)slice_size}};
+ grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a};
+ f.client_ep = grpc_tcp_create(
+ &exec_ctx, grpc_fd_create(sv[0], "fixture:client"), &args, "test");
+ f.server_ep = grpc_tcp_create(
+ &exec_ctx, grpc_fd_create(sv[1], "fixture:server"), &args, "test");
grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset);
grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, g_pollset);
diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c
index 8f11f98a9c..71d8057ac3 100644
--- a/test/core/security/secure_endpoint_test.c
+++ b/test/core/security/secure_endpoint_test.c
@@ -39,6 +39,7 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/endpoint_pair.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/security/transport/secure_endpoint.h"
@@ -57,10 +58,11 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair(
grpc_endpoint_test_fixture f;
grpc_endpoint_pair tcp;
- grpc_resource_quota *resource_quota =
- grpc_resource_quota_create("secure_endpoint_test");
- tcp = grpc_iomgr_create_endpoint_pair("fixture", resource_quota, slice_size);
- grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
+ grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE,
+ .type = GRPC_ARG_INTEGER,
+ .value.integer = (int)slice_size}};
+ grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a};
+ tcp = grpc_iomgr_create_endpoint_pair("fixture", &args);
grpc_endpoint_add_to_pollset(&exec_ctx, tcp.client, g_pollset);
grpc_endpoint_add_to_pollset(&exec_ctx, tcp.server, g_pollset);
diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD
index 38619666dc..cae3fa1a14 100644
--- a/test/cpp/microbenchmarks/BUILD
+++ b/test/cpp/microbenchmarks/BUILD
@@ -32,16 +32,25 @@ licenses(["notice"]) # 3-clause BSD
cc_test(
name = "noop-benchmark",
srcs = ["noop-benchmark.cc"],
- deps = ["//external:benchmark"],
linkopts = ["-pthread"],
+ deps = ["//external:benchmark"],
)
cc_library(
name = "helpers",
srcs = ["helpers.cc"],
- hdrs = ["helpers.h", "fullstack_fixtures.h", "fullstack_context_mutators.h"],
- deps = ["//:grpc++", "//external:benchmark", "//test/core/util:grpc_test_util", "//src/proto/grpc/testing:echo_proto"],
+ hdrs = [
+ "fullstack_context_mutators.h",
+ "fullstack_fixtures.h",
+ "helpers.h",
+ ],
linkopts = ["-pthread"],
+ deps = [
+ "//:grpc++",
+ "//external:benchmark",
+ "//src/proto/grpc/testing:echo_proto",
+ "//test/core/util:grpc_test_util",
+ ],
)
cc_test(
@@ -57,6 +66,12 @@ cc_test(
)
cc_test(
+ name = "bm_cq_multiple_threads",
+ srcs = ["bm_cq_multiple_threads.cc"],
+ deps = [":helpers"],
+)
+
+cc_test(
name = "bm_error",
srcs = ["bm_error.cc"],
deps = [":helpers"],
@@ -66,8 +81,8 @@ cc_test(
name = "bm_fullstack_streaming_ping_pong",
srcs = ["bm_fullstack_streaming_ping_pong.cc"],
deps = [":helpers"],
+)
- )
cc_test(
name = "bm_fullstack_streaming_pump",
srcs = ["bm_fullstack_streaming_pump.cc"],
diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
new file mode 100644
index 0000000000..967c226ac7
--- /dev/null
+++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
@@ -0,0 +1,142 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <benchmark/benchmark.h>
+#include <string.h>
+#include <atomic>
+
+#include <grpc/grpc.h>
+#include "test/cpp/microbenchmarks/helpers.h"
+
+extern "C" {
+#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/port.h"
+#include "src/core/lib/surface/completion_queue.h"
+}
+
+struct grpc_pollset {
+ gpr_mu mu;
+};
+
+namespace grpc {
+namespace testing {
+
+static void* make_tag(int i) { return (void*)(intptr_t)i; }
+static grpc_completion_queue* g_cq;
+static grpc_event_engine_vtable g_vtable;
+
+static __thread int g_thread_idx;
+static __thread grpc_cq_completion g_cq_completion;
+
+static void pollset_shutdown(grpc_exec_ctx* exec_ctx, grpc_pollset* ps,
+ grpc_closure* closure) {
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+}
+
+static void pollset_init(grpc_pollset* ps, gpr_mu** mu) {
+ gpr_mu_init(&ps->mu);
+ *mu = &ps->mu;
+}
+
+static void pollset_destroy(grpc_pollset* ps) { gpr_mu_destroy(&ps->mu); }
+
+static grpc_error* pollset_kick(grpc_pollset* p, grpc_pollset_worker* worker) {
+ return GRPC_ERROR_NONE;
+}
+
+/* Callback when the tag is dequeued from the completion queue. Does nothing */
+static void cq_done_cb(grpc_exec_ctx* exec_ctx, void* done_arg,
+ grpc_cq_completion* cq_completion) {}
+
+/* Queues a completion tag. ZERO polling overhead */
+static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* ps,
+ grpc_pollset_worker** worker, gpr_timespec now,
+ gpr_timespec deadline) {
+ gpr_mu_unlock(&ps->mu);
+ grpc_cq_end_op(exec_ctx, g_cq, make_tag(g_thread_idx), GRPC_ERROR_NONE,
+ cq_done_cb, NULL, &g_cq_completion);
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&ps->mu);
+ return GRPC_ERROR_NONE;
+}
+
+static void init_engine_vtable() {
+ memset(&g_vtable, 0, sizeof(g_vtable));
+
+ g_vtable.pollset_size = sizeof(grpc_pollset);
+ g_vtable.pollset_init = pollset_init;
+ g_vtable.pollset_shutdown = pollset_shutdown;
+ g_vtable.pollset_destroy = pollset_destroy;
+ g_vtable.pollset_work = pollset_work;
+ g_vtable.pollset_kick = pollset_kick;
+}
+
+static void setup() {
+ grpc_init();
+ init_engine_vtable();
+ grpc_set_event_engine_test_only(&g_vtable);
+
+ g_cq = grpc_completion_queue_create(NULL);
+}
+
+static void BM_Cq_Throughput(benchmark::State& state) {
+ TrackCounters track_counters;
+ gpr_timespec deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
+
+ if (state.thread_index == 0) {
+ setup();
+ }
+
+ while (state.KeepRunning()) {
+ g_thread_idx = state.thread_index;
+ void* dummy_tag = make_tag(g_thread_idx);
+ grpc_cq_begin_op(g_cq, dummy_tag);
+ grpc_completion_queue_next(g_cq, deadline, NULL);
+ }
+
+ state.SetItemsProcessed(state.iterations());
+
+ if (state.thread_index == 0) {
+ grpc_completion_queue_shutdown(g_cq);
+ grpc_completion_queue_destroy(g_cq);
+ }
+
+ track_counters.Finish(state);
+}
+
+BENCHMARK(BM_Cq_Throughput)->ThreadRange(1, 16)->UseRealTime();
+
+} // namespace testing
+} // namespace grpc
+
+BENCHMARK_MAIN();
diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h
index dc29701059..acc56bf39b 100644
--- a/test/cpp/microbenchmarks/fullstack_fixtures.h
+++ b/test/cpp/microbenchmarks/fullstack_fixtures.h
@@ -212,8 +212,8 @@ class EndpointPairFixture : public BaseFixture {
class SockPair : public EndpointPairFixture {
public:
SockPair(Service* service)
- : EndpointPairFixture(service, grpc_iomgr_create_endpoint_pair(
- "test", Library::get().rq(), 8192)) {}
+ : EndpointPairFixture(service,
+ grpc_iomgr_create_endpoint_pair("test", NULL)) {}
};
class InProcessCHTTP2 : public EndpointPairFixture {
diff --git a/test/distrib/csharp/run_distrib_test.bat b/test/distrib/csharp/run_distrib_test.bat
index 6cf381142f..cb5dd55273 100644
--- a/test/distrib/csharp/run_distrib_test.bat
+++ b/test/distrib/csharp/run_distrib_test.bat
@@ -31,7 +31,7 @@
cd /d %~dp0
@rem extract input artifacts
-powershell -Command "Add-Type -Assembly 'System.IO.Compression.FileSystem'; [System.IO.Compression.ZipFile]::ExtractToDirectory('../../../input_artifacts/csharp_nugets_dotnetcli.zip', 'TestNugetFeed');"
+powershell -Command "Add-Type -Assembly 'System.IO.Compression.FileSystem'; [System.IO.Compression.ZipFile]::ExtractToDirectory('../../../input_artifacts/csharp_nugets_windows_dotnetcli.zip', 'TestNugetFeed');"
update_version.sh auto
diff --git a/test/distrib/csharp/run_distrib_test.sh b/test/distrib/csharp/run_distrib_test.sh
index 0a77c1af44..9de5ce0cd3 100755
--- a/test/distrib/csharp/run_distrib_test.sh
+++ b/test/distrib/csharp/run_distrib_test.sh
@@ -32,7 +32,7 @@ set -ex
cd $(dirname $0)
-unzip -o "$EXTERNAL_GIT_ROOT/input_artifacts/csharp_nugets_dotnetcli.zip" -d TestNugetFeed
+unzip -o "$EXTERNAL_GIT_ROOT/input_artifacts/csharp_nugets_windows_dotnetcli.zip" -d TestNugetFeed
./update_version.sh auto
diff --git a/test/distrib/csharp/run_distrib_test_dotnetcli.sh b/test/distrib/csharp/run_distrib_test_dotnetcli.sh
index 493c5049fb..cdfc91bf42 100755
--- a/test/distrib/csharp/run_distrib_test_dotnetcli.sh
+++ b/test/distrib/csharp/run_distrib_test_dotnetcli.sh
@@ -32,7 +32,7 @@ set -ex
cd $(dirname $0)
-unzip -o "$EXTERNAL_GIT_ROOT/input_artifacts/csharp_nugets_dotnetcli.zip" -d TestNugetFeed
+unzip -o "$EXTERNAL_GIT_ROOT/input_artifacts/csharp_nugets_windows_dotnetcli.zip" -d TestNugetFeed
./update_version.sh auto
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index ae5120d87c..11176d9756 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -2566,6 +2566,27 @@
"headers": [],
"is_filegroup": false,
"language": "c++",
+ "name": "bm_cq_multiple_threads",
+ "src": [
+ "test/cpp/microbenchmarks/bm_cq_multiple_threads.cc"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
+ {
+ "deps": [
+ "benchmark",
+ "gpr",
+ "gpr_test_util",
+ "grpc",
+ "grpc++",
+ "grpc++_test_util",
+ "grpc_benchmark",
+ "grpc_test_util"
+ ],
+ "headers": [],
+ "is_filegroup": false,
+ "language": "c++",
"name": "bm_error",
"src": [
"test/cpp/microbenchmarks/bm_error.cc"
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 88bac795e4..12d48f219d 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -2752,6 +2752,28 @@
"flaky": false,
"gtest": false,
"language": "c++",
+ "name": "bm_cq_multiple_threads",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix"
+ ]
+ },
+ {
+ "args": [
+ "--benchmark_min_time=0"
+ ],
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
+ "gtest": false,
+ "language": "c++",
"name": "bm_error",
"platforms": [
"linux",
@@ -150767,6 +150789,29 @@
},
{
"args": [
+ "test/core/end2end/fuzzers/server_fuzzer_corpus/clusterfuzz-testcase-6312731374256128"
+ ],
+ "ci_platforms": [
+ "linux"
+ ],
+ "cpu_cost": 0.1,
+ "exclude_configs": [
+ "tsan"
+ ],
+ "exclude_iomgrs": [
+ "uv"
+ ],
+ "flaky": false,
+ "language": "c",
+ "name": "server_fuzzer_one_entry",
+ "platforms": [
+ "mac",
+ "linux"
+ ],
+ "uses_polling": false
+ },
+ {
+ "args": [
"test/core/end2end/fuzzers/server_fuzzer_corpus/crash-0f4b135c0242669ce425d2662168e9440f8a628d"
],
"ci_platforms": [
diff --git a/tools/run_tests/helper_scripts/pre_build_csharp.bat b/tools/run_tests/helper_scripts/pre_build_csharp.bat
index 99df1c6626..bee430ac86 100644
--- a/tools/run_tests/helper_scripts/pre_build_csharp.bat
+++ b/tools/run_tests/helper_scripts/pre_build_csharp.bat
@@ -43,7 +43,7 @@ cd build
mkdir %ARCHITECTURE%
cd %ARCHITECTURE%
@rem TODO(jtattermusch): Stop hardcoding path to yasm once Jenkins workers can locate yasm correctly
-cmake -G "Visual Studio 14 2015" -A %ARCHITECTURE% -DgRPC_BUILD_TESTS=OFF -DCMAKE_ASM_NASM_COMPILER="C:/Program Files (x86)/yasm/yasm.exe" ../../.. || goto :error
+cmake -G "Visual Studio 14 2015" -A %ARCHITECTURE% -DgRPC_BUILD_TESTS=OFF -DgRPC_MSVC_STATIC_RUNTIME=ON -DCMAKE_ASM_NASM_COMPILER="C:/Program Files (x86)/yasm/yasm.exe" ../../.. || goto :error
cd ..\..\..
@rem Location of nuget.exe
diff --git a/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh b/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh
index d7da6364d8..92d6975707 100755
--- a/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh
+++ b/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh
@@ -38,4 +38,5 @@ ruby src/ruby/end2end/sig_handling_driver.rb || EXIT_CODE=1
ruby src/ruby/end2end/channel_state_driver.rb || EXIT_CODE=1
ruby src/ruby/end2end/channel_closing_driver.rb || EXIT_CODE=1
ruby src/ruby/end2end/sig_int_during_channel_watch_driver.rb || EXIT_CODE=1
+ruby src/ruby/end2end/killed_client_thread_driver.rb || EXIT_CODE=1
exit $EXIT_CODE