aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--BUILD12
-rw-r--r--CMakeLists.txt4
-rw-r--r--Makefile76
-rw-r--r--binding.gyp2
-rw-r--r--build.yaml24
-rw-r--r--config.m42
-rw-r--r--gRPC-Core.podspec6
-rw-r--r--gRPC.podspec6
-rwxr-xr-xgrpc.gemspec4
-rw-r--r--include/grpc/impl/codegen/atm.h3
-rw-r--r--include/grpc/impl/codegen/atm_gcc_atomic.h2
-rw-r--r--include/grpc/impl/codegen/atm_gcc_sync.h7
-rw-r--r--include/grpc/impl/codegen/atm_windows.h8
-rw-r--r--package.xml4
-rw-r--r--src/core/ext/client_config/subchannel.c18
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.c8
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.h2
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c647
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h52
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c16
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_lists.c9
-rw-r--r--src/core/lib/channel/channel_stack.c23
-rw-r--r--src/core/lib/channel/compress_filter.c14
-rw-r--r--src/core/lib/iomgr/closure.c4
-rw-r--r--src/core/lib/iomgr/closure.h18
-rw-r--r--src/core/lib/iomgr/combiner.c221
-rw-r--r--src/core/lib/iomgr/combiner.h67
-rw-r--r--src/core/lib/iomgr/exec_ctx.h6
-rw-r--r--src/core/lib/iomgr/workqueue.h2
-rw-r--r--src/core/lib/iomgr/workqueue_posix.c87
-rw-r--r--src/core/lib/iomgr/workqueue_posix.h9
-rw-r--r--src/core/lib/iomgr/workqueue_windows.c2
-rw-r--r--src/core/lib/security/transport/secure_endpoint.c4
-rw-r--r--src/core/lib/security/transport/server_auth_filter.c27
-rw-r--r--src/core/lib/support/mpscq.c80
-rw-r--r--src/core/lib/support/mpscq.h65
-rw-r--r--src/core/lib/surface/call.c63
-rw-r--r--src/core/lib/surface/channel.c7
-rw-r--r--src/core/lib/surface/channel_ping.c9
-rw-r--r--src/core/lib/surface/server.c55
-rw-r--r--src/core/lib/transport/transport.c27
-rw-r--r--src/core/lib/transport/transport.h21
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py2
-rw-r--r--test/core/iomgr/combiner_test.c163
-rw-r--r--test/core/support/mpscq_test.c124
-rw-r--r--test/core/surface/lame_client_test.c15
m---------third_party/protobuf0
-rw-r--r--tools/doxygen/Doxyfile.core.internal4
-rw-r--r--tools/run_tests/sources_and_headers.json36
-rw-r--r--tools/run_tests/tests.json42
-rw-r--r--vsprojects/buildtests_c.sln52
-rw-r--r--vsprojects/vcxproj/gpr/gpr.vcxproj3
-rw-r--r--vsprojects/vcxproj/gpr/gpr.vcxproj.filters6
-rw-r--r--vsprojects/vcxproj/grpc/grpc.vcxproj3
-rw-r--r--vsprojects/vcxproj/grpc/grpc.vcxproj.filters6
-rw-r--r--vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj3
-rw-r--r--vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters6
-rw-r--r--vsprojects/vcxproj/test/combiner_test/combiner_test.vcxproj199
-rw-r--r--vsprojects/vcxproj/test/combiner_test/combiner_test.vcxproj.filters21
-rw-r--r--vsprojects/vcxproj/test/gpr_mpscq_test/gpr_mpscq_test.vcxproj193
-rw-r--r--vsprojects/vcxproj/test/gpr_mpscq_test/gpr_mpscq_test.vcxproj.filters21
61 files changed, 2006 insertions, 616 deletions
diff --git a/BUILD b/BUILD
index 33323be229..ff7b73c74f 100644
--- a/BUILD
+++ b/BUILD
@@ -49,6 +49,7 @@ cc_library(
"src/core/lib/support/backoff.h",
"src/core/lib/support/block_annotate.h",
"src/core/lib/support/env.h",
+ "src/core/lib/support/mpscq.h",
"src/core/lib/support/murmur_hash.h",
"src/core/lib/support/stack_lockfree.h",
"src/core/lib/support/string.h",
@@ -76,6 +77,7 @@ cc_library(
"src/core/lib/support/log_linux.c",
"src/core/lib/support/log_posix.c",
"src/core/lib/support/log_windows.c",
+ "src/core/lib/support/mpscq.c",
"src/core/lib/support/murmur_hash.c",
"src/core/lib/support/slice.c",
"src/core/lib/support/slice_buffer.c",
@@ -174,6 +176,7 @@ cc_library(
"src/core/lib/http/httpcli.h",
"src/core/lib/http/parser.h",
"src/core/lib/iomgr/closure.h",
+ "src/core/lib/iomgr/combiner.h",
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
"src/core/lib/iomgr/error.h",
@@ -325,6 +328,7 @@ cc_library(
"src/core/lib/http/httpcli.c",
"src/core/lib/http/parser.c",
"src/core/lib/iomgr/closure.c",
+ "src/core/lib/iomgr/combiner.c",
"src/core/lib/iomgr/endpoint.c",
"src/core/lib/iomgr/endpoint_pair_posix.c",
"src/core/lib/iomgr/endpoint_pair_windows.c",
@@ -561,6 +565,7 @@ cc_library(
"src/core/lib/http/httpcli.h",
"src/core/lib/http/parser.h",
"src/core/lib/iomgr/closure.h",
+ "src/core/lib/iomgr/combiner.h",
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
"src/core/lib/iomgr/error.h",
@@ -702,6 +707,7 @@ cc_library(
"src/core/lib/http/httpcli.c",
"src/core/lib/http/parser.c",
"src/core/lib/iomgr/closure.c",
+ "src/core/lib/iomgr/combiner.c",
"src/core/lib/iomgr/endpoint.c",
"src/core/lib/iomgr/endpoint_pair_posix.c",
"src/core/lib/iomgr/endpoint_pair_windows.c",
@@ -913,6 +919,7 @@ cc_library(
"src/core/lib/http/httpcli.h",
"src/core/lib/http/parser.h",
"src/core/lib/iomgr/closure.h",
+ "src/core/lib/iomgr/combiner.h",
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
"src/core/lib/iomgr/error.h",
@@ -1041,6 +1048,7 @@ cc_library(
"src/core/lib/http/httpcli.c",
"src/core/lib/http/parser.c",
"src/core/lib/iomgr/closure.c",
+ "src/core/lib/iomgr/combiner.c",
"src/core/lib/iomgr/endpoint.c",
"src/core/lib/iomgr/endpoint_pair_posix.c",
"src/core/lib/iomgr/endpoint_pair_windows.c",
@@ -1691,6 +1699,7 @@ objc_library(
"src/core/lib/support/log_linux.c",
"src/core/lib/support/log_posix.c",
"src/core/lib/support/log_windows.c",
+ "src/core/lib/support/mpscq.c",
"src/core/lib/support/murmur_hash.c",
"src/core/lib/support/slice.c",
"src/core/lib/support/slice_buffer.c",
@@ -1764,6 +1773,7 @@ objc_library(
"src/core/lib/support/backoff.h",
"src/core/lib/support/block_annotate.h",
"src/core/lib/support/env.h",
+ "src/core/lib/support/mpscq.h",
"src/core/lib/support/murmur_hash.h",
"src/core/lib/support/stack_lockfree.h",
"src/core/lib/support/string.h",
@@ -1800,6 +1810,7 @@ objc_library(
"src/core/lib/http/httpcli.c",
"src/core/lib/http/parser.c",
"src/core/lib/iomgr/closure.c",
+ "src/core/lib/iomgr/combiner.c",
"src/core/lib/iomgr/endpoint.c",
"src/core/lib/iomgr/endpoint_pair_posix.c",
"src/core/lib/iomgr/endpoint_pair_windows.c",
@@ -2015,6 +2026,7 @@ objc_library(
"src/core/lib/http/httpcli.h",
"src/core/lib/http/parser.h",
"src/core/lib/iomgr/closure.h",
+ "src/core/lib/iomgr/combiner.h",
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
"src/core/lib/iomgr/error.h",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2c0059cd2d..6451f7c5fe 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -94,6 +94,7 @@ add_library(gpr
src/core/lib/support/log_linux.c
src/core/lib/support/log_posix.c
src/core/lib/support/log_windows.c
+ src/core/lib/support/mpscq.c
src/core/lib/support/murmur_hash.c
src/core/lib/support/slice.c
src/core/lib/support/slice_buffer.c
@@ -148,6 +149,7 @@ add_library(grpc
src/core/lib/http/httpcli.c
src/core/lib/http/parser.c
src/core/lib/iomgr/closure.c
+ src/core/lib/iomgr/combiner.c
src/core/lib/iomgr/endpoint.c
src/core/lib/iomgr/endpoint_pair_posix.c
src/core/lib/iomgr/endpoint_pair_windows.c
@@ -353,6 +355,7 @@ add_library(grpc_cronet
src/core/lib/http/httpcli.c
src/core/lib/http/parser.c
src/core/lib/iomgr/closure.c
+ src/core/lib/iomgr/combiner.c
src/core/lib/iomgr/endpoint.c
src/core/lib/iomgr/endpoint_pair_posix.c
src/core/lib/iomgr/endpoint_pair_windows.c
@@ -535,6 +538,7 @@ add_library(grpc_unsecure
src/core/lib/http/httpcli.c
src/core/lib/http/parser.c
src/core/lib/iomgr/closure.c
+ src/core/lib/iomgr/combiner.c
src/core/lib/iomgr/endpoint.c
src/core/lib/iomgr/endpoint_pair_posix.c
src/core/lib/iomgr/endpoint_pair_windows.c
diff --git a/Makefile b/Makefile
index 4ce22678d7..e554248173 100644
--- a/Makefile
+++ b/Makefile
@@ -889,6 +889,7 @@ chttp2_status_conversion_test: $(BINDIR)/$(CONFIG)/chttp2_status_conversion_test
chttp2_stream_map_test: $(BINDIR)/$(CONFIG)/chttp2_stream_map_test
chttp2_varint_test: $(BINDIR)/$(CONFIG)/chttp2_varint_test
client_fuzzer: $(BINDIR)/$(CONFIG)/client_fuzzer
+combiner_test: $(BINDIR)/$(CONFIG)/combiner_test
compression_test: $(BINDIR)/$(CONFIG)/compression_test
concurrent_connectivity_test: $(BINDIR)/$(CONFIG)/concurrent_connectivity_test
dns_resolver_connectivity_test: $(BINDIR)/$(CONFIG)/dns_resolver_connectivity_test
@@ -913,6 +914,7 @@ gpr_env_test: $(BINDIR)/$(CONFIG)/gpr_env_test
gpr_histogram_test: $(BINDIR)/$(CONFIG)/gpr_histogram_test
gpr_host_port_test: $(BINDIR)/$(CONFIG)/gpr_host_port_test
gpr_log_test: $(BINDIR)/$(CONFIG)/gpr_log_test
+gpr_mpscq_test: $(BINDIR)/$(CONFIG)/gpr_mpscq_test
gpr_slice_buffer_test: $(BINDIR)/$(CONFIG)/gpr_slice_buffer_test
gpr_slice_test: $(BINDIR)/$(CONFIG)/gpr_slice_test
gpr_stack_lockfree_test: $(BINDIR)/$(CONFIG)/gpr_stack_lockfree_test
@@ -1206,6 +1208,7 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/chttp2_status_conversion_test \
$(BINDIR)/$(CONFIG)/chttp2_stream_map_test \
$(BINDIR)/$(CONFIG)/chttp2_varint_test \
+ $(BINDIR)/$(CONFIG)/combiner_test \
$(BINDIR)/$(CONFIG)/compression_test \
$(BINDIR)/$(CONFIG)/concurrent_connectivity_test \
$(BINDIR)/$(CONFIG)/dns_resolver_connectivity_test \
@@ -1228,6 +1231,7 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/gpr_histogram_test \
$(BINDIR)/$(CONFIG)/gpr_host_port_test \
$(BINDIR)/$(CONFIG)/gpr_log_test \
+ $(BINDIR)/$(CONFIG)/gpr_mpscq_test \
$(BINDIR)/$(CONFIG)/gpr_slice_buffer_test \
$(BINDIR)/$(CONFIG)/gpr_slice_test \
$(BINDIR)/$(CONFIG)/gpr_stack_lockfree_test \
@@ -1514,6 +1518,8 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/chttp2_stream_map_test || ( echo test chttp2_stream_map_test failed ; exit 1 )
$(E) "[RUN] Testing chttp2_varint_test"
$(Q) $(BINDIR)/$(CONFIG)/chttp2_varint_test || ( echo test chttp2_varint_test failed ; exit 1 )
+ $(E) "[RUN] Testing combiner_test"
+ $(Q) $(BINDIR)/$(CONFIG)/combiner_test || ( echo test combiner_test failed ; exit 1 )
$(E) "[RUN] Testing compression_test"
$(Q) $(BINDIR)/$(CONFIG)/compression_test || ( echo test compression_test failed ; exit 1 )
$(E) "[RUN] Testing concurrent_connectivity_test"
@@ -1554,6 +1560,8 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/gpr_host_port_test || ( echo test gpr_host_port_test failed ; exit 1 )
$(E) "[RUN] Testing gpr_log_test"
$(Q) $(BINDIR)/$(CONFIG)/gpr_log_test || ( echo test gpr_log_test failed ; exit 1 )
+ $(E) "[RUN] Testing gpr_mpscq_test"
+ $(Q) $(BINDIR)/$(CONFIG)/gpr_mpscq_test || ( echo test gpr_mpscq_test failed ; exit 1 )
$(E) "[RUN] Testing gpr_slice_buffer_test"
$(Q) $(BINDIR)/$(CONFIG)/gpr_slice_buffer_test || ( echo test gpr_slice_buffer_test failed ; exit 1 )
$(E) "[RUN] Testing gpr_slice_test"
@@ -2336,6 +2344,7 @@ LIBGPR_SRC = \
src/core/lib/support/log_linux.c \
src/core/lib/support/log_posix.c \
src/core/lib/support/log_windows.c \
+ src/core/lib/support/mpscq.c \
src/core/lib/support/murmur_hash.c \
src/core/lib/support/slice.c \
src/core/lib/support/slice_buffer.c \
@@ -2484,6 +2493,7 @@ LIBGRPC_SRC = \
src/core/lib/http/httpcli.c \
src/core/lib/http/parser.c \
src/core/lib/iomgr/closure.c \
+ src/core/lib/iomgr/combiner.c \
src/core/lib/iomgr/endpoint.c \
src/core/lib/iomgr/endpoint_pair_posix.c \
src/core/lib/iomgr/endpoint_pair_windows.c \
@@ -2756,6 +2766,7 @@ LIBGRPC_CRONET_SRC = \
src/core/lib/http/httpcli.c \
src/core/lib/http/parser.c \
src/core/lib/iomgr/closure.c \
+ src/core/lib/iomgr/combiner.c \
src/core/lib/iomgr/endpoint.c \
src/core/lib/iomgr/endpoint_pair_posix.c \
src/core/lib/iomgr/endpoint_pair_windows.c \
@@ -3097,6 +3108,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/http/httpcli.c \
src/core/lib/http/parser.c \
src/core/lib/iomgr/closure.c \
+ src/core/lib/iomgr/combiner.c \
src/core/lib/iomgr/endpoint.c \
src/core/lib/iomgr/endpoint_pair_posix.c \
src/core/lib/iomgr/endpoint_pair_windows.c \
@@ -6907,6 +6919,38 @@ endif
endif
+COMBINER_TEST_SRC = \
+ test/core/iomgr/combiner_test.c \
+
+COMBINER_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(COMBINER_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/combiner_test: openssl_dep_error
+
+else
+
+
+
+$(BINDIR)/$(CONFIG)/combiner_test: $(COMBINER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LD) $(LDFLAGS) $(COMBINER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/combiner_test
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/core/iomgr/combiner_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_combiner_test: $(COMBINER_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(COMBINER_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
COMPRESSION_TEST_SRC = \
test/core/compression/compression_test.c \
@@ -7675,6 +7719,38 @@ endif
endif
+GPR_MPSCQ_TEST_SRC = \
+ test/core/support/mpscq_test.c \
+
+GPR_MPSCQ_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GPR_MPSCQ_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/gpr_mpscq_test: openssl_dep_error
+
+else
+
+
+
+$(BINDIR)/$(CONFIG)/gpr_mpscq_test: $(GPR_MPSCQ_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LD) $(LDFLAGS) $(GPR_MPSCQ_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/gpr_mpscq_test
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/core/support/mpscq_test.o: $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_gpr_mpscq_test: $(GPR_MPSCQ_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(GPR_MPSCQ_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
GPR_SLICE_BUFFER_TEST_SRC = \
test/core/support/slice_buffer_test.c \
diff --git a/binding.gyp b/binding.gyp
index e1130ad01a..86e819a480 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -515,6 +515,7 @@
'src/core/lib/support/log_linux.c',
'src/core/lib/support/log_posix.c',
'src/core/lib/support/log_windows.c',
+ 'src/core/lib/support/mpscq.c',
'src/core/lib/support/murmur_hash.c',
'src/core/lib/support/slice.c',
'src/core/lib/support/slice_buffer.c',
@@ -577,6 +578,7 @@
'src/core/lib/http/httpcli.c',
'src/core/lib/http/parser.c',
'src/core/lib/iomgr/closure.c',
+ 'src/core/lib/iomgr/combiner.c',
'src/core/lib/iomgr/endpoint.c',
'src/core/lib/iomgr/endpoint_pair_posix.c',
'src/core/lib/iomgr/endpoint_pair_windows.c',
diff --git a/build.yaml b/build.yaml
index 57545839d4..fe6946f0cf 100644
--- a/build.yaml
+++ b/build.yaml
@@ -70,6 +70,7 @@ filegroups:
- src/core/lib/support/backoff.h
- src/core/lib/support/block_annotate.h
- src/core/lib/support/env.h
+ - src/core/lib/support/mpscq.h
- src/core/lib/support/murmur_hash.h
- src/core/lib/support/stack_lockfree.h
- src/core/lib/support/string.h
@@ -98,6 +99,7 @@ filegroups:
- src/core/lib/support/log_linux.c
- src/core/lib/support/log_posix.c
- src/core/lib/support/log_windows.c
+ - src/core/lib/support/mpscq.c
- src/core/lib/support/murmur_hash.c
- src/core/lib/support/slice.c
- src/core/lib/support/slice_buffer.c
@@ -165,6 +167,7 @@ filegroups:
- src/core/lib/http/httpcli.h
- src/core/lib/http/parser.h
- src/core/lib/iomgr/closure.h
+ - src/core/lib/iomgr/combiner.h
- src/core/lib/iomgr/endpoint.h
- src/core/lib/iomgr/endpoint_pair.h
- src/core/lib/iomgr/error.h
@@ -243,6 +246,7 @@ filegroups:
- src/core/lib/http/httpcli.c
- src/core/lib/http/parser.c
- src/core/lib/iomgr/closure.c
+ - src/core/lib/iomgr/combiner.c
- src/core/lib/iomgr/endpoint.c
- src/core/lib/iomgr/endpoint_pair_posix.c
- src/core/lib/iomgr/endpoint_pair_windows.c
@@ -1343,6 +1347,17 @@ targets:
- test/core/end2end/fuzzers/client_fuzzer_corpus
dict: test/core/end2end/fuzzers/hpack.dictionary
maxlen: 2048
+- name: combiner_test
+ cpu_cost: 30
+ build: test
+ language: c
+ src:
+ - test/core/iomgr/combiner_test.c
+ deps:
+ - grpc_test_util
+ - grpc
+ - gpr_test_util
+ - gpr
- name: compression_test
build: test
language: c
@@ -1594,6 +1609,15 @@ targets:
deps:
- gpr_test_util
- gpr
+- name: gpr_mpscq_test
+ cpu_cost: 30
+ build: test
+ language: c
+ src:
+ - test/core/support/mpscq_test.c
+ deps:
+ - gpr_test_util
+ - gpr
- name: gpr_slice_buffer_test
build: test
language: c
diff --git a/config.m4 b/config.m4
index 0c3322d8ef..6ecacb9173 100644
--- a/config.m4
+++ b/config.m4
@@ -56,6 +56,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/support/log_linux.c \
src/core/lib/support/log_posix.c \
src/core/lib/support/log_windows.c \
+ src/core/lib/support/mpscq.c \
src/core/lib/support/murmur_hash.c \
src/core/lib/support/slice.c \
src/core/lib/support/slice_buffer.c \
@@ -96,6 +97,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/http/httpcli.c \
src/core/lib/http/parser.c \
src/core/lib/iomgr/closure.c \
+ src/core/lib/iomgr/combiner.c \
src/core/lib/iomgr/endpoint.c \
src/core/lib/iomgr/endpoint_pair_posix.c \
src/core/lib/iomgr/endpoint_pair_windows.c \
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index e10e05387b..eeb63f4ddd 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -198,6 +198,7 @@ Pod::Spec.new do |s|
'src/core/lib/support/backoff.h',
'src/core/lib/support/block_annotate.h',
'src/core/lib/support/env.h',
+ 'src/core/lib/support/mpscq.h',
'src/core/lib/support/murmur_hash.h',
'src/core/lib/support/stack_lockfree.h',
'src/core/lib/support/string.h',
@@ -225,6 +226,7 @@ Pod::Spec.new do |s|
'src/core/lib/support/log_linux.c',
'src/core/lib/support/log_posix.c',
'src/core/lib/support/log_windows.c',
+ 'src/core/lib/support/mpscq.c',
'src/core/lib/support/murmur_hash.c',
'src/core/lib/support/slice.c',
'src/core/lib/support/slice_buffer.c',
@@ -265,6 +267,7 @@ Pod::Spec.new do |s|
'src/core/lib/http/httpcli.h',
'src/core/lib/http/parser.h',
'src/core/lib/iomgr/closure.h',
+ 'src/core/lib/iomgr/combiner.h',
'src/core/lib/iomgr/endpoint.h',
'src/core/lib/iomgr/endpoint_pair.h',
'src/core/lib/iomgr/error.h',
@@ -420,6 +423,7 @@ Pod::Spec.new do |s|
'src/core/lib/http/httpcli.c',
'src/core/lib/http/parser.c',
'src/core/lib/iomgr/closure.c',
+ 'src/core/lib/iomgr/combiner.c',
'src/core/lib/iomgr/endpoint.c',
'src/core/lib/iomgr/endpoint_pair_posix.c',
'src/core/lib/iomgr/endpoint_pair_windows.c',
@@ -596,6 +600,7 @@ Pod::Spec.new do |s|
'src/core/lib/support/backoff.h',
'src/core/lib/support/block_annotate.h',
'src/core/lib/support/env.h',
+ 'src/core/lib/support/mpscq.h',
'src/core/lib/support/murmur_hash.h',
'src/core/lib/support/stack_lockfree.h',
'src/core/lib/support/string.h',
@@ -618,6 +623,7 @@ Pod::Spec.new do |s|
'src/core/lib/http/httpcli.h',
'src/core/lib/http/parser.h',
'src/core/lib/iomgr/closure.h',
+ 'src/core/lib/iomgr/combiner.h',
'src/core/lib/iomgr/endpoint.h',
'src/core/lib/iomgr/endpoint_pair.h',
'src/core/lib/iomgr/error.h',
diff --git a/gRPC.podspec b/gRPC.podspec
index fd58bd6281..4f2203c7a5 100644
--- a/gRPC.podspec
+++ b/gRPC.podspec
@@ -68,6 +68,7 @@ Pod::Spec.new do |s|
'src/core/lib/support/backoff.h',
'src/core/lib/support/block_annotate.h',
'src/core/lib/support/env.h',
+ 'src/core/lib/support/mpscq.h',
'src/core/lib/support/murmur_hash.h',
'src/core/lib/support/stack_lockfree.h',
'src/core/lib/support/string.h',
@@ -137,6 +138,7 @@ Pod::Spec.new do |s|
'src/core/lib/support/log_linux.c',
'src/core/lib/support/log_posix.c',
'src/core/lib/support/log_windows.c',
+ 'src/core/lib/support/mpscq.c',
'src/core/lib/support/murmur_hash.c',
'src/core/lib/support/slice.c',
'src/core/lib/support/slice_buffer.c',
@@ -177,6 +179,7 @@ Pod::Spec.new do |s|
'src/core/lib/http/httpcli.h',
'src/core/lib/http/parser.h',
'src/core/lib/iomgr/closure.h',
+ 'src/core/lib/iomgr/combiner.h',
'src/core/lib/iomgr/endpoint.h',
'src/core/lib/iomgr/endpoint_pair.h',
'src/core/lib/iomgr/error.h',
@@ -362,6 +365,7 @@ Pod::Spec.new do |s|
'src/core/lib/http/httpcli.c',
'src/core/lib/http/parser.c',
'src/core/lib/iomgr/closure.c',
+ 'src/core/lib/iomgr/combiner.c',
'src/core/lib/iomgr/endpoint.c',
'src/core/lib/iomgr/endpoint_pair_posix.c',
'src/core/lib/iomgr/endpoint_pair_windows.c',
@@ -538,6 +542,7 @@ Pod::Spec.new do |s|
'src/core/lib/support/backoff.h',
'src/core/lib/support/block_annotate.h',
'src/core/lib/support/env.h',
+ 'src/core/lib/support/mpscq.h',
'src/core/lib/support/murmur_hash.h',
'src/core/lib/support/stack_lockfree.h',
'src/core/lib/support/string.h',
@@ -560,6 +565,7 @@ Pod::Spec.new do |s|
'src/core/lib/http/httpcli.h',
'src/core/lib/http/parser.h',
'src/core/lib/iomgr/closure.h',
+ 'src/core/lib/iomgr/combiner.h',
'src/core/lib/iomgr/endpoint.h',
'src/core/lib/iomgr/endpoint_pair.h',
'src/core/lib/iomgr/error.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 369851b0f2..b9f858284a 100755
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -89,6 +89,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/support/backoff.h )
s.files += %w( src/core/lib/support/block_annotate.h )
s.files += %w( src/core/lib/support/env.h )
+ s.files += %w( src/core/lib/support/mpscq.h )
s.files += %w( src/core/lib/support/murmur_hash.h )
s.files += %w( src/core/lib/support/stack_lockfree.h )
s.files += %w( src/core/lib/support/string.h )
@@ -116,6 +117,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/support/log_linux.c )
s.files += %w( src/core/lib/support/log_posix.c )
s.files += %w( src/core/lib/support/log_windows.c )
+ s.files += %w( src/core/lib/support/mpscq.c )
s.files += %w( src/core/lib/support/murmur_hash.c )
s.files += %w( src/core/lib/support/slice.c )
s.files += %w( src/core/lib/support/slice_buffer.c )
@@ -186,6 +188,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/http/httpcli.h )
s.files += %w( src/core/lib/http/parser.h )
s.files += %w( src/core/lib/iomgr/closure.h )
+ s.files += %w( src/core/lib/iomgr/combiner.h )
s.files += %w( src/core/lib/iomgr/endpoint.h )
s.files += %w( src/core/lib/iomgr/endpoint_pair.h )
s.files += %w( src/core/lib/iomgr/error.h )
@@ -341,6 +344,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/http/httpcli.c )
s.files += %w( src/core/lib/http/parser.c )
s.files += %w( src/core/lib/iomgr/closure.c )
+ s.files += %w( src/core/lib/iomgr/combiner.c )
s.files += %w( src/core/lib/iomgr/endpoint.c )
s.files += %w( src/core/lib/iomgr/endpoint_pair_posix.c )
s.files += %w( src/core/lib/iomgr/endpoint_pair_windows.c )
diff --git a/include/grpc/impl/codegen/atm.h b/include/grpc/impl/codegen/atm.h
index 5589d5d411..ae00fb0f16 100644
--- a/include/grpc/impl/codegen/atm.h
+++ b/include/grpc/impl/codegen/atm.h
@@ -75,6 +75,9 @@
int gpr_atm_no_barrier_cas(gpr_atm *p, gpr_atm o, gpr_atm n);
int gpr_atm_acq_cas(gpr_atm *p, gpr_atm o, gpr_atm n);
int gpr_atm_rel_cas(gpr_atm *p, gpr_atm o, gpr_atm n);
+
+ // Atomically, set *p=n and return the old value of *p
+ gpr_atm gpr_atm_full_xchg(gpr_atm *p, gpr_atm n);
*/
#include <grpc/impl/codegen/port_platform.h>
diff --git a/include/grpc/impl/codegen/atm_gcc_atomic.h b/include/grpc/impl/codegen/atm_gcc_atomic.h
index 8caf7edbde..7d4ae98cf7 100644
--- a/include/grpc/impl/codegen/atm_gcc_atomic.h
+++ b/include/grpc/impl/codegen/atm_gcc_atomic.h
@@ -69,4 +69,6 @@ static __inline int gpr_atm_rel_cas(gpr_atm *p, gpr_atm o, gpr_atm n) {
__ATOMIC_RELAXED);
}
+#define gpr_atm_full_xchg(p, n) __atomic_exchange_n((p), (n), __ATOMIC_ACQ_REL)
+
#endif /* GRPC_IMPL_CODEGEN_ATM_GCC_ATOMIC_H */
diff --git a/include/grpc/impl/codegen/atm_gcc_sync.h b/include/grpc/impl/codegen/atm_gcc_sync.h
index 915b09fb0c..86ce2f2548 100644
--- a/include/grpc/impl/codegen/atm_gcc_sync.h
+++ b/include/grpc/impl/codegen/atm_gcc_sync.h
@@ -84,4 +84,11 @@ static __inline void gpr_atm_no_barrier_store(gpr_atm *p, gpr_atm value) {
#define gpr_atm_acq_cas(p, o, n) (__sync_bool_compare_and_swap((p), (o), (n)))
#define gpr_atm_rel_cas(p, o, n) gpr_atm_acq_cas((p), (o), (n))
+static __inline gpr_atm gpr_atm_full_xchg(gpr_atm *p, gpr_atm n) {
+ gpr_atm cur;
+ do {
+ cur = gpr_atm_acq_load(p);
+ } while (!gpr_atm_rel_cas(p, cur, n));
+}
+
#endif /* GRPC_IMPL_CODEGEN_ATM_GCC_SYNC_H */
diff --git a/include/grpc/impl/codegen/atm_windows.h b/include/grpc/impl/codegen/atm_windows.h
index d5fa8c0f62..3e5d1b301c 100644
--- a/include/grpc/impl/codegen/atm_windows.h
+++ b/include/grpc/impl/codegen/atm_windows.h
@@ -122,4 +122,12 @@ static __inline gpr_atm gpr_atm_full_fetch_add(gpr_atm *p, gpr_atm delta) {
return old;
}
+<<<<<<< HEAD:include/grpc/impl/codegen/atm_win32.h
+static __inline gpr_atm gpr_atm_full_xchg(gpr_atm *p, gpr_atm n) {
+ return (gpr_atm)InterlockedExchangePointer((PVOID *)p, (PVOID)n);
+}
+
+#endif /* GRPC_IMPL_CODEGEN_ATM_WIN32_H */
+=======
#endif /* GRPC_IMPL_CODEGEN_ATM_WINDOWS_H */
+>>>>>>> c28a6c1b3b4d68da7661997cd56d305b254a7d0b:include/grpc/impl/codegen/atm_windows.h
diff --git a/package.xml b/package.xml
index d7d10b3f7c..e6c7970ed1 100644
--- a/package.xml
+++ b/package.xml
@@ -96,6 +96,7 @@
<file baseinstalldir="/" name="src/core/lib/support/backoff.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/block_annotate.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/env.h" role="src" />
+ <file baseinstalldir="/" name="src/core/lib/support/mpscq.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/murmur_hash.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/stack_lockfree.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/string.h" role="src" />
@@ -123,6 +124,7 @@
<file baseinstalldir="/" name="src/core/lib/support/log_linux.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/log_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/log_windows.c" role="src" />
+ <file baseinstalldir="/" name="src/core/lib/support/mpscq.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/murmur_hash.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/slice.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/slice_buffer.c" role="src" />
@@ -193,6 +195,7 @@
<file baseinstalldir="/" name="src/core/lib/http/httpcli.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/http/parser.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/closure.h" role="src" />
+ <file baseinstalldir="/" name="src/core/lib/iomgr/combiner.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_pair.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/error.h" role="src" />
@@ -348,6 +351,7 @@
<file baseinstalldir="/" name="src/core/lib/http/httpcli.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/http/parser.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/closure.c" role="src" />
+ <file baseinstalldir="/" name="src/core/lib/iomgr/combiner.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_pair_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_pair_windows.c" role="src" />
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index d089cd4399..43b88d2ce4 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -504,14 +504,13 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *interested_parties,
grpc_connectivity_state *state,
grpc_closure *closure) {
- grpc_transport_op op;
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
grpc_channel_element *elem;
- memset(&op, 0, sizeof(op));
- op.connectivity_state = state;
- op.on_connectivity_state_change = closure;
- op.bind_pollset_set = interested_parties;
+ op->connectivity_state = state;
+ op->on_connectivity_state_change = closure;
+ op->bind_pollset_set = interested_parties;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(exec_ctx, elem, &op);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
}
void grpc_connected_subchannel_notify_on_state_change(
@@ -525,12 +524,11 @@ void grpc_connected_subchannel_notify_on_state_change(
void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel *con,
grpc_closure *closure) {
- grpc_transport_op op;
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
grpc_channel_element *elem;
- memset(&op, 0, sizeof(op));
- op.send_ping = closure;
+ op->send_ping = closure;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(exec_ctx, elem, &op);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
}
static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c
index b96a0ad093..7eb17c713a 100644
--- a/src/core/ext/client_config/subchannel_call_holder.c
+++ b/src/core/ext/client_config/subchannel_call_holder.c
@@ -200,7 +200,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
}
typedef struct {
- grpc_transport_stream_op *ops;
+ grpc_transport_stream_op **ops;
size_t nops;
grpc_subchannel_call *call;
} retry_ops_args;
@@ -228,7 +228,7 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
retry_ops_args *a = args;
size_t i;
for (i = 0; i < a->nops; i++) {
- grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]);
+ grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]);
}
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops");
gpr_free(a->ops);
@@ -244,7 +244,7 @@ static void add_waiting_locked(grpc_subchannel_call_holder *holder,
gpr_realloc(holder->waiting_ops, holder->waiting_ops_capacity *
sizeof(*holder->waiting_ops));
}
- holder->waiting_ops[holder->waiting_ops_count++] = *op;
+ holder->waiting_ops[holder->waiting_ops_count++] = op;
GPR_TIMER_END("add_waiting_locked", 0);
}
@@ -254,7 +254,7 @@ static void fail_locked(grpc_exec_ctx *exec_ctx,
size_t i;
for (i = 0; i < holder->waiting_ops_count; i++) {
grpc_transport_stream_op_finish_with_failure(
- exec_ctx, &holder->waiting_ops[i], GRPC_ERROR_REF(error));
+ exec_ctx, holder->waiting_ops[i], GRPC_ERROR_REF(error));
}
holder->waiting_ops_count = 0;
GRPC_ERROR_UNREF(error);
diff --git a/src/core/ext/client_config/subchannel_call_holder.h b/src/core/ext/client_config/subchannel_call_holder.h
index 8d2deb02f3..19b22a2905 100644
--- a/src/core/ext/client_config/subchannel_call_holder.h
+++ b/src/core/ext/client_config/subchannel_call_holder.h
@@ -74,7 +74,7 @@ typedef struct grpc_subchannel_call_holder {
grpc_connected_subchannel *connected_subchannel;
grpc_polling_entity *pollent;
- grpc_transport_stream_op *waiting_ops;
+ grpc_transport_stream_op **waiting_ops;
size_t waiting_ops_count;
size_t waiting_ops_capacity;
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index d5695fe49c..29fd9f8804 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -89,8 +89,16 @@ static const grpc_transport_vtable vtable;
static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
-static void initiate_writing(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
+static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
+static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
+static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
@@ -104,11 +112,6 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error);
-/** Perform a transport_op */
-static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, void *transport_op);
-
/** Cancel a stream: coming from the transport API */
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
@@ -120,22 +123,10 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global,
grpc_error *error);
-/** Add endpoint from this transport to pollset */
-static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_ignored, void *pollset);
-static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_ignored,
- void *pollset_set);
-
/** Start new streams that have been created if we can */
static void maybe_start_some_streams(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
-static void finish_global_actions(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t);
-
static void connectivity_state_set(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_connectivity_state state, grpc_error *error, const char *reason);
@@ -148,9 +139,8 @@ static void incoming_byte_stream_update_flow_control(
grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
size_t have_already);
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- void *byte_stream);
+ void *byte_stream,
+ grpc_error *error_ignored);
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
@@ -164,9 +154,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
size_t i;
- gpr_mu_lock(&t->executor.mu);
-
- GPR_ASSERT(t->ep == NULL);
+ grpc_endpoint_destroy(exec_ctx, t->ep);
gpr_slice_buffer_destroy(&t->global.qbuf);
@@ -190,8 +178,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_destroy(&t->new_stream_map);
grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
- gpr_mu_unlock(&t->executor.mu);
- gpr_mu_destroy(&t->executor.mu);
+ grpc_combiner_destroy(exec_ctx, t->executor.combiner);
/* callback remaining pings: they're not allowed to call into the transpot,
and maybe they hold resources that need to be freed */
@@ -250,11 +237,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->base.vtable = &vtable;
t->ep = ep;
- /* one ref is for destroy, the other for when ep becomes NULL */
- gpr_ref_init(&t->refs, 2);
+ /* one ref is for destroy */
+ gpr_ref_init(&t->refs, 1);
/* ref is dropped at transport close() */
gpr_ref_init(&t->shutdown_ep_refs, 1);
- gpr_mu_init(&t->executor.mu);
+ t->executor.combiner = grpc_combiner_create(grpc_endpoint_get_workqueue(ep));
t->peer_string = grpc_endpoint_get_peer(ep);
t->endpoint_reading = 1;
t->global.next_stream_id = is_client ? 1 : 2;
@@ -280,15 +267,20 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor);
grpc_closure_init(&t->writing_action, writing_action, t);
grpc_closure_init(&t->reading_action, reading_action, t);
+ grpc_closure_init(&t->reading_action_locked, reading_action_locked, t);
grpc_closure_init(&t->parsing_action, parsing_action, t);
- grpc_closure_init(&t->initiate_writing, initiate_writing, t);
+ grpc_closure_init(&t->post_parse_locked, post_parse_locked, t);
+ grpc_closure_init(&t->initiate_writing, initiate_writing_locked, t);
+ grpc_closure_init(&t->terminate_writing, terminate_writing_with_lock, t);
+ grpc_closure_init(&t->initiate_read_flush_locked, initiate_read_flush_locked,
+ t);
+ grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
+ &t->writing);
gpr_slice_buffer_init(&t->parsing.qbuf);
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser);
- grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
- &t->writing);
gpr_slice_buffer_init(&t->read_buffer);
if (is_client) {
@@ -412,45 +404,34 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
-static void destroy_transport_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_ignored,
- void *arg_ignored) {
+static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
t->destroying = 1;
drop_connection(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed"));
}
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, destroy_transport_locked,
- NULL, 0);
+ grpc_combiner_execute(exec_ctx, t->executor.combiner,
+ grpc_closure_create(destroy_transport_locked, t),
+ GRPC_ERROR_NONE);
UNREF_TRANSPORT(exec_ctx, t, "destroy");
}
/** block grpc_endpoint_shutdown being called until a paired
allow_endpoint_shutdown is made */
static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) {
- GPR_ASSERT(t->ep);
gpr_ref(&t->shutdown_ep_refs);
}
static void allow_endpoint_shutdown_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
if (gpr_unref(&t->shutdown_ep_refs)) {
- if (t->ep) {
- grpc_endpoint_shutdown(exec_ctx, t->ep);
- }
+ grpc_endpoint_shutdown(exec_ctx, t->ep);
}
}
-static void destroy_endpoint(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t) {
- grpc_endpoint_destroy(exec_ctx, t->ep);
- t->ep = NULL;
- /* safe because we'll still have the ref for write */
- UNREF_TRANSPORT(exec_ctx, t, "disconnect");
-}
-
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_error *error) {
@@ -461,9 +442,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
t->closed = 1;
connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
- if (t->ep) {
- allow_endpoint_shutdown_locked(exec_ctx, t);
- }
+ allow_endpoint_shutdown_locked(exec_ctx, t);
/* flush writable stream list to avoid dangling references */
grpc_chttp2_stream_global *stream_global;
@@ -497,11 +476,10 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
}
#endif
-static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- void *arg_ignored) {
- grpc_chttp2_register_stream(t, s);
+static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
+ grpc_error *error) {
+ grpc_chttp2_stream *s = sp;
+ grpc_chttp2_register_stream(s->t, s);
}
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
@@ -512,6 +490,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
memset(s, 0, sizeof(*s));
+ s->t = t;
s->refcount = refcount;
/* We reserve one 'active stream' that's dropped when the stream is
read-closed. The others are for incoming_byte_streams that are actively
@@ -546,16 +525,18 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->global.in_stream_map = true;
}
- grpc_chttp2_run_with_global_lock(exec_ctx, t, s, finish_init_stream_locked,
- NULL, 0);
+ grpc_closure_init(&s->init_stream, finish_init_stream_locked, s);
+ grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->init_stream,
+ GRPC_ERROR_NONE);
return 0;
}
-static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, void *arg) {
+static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
+ grpc_error *error) {
grpc_byte_stream *bs;
+ grpc_chttp2_stream *s = sp;
+ grpc_chttp2_transport *t = s->t;
GPR_TIMER_BEGIN("destroy_stream", 0);
@@ -574,7 +555,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
while (
(bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) {
- incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
+ incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global,
@@ -611,7 +592,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END("destroy_stream", 0);
- gpr_free(arg);
+ gpr_free(s->destroy_stream_arg);
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
@@ -619,8 +600,10 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
- grpc_chttp2_run_with_global_lock(exec_ctx, t, s, destroy_stream_locked,
- and_free_memory, 0);
+ s->destroy_stream_arg = and_free_memory;
+ grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s);
+ grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->destroy_stream,
+ GRPC_ERROR_NONE);
}
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
@@ -653,10 +636,6 @@ static const char *write_state_name(grpc_chttp2_write_state state) {
switch (state) {
case GRPC_CHTTP2_WRITING_INACTIVE:
return "INACTIVE";
- case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
- return "REQUESTED[p=0]";
- case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
- return "REQUESTED[p=1]";
case GRPC_CHTTP2_WRITE_SCHEDULED:
return "SCHEDULED";
case GRPC_CHTTP2_WRITING:
@@ -679,119 +658,18 @@ static void set_write_state(grpc_chttp2_transport *t,
t->executor.write_state = state;
}
-static void finish_global_actions(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t) {
- grpc_chttp2_executor_action_header *hdr;
- grpc_chttp2_executor_action_header *next;
-
- GPR_TIMER_BEGIN("finish_global_actions", 0);
-
- for (;;) {
- check_read_ops(exec_ctx, &t->global);
-
- gpr_mu_lock(&t->executor.mu);
- if (t->executor.pending_actions_head != NULL) {
- hdr = t->executor.pending_actions_head;
- t->executor.pending_actions_head = t->executor.pending_actions_tail =
- NULL;
- gpr_mu_unlock(&t->executor.mu);
- while (hdr != NULL) {
- GPR_TIMER_BEGIN("chttp2:locked_action", 0);
- hdr->action(exec_ctx, t, hdr->stream, hdr->arg);
- GPR_TIMER_END("chttp2:locked_action", 0);
- next = hdr->next;
- gpr_free(hdr);
- UNREF_TRANSPORT(exec_ctx, t, "pending_action");
- hdr = next;
- }
- continue;
- } else {
- t->executor.global_active = false;
- switch (t->executor.write_state) {
- case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
- set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "unlocking");
- REF_TRANSPORT(t, "initiate_writing");
- gpr_mu_unlock(&t->executor.mu);
- grpc_exec_ctx_sched(exec_ctx, &t->initiate_writing, GRPC_ERROR_NONE,
- grpc_endpoint_get_workqueue(t->ep));
- break;
- case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
- start_writing(exec_ctx, t);
- gpr_mu_unlock(&t->executor.mu);
- break;
- case GRPC_CHTTP2_WRITING_INACTIVE:
- case GRPC_CHTTP2_WRITING:
- case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
- case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
- case GRPC_CHTTP2_WRITE_SCHEDULED:
- gpr_mu_unlock(&t->executor.mu);
- break;
- }
- }
- break;
- }
-
- GPR_TIMER_END("finish_global_actions", 0);
+static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
+ GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED);
+ start_writing(exec_ctx, t);
}
-void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *optional_stream,
- grpc_chttp2_locked_action action,
- void *arg, size_t sizeof_arg) {
- grpc_chttp2_executor_action_header *hdr;
-
- GPR_TIMER_BEGIN("grpc_chttp2_run_with_global_lock", 0);
-
- REF_TRANSPORT(t, "run_global");
- gpr_mu_lock(&t->executor.mu);
-
- for (;;) {
- if (!t->executor.global_active) {
- t->executor.global_active = 1;
- gpr_mu_unlock(&t->executor.mu);
-
- GPR_TIMER_BEGIN("chttp2:locked_action", 0);
- action(exec_ctx, t, optional_stream, arg);
- GPR_TIMER_END("chttp2:locked_action", 0);
-
- finish_global_actions(exec_ctx, t);
- } else {
- gpr_mu_unlock(&t->executor.mu);
-
- hdr = gpr_malloc(sizeof(*hdr) + sizeof_arg);
- hdr->stream = optional_stream;
- hdr->action = action;
- if (sizeof_arg == 0) {
- hdr->arg = arg;
- } else {
- hdr->arg = hdr + 1;
- memcpy(hdr->arg, arg, sizeof_arg);
- }
-
- gpr_mu_lock(&t->executor.mu);
- if (!t->executor.global_active) {
- /* global lock was released while allocating memory: release & retry */
- gpr_free(hdr);
- continue;
- }
- hdr->next = NULL;
- if (t->executor.pending_actions_head != NULL) {
- t->executor.pending_actions_tail =
- t->executor.pending_actions_tail->next = hdr;
- } else {
- t->executor.pending_actions_tail = t->executor.pending_actions_head =
- hdr;
- }
- REF_TRANSPORT(t, "pending_action");
- gpr_mu_unlock(&t->executor.mu);
- }
- break;
- }
-
- UNREF_TRANSPORT(exec_ctx, t, "run_global");
-
- GPR_TIMER_END("grpc_chttp2_run_with_global_lock", 0);
+static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
+ t->executor.check_read_ops_scheduled = false;
+ check_read_ops(exec_ctx, &t->global);
}
/*******************************************************************************
@@ -804,23 +682,17 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
switch (t->executor.write_state) {
case GRPC_CHTTP2_WRITING_INACTIVE:
- set_write_state(t, covered_by_poller
- ? GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER
- : GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
- reason);
- break;
- case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
- /* nothing to do: write already requested */
+ set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, reason);
+ grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
+ &t->initiate_writing, GRPC_ERROR_NONE,
+ covered_by_poller);
break;
- case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
if (covered_by_poller) {
/* upgrade to note poller is available to cover the write */
- set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, reason);
+ grpc_combiner_force_async_finally(t->executor.combiner);
}
break;
- case GRPC_CHTTP2_WRITE_SCHEDULED:
- /* nothing to do: write already scheduled */
- break;
case GRPC_CHTTP2_WRITING:
set_write_state(t,
covered_by_poller ? GRPC_CHTTP2_WRITING_STALE_WITH_POLLER
@@ -840,8 +712,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
}
static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
- GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED ||
- t->executor.write_state == GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER);
+ GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED);
if (!t->closed &&
grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) {
set_write_state(t, GRPC_CHTTP2_WRITING, "start_writing");
@@ -857,26 +728,9 @@ static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
"start_writing:nothing_to_write");
}
end_waiting_for_write(exec_ctx, t, GRPC_ERROR_CREATE("Nothing to write"));
- if (t->ep && !t->endpoint_reading) {
- destroy_endpoint(exec_ctx, t);
- }
}
}
-static void initiate_writing_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused,
- void *arg_ignored) {
- start_writing(exec_ctx, t);
- UNREF_TRANSPORT(exec_ctx, t, "initiate_writing");
-}
-
-static void initiate_writing(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_chttp2_run_with_global_lock(exec_ctx, arg, NULL, initiate_writing_locked,
- NULL, 0);
-}
-
void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
@@ -917,12 +771,9 @@ static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(error);
}
-static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_ignored,
- void *a) {
- grpc_error *error = a;
-
+static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
allow_endpoint_shutdown_locked(exec_ctx, t);
if (error != GRPC_ERROR_NONE) {
@@ -931,39 +782,37 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
- end_waiting_for_write(exec_ctx, t, error);
+ end_waiting_for_write(exec_ctx, t, GRPC_ERROR_REF(error));
switch (t->executor.write_state) {
case GRPC_CHTTP2_WRITING_INACTIVE:
- case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
- case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
case GRPC_CHTTP2_WRITE_SCHEDULED:
GPR_UNREACHABLE_CODE(break);
case GRPC_CHTTP2_WRITING:
set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "terminate_writing");
break;
case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
- set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER,
- "terminate_writing");
+ set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing");
+ grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
+ &t->initiate_writing, GRPC_ERROR_NONE,
+ true);
break;
case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
- set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
- "terminate_writing");
+ set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing");
+ grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
+ &t->initiate_writing, GRPC_ERROR_NONE,
+ false);
break;
}
- if (t->ep && !t->endpoint_reading) {
- destroy_endpoint(exec_ctx, t);
- }
-
UNREF_TRANSPORT(exec_ctx, t, "writing");
}
void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
void *transport_writing, grpc_error *error) {
grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
- grpc_chttp2_run_with_global_lock(
- exec_ctx, t, NULL, terminate_writing_with_lock, GRPC_ERROR_REF(error), 0);
+ grpc_combiner_execute(exec_ctx, t->executor.combiner, &t->terminate_writing,
+ GRPC_ERROR_REF(error));
}
static void writing_action(grpc_exec_ctx *exec_ctx, void *gt,
@@ -1107,12 +956,13 @@ static int contains_non_ok_status(
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
-static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, void *stream_op) {
+static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
+ grpc_error *error_ignored) {
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
grpc_transport_stream_op *op = stream_op;
+ grpc_chttp2_transport *t = op->transport_private.args[0];
+ grpc_chttp2_stream *s = op->transport_private.args[1];
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_stream_global *stream_global = &s->global;
@@ -1170,7 +1020,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
} else {
if (contains_non_ok_status(transport_global, op->send_initial_metadata)) {
stream_global->seen_error = true;
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
if (!stream_global->write_closed) {
if (transport_global->is_client) {
@@ -1237,7 +1088,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
if (contains_non_ok_status(transport_global,
op->send_trailing_metadata)) {
stream_global->seen_error = true;
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
if (stream_global->write_closed) {
stream_global->send_trailing_metadata = NULL;
@@ -1262,7 +1114,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
stream_global->recv_initial_metadata_ready =
op->recv_initial_metadata_ready;
stream_global->recv_initial_metadata = op->recv_initial_metadata;
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
if (op->recv_message != NULL) {
@@ -1276,7 +1129,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
exec_ctx, transport_global, stream_global,
transport_global->stream_lookahead, 0);
}
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
if (op->recv_trailing_metadata != NULL) {
@@ -1285,7 +1139,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
add_closure_barrier(on_complete);
stream_global->recv_trailing_metadata = op->recv_trailing_metadata;
stream_global->final_metadata_requested = true;
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
grpc_chttp2_complete_closure_step(exec_ctx, transport_global, stream_global,
@@ -1297,9 +1152,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_transport_stream_op *op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
- grpc_chttp2_run_with_global_lock(exec_ctx, t, s, perform_stream_op_locked, op,
- sizeof(*op));
+ grpc_closure_init(&op->transport_private.closure, perform_stream_op_locked,
+ op);
+ op->transport_private.args[0] = gt;
+ op->transport_private.args[1] = gs;
+ grpc_combiner_execute(exec_ctx, t->executor.combiner,
+ &op->transport_private.closure, GRPC_ERROR_NONE);
}
static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -1321,13 +1179,20 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_initiate_write(exec_ctx, &t->global, true, "send_ping");
}
-static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, void *opaque_8bytes) {
+typedef struct ack_ping_args {
+ grpc_closure closure;
+ grpc_chttp2_transport *t;
+ uint8_t opaque_8bytes[8];
+} ack_ping_args;
+
+static void ack_ping_locked(grpc_exec_ctx *exec_ctx, void *a,
+ grpc_error *error_ignored) {
+ ack_ping_args *args = a;
grpc_chttp2_outstanding_ping *ping;
- grpc_chttp2_transport_global *transport_global = &t->global;
+ grpc_chttp2_transport_global *transport_global = &args->t->global;
for (ping = transport_global->pings.next; ping != &transport_global->pings;
ping = ping->next) {
- if (0 == memcmp(opaque_8bytes, ping->id, 8)) {
+ if (0 == memcmp(args->opaque_8bytes, ping->id, 8)) {
grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
@@ -1335,21 +1200,25 @@ static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
break;
}
}
+ gpr_free(args);
}
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_parsing *transport_parsing,
const uint8_t *opaque_8bytes) {
- grpc_chttp2_run_with_global_lock(
- exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing), NULL,
- ack_ping_locked, (void *)opaque_8bytes, 8);
+ ack_ping_args *args = gpr_malloc(sizeof(*args));
+ args->t = TRANSPORT_FROM_PARSING(transport_parsing);
+ memcpy(args->opaque_8bytes, opaque_8bytes, sizeof(args->opaque_8bytes));
+ grpc_closure_init(&args->closure, ack_ping_locked, args);
+ grpc_combiner_execute(exec_ctx, args->t->executor.combiner, &args->closure,
+ GRPC_ERROR_NONE);
}
static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused,
- void *stream_op) {
+ void *stream_op,
+ grpc_error *error_ignored) {
grpc_transport_op *op = stream_op;
+ grpc_chttp2_transport *t = op->transport_private.args[0];
grpc_error *close_transport = op->disconnect_with_error;
/* If there's a set_accept_stream ensure that we're not parsing
@@ -1361,8 +1230,6 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
return;
}
- grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
-
if (op->on_connectivity_state_change != NULL) {
grpc_connectivity_state_notify_on_state_change(
exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
@@ -1388,11 +1255,11 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
}
if (op->bind_pollset) {
- add_to_pollset_locked(exec_ctx, t, NULL, op->bind_pollset);
+ grpc_endpoint_add_to_pollset(exec_ctx, t->ep, op->bind_pollset);
}
if (op->bind_pollset_set) {
- add_to_pollset_set_locked(exec_ctx, t, NULL, op->bind_pollset_set);
+ grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, op->bind_pollset_set);
}
if (op->send_ping) {
@@ -1402,13 +1269,18 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
if (close_transport != GRPC_ERROR_NONE) {
close_transport_locked(exec_ctx, t, close_transport);
}
+
+ grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
}
static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_transport_op *op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_chttp2_run_with_global_lock(
- exec_ctx, t, NULL, perform_transport_op_locked, op, sizeof(*op));
+ op->transport_private.args[0] = gt;
+ grpc_closure_init(&op->transport_private.closure, perform_transport_op_locked,
+ op);
+ grpc_combiner_execute(exec_ctx, t->executor.combiner,
+ &op->transport_private.closure, GRPC_ERROR_NONE);
}
/*******************************************************************************
@@ -1426,7 +1298,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
if (stream_global->seen_error) {
while ((bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
+ incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(
@@ -1449,7 +1321,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
stream_global->seen_error &&
(bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
+ incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
if (stream_global->incoming_frames.head != NULL) {
*stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop(
@@ -1470,7 +1342,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
if (stream_global->seen_error) {
while ((bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
+ incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(
@@ -1498,7 +1370,8 @@ static void decrement_active_streams_locked(
grpc_chttp2_stream_global *stream_global) {
if ((stream_global->all_incoming_byte_streams_finished =
gpr_unref(&stream_global->active_streams))) {
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
}
@@ -1602,7 +1475,8 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx,
}
if (due_to_error != GRPC_ERROR_NONE && !stream_global->seen_error) {
stream_global->seen_error = true;
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
1, due_to_error);
@@ -1614,7 +1488,8 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
grpc_status_code status, gpr_slice *slice) {
if (status != GRPC_STATUS_OK) {
stream_global->seen_error = true;
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
/* stream_global->recv_trailing_metadata_finished gives us a
last chance replacement: we've received trailing metadata,
@@ -1638,7 +1513,8 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
grpc_mdstr_from_slice(gpr_slice_ref(*slice))));
}
stream_global->published_trailing_metadata = true;
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
if (slice) {
gpr_slice_unref(*slice);
@@ -1698,7 +1574,8 @@ void grpc_chttp2_mark_stream_closed(
GRPC_ERROR_UNREF(error);
return;
}
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
if (close_reads && !stream_global->read_closed) {
stream_global->read_closed_error = GRPC_ERROR_REF(error);
stream_global->read_closed = true;
@@ -1914,16 +1791,12 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
* INPUT PROCESSING - PARSING
*/
-static void reading_action_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused, void *arg);
static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
-static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused, void *arg);
-static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused, void *arg);
+static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
+static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
static void reading_action(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
@@ -1931,16 +1804,16 @@ static void reading_action(grpc_exec_ctx *exec_ctx, void *tp,
reading_action_locked ->
(parse_unlocked -> post_parse_locked)? ->
post_reading_action_locked */
- grpc_chttp2_run_with_global_lock(exec_ctx, tp, NULL, reading_action_locked,
- GRPC_ERROR_REF(error), 0);
+ grpc_chttp2_transport *t = tp;
+ grpc_combiner_execute(exec_ctx, t->executor.combiner,
+ &t->reading_action_locked, GRPC_ERROR_REF(error));
}
-static void reading_action_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused, void *arg) {
+static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
- grpc_error *error = arg;
GPR_ASSERT(!t->executor.parsing_active);
if (!t->closed) {
@@ -1949,9 +1822,10 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map);
grpc_chttp2_prepare_to_read(transport_global, transport_parsing);
- grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, error, NULL);
+ grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, GRPC_ERROR_REF(error),
+ NULL);
} else {
- post_reading_action_locked(exec_ctx, t, s_unused, arg);
+ post_reading_action_locked(exec_ctx, t, error);
}
}
@@ -2006,12 +1880,13 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_UNREF(errors[i]);
}
GPR_TIMER_END("reading_action.parse", 0);
- grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, post_parse_locked, err,
- 0);
+ grpc_combiner_execute(exec_ctx, t->executor.combiner, &t->post_parse_locked,
+ err);
}
-static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused, void *arg) {
+static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
/* copy parsing qbuf to global qbuf */
@@ -2037,7 +1912,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (t->post_parsing_op) {
grpc_transport_op *op = t->post_parsing_op;
t->post_parsing_op = NULL;
- perform_transport_op_locked(exec_ctx, t, NULL, op);
+ perform_transport_op_locked(exec_ctx, op, GRPC_ERROR_NONE);
gpr_free(op);
}
/* if a stream is in the stream map, and gets cancelled, we need to
@@ -2054,15 +1929,14 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
- post_reading_action_locked(exec_ctx, t, s_unused, arg);
+ post_reading_action_locked(exec_ctx, t, error);
}
-static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused,
- void *arg) {
- grpc_error *error = arg;
+static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
bool keep_reading = false;
+ GRPC_ERROR_REF(error);
if (error == GRPC_ERROR_NONE && t->closed) {
error = GRPC_ERROR_CREATE("Transport closed");
}
@@ -2077,16 +1951,12 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_DEBUG, "R:%p -> 0 ws=%s", t,
write_state_name(t->executor.write_state));
}
- if (t->executor.write_state == GRPC_CHTTP2_WRITING_INACTIVE && t->ep) {
- destroy_endpoint(exec_ctx, t);
- }
} else if (!t->closed) {
keep_reading = true;
REF_TRANSPORT(t, "keep_reading");
prevent_endpoint_shutdown(t);
}
gpr_slice_buffer_reset_and_unref(&t->read_buffer);
- GRPC_ERROR_UNREF(error);
if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action);
@@ -2095,6 +1965,7 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
} else {
UNREF_TRANSPORT(exec_ctx, t, "reading_action");
}
+ GRPC_ERROR_UNREF(error);
}
/*******************************************************************************
@@ -2116,36 +1987,16 @@ static void connectivity_state_set(
* POLLSET STUFF
*/
-static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused, void *pollset) {
- if (t->ep) {
- grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset);
- }
-}
-
-static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused,
- void *pollset_set) {
- if (t->ep) {
- grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set);
- }
-}
-
static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_pollset *pollset) {
- /* TODO(ctiller): keep pollset alive */
- grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt,
- (grpc_chttp2_stream *)gs,
- add_to_pollset_locked, pollset, 0);
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+ grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset);
}
static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_pollset_set *pollset_set) {
- grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt,
- (grpc_chttp2_stream *)gs,
- add_to_pollset_set_locked, pollset_set, 0);
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+ grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set);
}
/*******************************************************************************
@@ -2157,6 +2008,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
if (gpr_unref(&bs->refs)) {
GRPC_ERROR_UNREF(bs->error);
gpr_slice_buffer_destroy(&bs->slices);
+ gpr_mu_destroy(&bs->slice_mu);
gpr_free(bs);
}
}
@@ -2202,38 +2054,31 @@ static void incoming_byte_stream_update_flow_control(
}
}
-typedef struct {
- grpc_chttp2_incoming_byte_stream *byte_stream;
- gpr_slice *slice;
- size_t max_size_hint;
- grpc_closure *on_complete;
-} incoming_byte_stream_next_arg;
-
static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- void *argp) {
- incoming_byte_stream_next_arg *arg = argp;
- grpc_chttp2_incoming_byte_stream *bs =
- (grpc_chttp2_incoming_byte_stream *)arg->byte_stream;
+ void *argp,
+ grpc_error *error_ignored) {
+ grpc_chttp2_incoming_byte_stream *bs = argp;
grpc_chttp2_transport_global *transport_global = &bs->transport->global;
grpc_chttp2_stream_global *stream_global = &bs->stream->global;
if (bs->is_tail) {
- incoming_byte_stream_update_flow_control(exec_ctx, transport_global,
- stream_global, arg->max_size_hint,
- bs->slices.length);
+ incoming_byte_stream_update_flow_control(
+ exec_ctx, transport_global, stream_global,
+ bs->next_action.max_size_hint, bs->slices.length);
}
+ gpr_mu_lock(&bs->slice_mu);
if (bs->slices.count > 0) {
- *arg->slice = gpr_slice_buffer_take_first(&bs->slices);
- grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL);
- } else if (bs->error != GRPC_ERROR_NONE) {
- grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error),
+ *bs->next_action.slice = gpr_slice_buffer_take_first(&bs->slices);
+ grpc_exec_ctx_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE,
NULL);
+ } else if (bs->error != GRPC_ERROR_NONE) {
+ grpc_exec_ctx_sched(exec_ctx, bs->next_action.on_complete,
+ GRPC_ERROR_REF(bs->error), NULL);
} else {
- bs->on_next = arg->on_complete;
- bs->next = arg->slice;
+ bs->on_next = bs->next_action.on_complete;
+ bs->next = bs->next_action.slice;
}
+ gpr_mu_unlock(&bs->slice_mu);
incoming_byte_stream_unref(exec_ctx, bs);
}
@@ -2243,11 +2088,14 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_closure *on_complete) {
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
- incoming_byte_stream_next_arg arg = {bs, slice, max_size_hint, on_complete};
gpr_ref(&bs->refs);
- grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
- incoming_byte_stream_next_locked, &arg,
- sizeof(arg));
+ bs->next_action.slice = slice;
+ bs->next_action.max_size_hint = max_size_hint;
+ bs->next_action.on_complete = on_complete;
+ grpc_closure_init(&bs->next_action.closure, incoming_byte_stream_next_locked,
+ bs);
+ grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner,
+ &bs->next_action.closure, GRPC_ERROR_NONE);
return 0;
}
@@ -2255,9 +2103,8 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream);
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- void *byte_stream) {
+ void *byte_stream,
+ grpc_error *error_ignored) {
grpc_chttp2_incoming_byte_stream *bs = byte_stream;
GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
decrement_active_streams_locked(exec_ctx, &bs->transport->global,
@@ -2269,8 +2116,10 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream) {
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
- grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
- incoming_byte_stream_destroy_locked, bs, 0);
+ grpc_closure_init(&bs->destroy_action, incoming_byte_stream_destroy_locked,
+ bs);
+ grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner,
+ &bs->destroy_action, GRPC_ERROR_NONE);
}
typedef struct {
@@ -2278,64 +2127,29 @@ typedef struct {
gpr_slice slice;
} incoming_byte_stream_push_arg;
-static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- void *argp) {
- incoming_byte_stream_push_arg *arg = argp;
- grpc_chttp2_incoming_byte_stream *bs = arg->byte_stream;
+void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_incoming_byte_stream *bs,
+ gpr_slice slice) {
+ gpr_mu_lock(&bs->slice_mu);
if (bs->on_next != NULL) {
- *bs->next = arg->slice;
+ *bs->next = slice;
grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL);
bs->on_next = NULL;
} else {
- gpr_slice_buffer_add(&bs->slices, arg->slice);
+ gpr_slice_buffer_add(&bs->slices, slice);
}
- incoming_byte_stream_unref(exec_ctx, bs);
-}
-
-void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_incoming_byte_stream *bs,
- gpr_slice slice) {
- incoming_byte_stream_push_arg arg = {bs, slice};
- gpr_ref(&bs->refs);
- grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
- incoming_byte_stream_push_locked, &arg,
- sizeof(arg));
-}
-
-typedef struct {
- grpc_chttp2_incoming_byte_stream *bs;
- grpc_error *error;
-} bs_fail_args;
-
-static bs_fail_args *make_bs_fail_args(grpc_chttp2_incoming_byte_stream *bs,
- grpc_error *error) {
- bs_fail_args *a = gpr_malloc(sizeof(*a));
- a->bs = bs;
- a->error = error;
- return a;
-}
-
-static void incoming_byte_stream_finished_failed_locked(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
- void *argp) {
- bs_fail_args *a = argp;
- grpc_chttp2_incoming_byte_stream *bs = a->bs;
- grpc_error *error = a->error;
- gpr_free(a);
- grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL);
- bs->on_next = NULL;
- GRPC_ERROR_UNREF(bs->error);
- bs->error = error;
- incoming_byte_stream_unref(exec_ctx, bs);
+ gpr_mu_unlock(&bs->slice_mu);
}
-static void incoming_byte_stream_finished_ok_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- void *argp) {
- grpc_chttp2_incoming_byte_stream *bs = argp;
+static void incoming_byte_stream_finished_locked(grpc_exec_ctx *exec_ctx,
+ void *bsp, grpc_error *error) {
+ grpc_chttp2_incoming_byte_stream *bs = bsp;
+ if (error != GRPC_ERROR_NONE) {
+ grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL);
+ bs->on_next = NULL;
+ GRPC_ERROR_UNREF(bs->error);
+ bs->error = error;
+ }
incoming_byte_stream_unref(exec_ctx, bs);
}
@@ -2343,24 +2157,12 @@ void grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error, int from_parsing_thread) {
if (from_parsing_thread) {
- if (error == GRPC_ERROR_NONE) {
- grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
- incoming_byte_stream_finished_ok_locked,
- bs, 0);
- } else {
- grpc_chttp2_run_with_global_lock(
- exec_ctx, bs->transport, bs->stream,
- incoming_byte_stream_finished_failed_locked,
- make_bs_fail_args(bs, error), 0);
- }
+ grpc_closure_init(&bs->finished_action,
+ incoming_byte_stream_finished_locked, bs);
+ grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner,
+ &bs->finished_action, GRPC_ERROR_REF(error));
} else {
- if (error == GRPC_ERROR_NONE) {
- incoming_byte_stream_finished_ok_locked(exec_ctx, bs->transport,
- bs->stream, bs);
- } else {
- incoming_byte_stream_finished_failed_locked(
- exec_ctx, bs->transport, bs->stream, make_bs_fail_args(bs, error));
- }
+ incoming_byte_stream_finished_locked(exec_ctx, bs, error);
}
}
@@ -2374,6 +2176,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
incoming_byte_stream->base.flags = flags;
incoming_byte_stream->base.next = incoming_byte_stream_next;
incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
+ gpr_mu_init(&incoming_byte_stream->slice_mu);
gpr_ref_init(&incoming_byte_stream->refs, 2);
incoming_byte_stream->next_message = NULL;
incoming_byte_stream->transport = TRANSPORT_FROM_PARSING(transport_parsing);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index c06d94a0ba..1b3f62b9cf 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -48,6 +48,7 @@
#include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/transport_impl.h"
@@ -161,9 +162,20 @@ struct grpc_chttp2_incoming_byte_stream {
grpc_chttp2_transport *transport;
grpc_chttp2_stream *stream;
int is_tail;
+
+ gpr_mu slice_mu; // protects slices, on_next
gpr_slice_buffer slices;
grpc_closure *on_next;
gpr_slice *next;
+
+ struct {
+ grpc_closure closure;
+ gpr_slice *slice;
+ size_t max_size_hint;
+ grpc_closure *on_complete;
+ } next_action;
+ grpc_closure destroy_action;
+ grpc_closure finished_action;
};
typedef struct {
@@ -294,23 +306,9 @@ struct grpc_chttp2_transport_parsing {
int64_t outgoing_window;
};
-typedef void (*grpc_chttp2_locked_action)(grpc_exec_ctx *ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, void *arg);
-
-typedef struct grpc_chttp2_executor_action_header {
- grpc_chttp2_stream *stream;
- grpc_chttp2_locked_action action;
- struct grpc_chttp2_executor_action_header *next;
- void *arg;
-} grpc_chttp2_executor_action_header;
-
typedef enum {
/** no writing activity */
GRPC_CHTTP2_WRITING_INACTIVE,
- /** write has been requested, but not scheduled yet */
- GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER,
- GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
/** write has been requested and scheduled against the workqueue */
GRPC_CHTTP2_WRITE_SCHEDULED,
/** write has been initiated after being reaped from the workqueue */
@@ -331,7 +329,7 @@ struct grpc_chttp2_transport {
gpr_refcount shutdown_ep_refs;
struct {
- gpr_mu mu;
+ grpc_combiner *combiner;
/** is a thread currently in the global lock */
bool global_active;
@@ -339,9 +337,8 @@ struct grpc_chttp2_transport {
bool parsing_active;
/** write execution state of the transport */
grpc_chttp2_write_state write_state;
-
- grpc_chttp2_executor_action_header *pending_actions_head;
- grpc_chttp2_executor_action_header *pending_actions_tail;
+ /** has a check_read_ops been scheduled */
+ bool check_read_ops_scheduled;
} executor;
/** is the transport destroying itself? */
@@ -377,10 +374,16 @@ struct grpc_chttp2_transport {
grpc_closure writing_action;
/** closure to start reading from the endpoint */
grpc_closure reading_action;
+ grpc_closure reading_action_locked;
+ grpc_closure post_parse_locked;
/** closure to actually do parsing */
grpc_closure parsing_action;
/** closure to initiate writing */
grpc_closure initiate_writing;
+ /** closure to finish writing */
+ grpc_closure terminate_writing;
+ /** closure to flush read state up the stack */
+ grpc_closure initiate_read_flush_locked;
/** incoming read bytes */
gpr_slice_buffer read_buffer;
@@ -524,11 +527,16 @@ struct grpc_chttp2_stream_parsing {
};
struct grpc_chttp2_stream {
+ grpc_chttp2_transport *t;
grpc_stream_refcount *refcount;
grpc_chttp2_stream_global global;
grpc_chttp2_stream_writing writing;
grpc_chttp2_stream_parsing parsing;
+ grpc_closure init_stream;
+ grpc_closure destroy_stream;
+ void *destroy_stream_arg;
+
grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
uint8_t included[STREAM_LIST_COUNT];
};
@@ -620,7 +628,7 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
grpc_chttp2_stream_global **stream_global);
void grpc_chttp2_list_add_check_read_ops(
- grpc_chttp2_transport_global *transport_global,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
bool grpc_chttp2_list_remove_check_read_ops(
grpc_chttp2_transport_global *transport_global,
@@ -700,12 +708,6 @@ void grpc_chttp2_complete_closure_step(
grpc_chttp2_stream_global *stream_global, grpc_closure **pclosure,
grpc_error *error);
-void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *transport,
- grpc_chttp2_stream *optional_stream,
- grpc_chttp2_locked_action action,
- void *arg, size_t sizeof_arg);
-
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
(sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index e1fc0ddee2..64ddbf0265 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -87,8 +87,8 @@ void grpc_chttp2_prepare_to_read(
transport_global->settings[GRPC_SENT_SETTINGS],
sizeof(transport_parsing->last_sent_settings));
transport_parsing->max_frame_size =
- transport_global->settings[GRPC_ACKED_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE];
+ transport_global
+ ->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE];
/* update the parsing view of incoming window */
while (grpc_chttp2_list_pop_unannounced_incoming_window_available(
@@ -177,7 +177,8 @@ void grpc_chttp2_publish_reads(
stream_global->seen_error = true;
stream_global->exceeded_metadata_size =
stream_parsing->exceeded_metadata_size;
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
/* flush stats to global stream state */
@@ -203,7 +204,8 @@ void grpc_chttp2_publish_reads(
stream_global->incoming_frames.tail->is_tail = 0;
}
if (stream_parsing->data_parser.incoming_frames.head != NULL) {
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
grpc_chttp2_incoming_frame_queue_merge(
&stream_global->incoming_frames,
@@ -219,7 +221,8 @@ void grpc_chttp2_publish_reads(
GPR_SWAP(grpc_chttp2_incoming_metadata_buffer,
stream_parsing->metadata_buffer[0],
stream_global->received_initial_metadata);
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
if (!stream_global->published_trailing_metadata &&
stream_parsing->got_metadata_on_parse[1]) {
@@ -228,7 +231,8 @@ void grpc_chttp2_publish_reads(
GPR_SWAP(grpc_chttp2_incoming_metadata_buffer,
stream_parsing->metadata_buffer[1],
stream_global->received_trailing_metadata);
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
if (stream_parsing->forced_close_error != GRPC_ERROR_NONE) {
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c
index 2eb5f5f632..4dc4968248 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.c
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.c
@@ -298,8 +298,15 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
}
void grpc_chttp2_list_add_check_read_ops(
- grpc_chttp2_transport_global *transport_global,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
+ grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
+ if (!t->executor.check_read_ops_scheduled) {
+ grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
+ &t->initiate_read_flush_locked,
+ GRPC_ERROR_NONE, false);
+ t->executor.check_read_ops_scheduled = true;
+ }
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_CHECK_READ_OPS);
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 87175d7943..e1375348a9 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -32,6 +32,7 @@
*/
#include "src/core/lib/channel/channel_stack.h"
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <stdlib.h>
@@ -259,21 +260,27 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
sizeof(grpc_call_stack)));
}
+static void destroy_op(grpc_exec_ctx *exec_ctx, void *op, grpc_error *error) {
+ gpr_free(op);
+}
+
void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
grpc_call_element *cur_elem) {
- grpc_transport_stream_op op;
- memset(&op, 0, sizeof(op));
- op.cancel_error = GRPC_ERROR_CANCELLED;
- grpc_call_next_op(exec_ctx, cur_elem, &op);
+ grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
+ memset(op, 0, sizeof(*op));
+ op->cancel_error = GRPC_ERROR_CANCELLED;
+ op->on_complete = grpc_closure_create(destroy_op, op);
+ grpc_call_next_op(exec_ctx, cur_elem, op);
}
void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *cur_elem,
grpc_status_code status,
gpr_slice *optional_message) {
- grpc_transport_stream_op op;
- memset(&op, 0, sizeof(op));
- grpc_transport_stream_op_add_cancellation_with_message(&op, status,
+ grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
+ memset(op, 0, sizeof(*op));
+ op->on_complete = grpc_closure_create(destroy_op, op);
+ grpc_transport_stream_op_add_cancellation_with_message(op, status,
optional_message);
- grpc_call_next_op(exec_ctx, cur_elem, &op);
+ grpc_call_next_op(exec_ctx, cur_elem, op);
}
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 32ebe53ee6..d4b1cf0296 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -60,7 +60,7 @@ typedef struct call_data {
/** If true, contents of \a compression_algorithm are authoritative */
int has_compression_algorithm;
- grpc_transport_stream_op send_op;
+ grpc_transport_stream_op *send_op;
uint32_t send_length;
uint32_t send_flags;
gpr_slice incoming_slice;
@@ -199,11 +199,11 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
calld->send_flags);
- calld->send_op.send_message = &calld->replacement_stream.base;
- calld->post_send = calld->send_op.on_complete;
- calld->send_op.on_complete = &calld->send_done;
+ calld->send_op->send_message = &calld->replacement_stream.base;
+ calld->post_send = calld->send_op->on_complete;
+ calld->send_op->on_complete = &calld->send_done;
- grpc_call_next_op(exec_ctx, elem, &calld->send_op);
+ grpc_call_next_op(exec_ctx, elem, calld->send_op);
}
static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
@@ -220,7 +220,7 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
static void continue_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
call_data *calld = elem->call_data;
- while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message,
+ while (grpc_byte_stream_next(exec_ctx, calld->send_op->send_message,
&calld->incoming_slice, ~(size_t)0,
&calld->got_slice)) {
gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
@@ -243,7 +243,7 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
}
if (op->send_message != NULL && !skip_compression(elem) &&
0 == (op->send_message->flags & GRPC_WRITE_NO_COMPRESS)) {
- calld->send_op = *op;
+ calld->send_op = op;
calld->send_length = op->send_message->length;
calld->send_flags = op->send_message->flags;
continue_send_message(exec_ctx, elem);
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c
index 0b6c3b2539..1ba0a5c141 100644
--- a/src/core/lib/iomgr/closure.c
+++ b/src/core/lib/iomgr/closure.c
@@ -41,6 +41,10 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
closure->cb_arg = cb_arg;
}
+void grpc_closure_list_init(grpc_closure_list *closure_list) {
+ closure_list->head = closure_list->tail = NULL;
+}
+
void grpc_closure_list_append(grpc_closure_list *closure_list,
grpc_closure *closure, grpc_error *error) {
if (closure == NULL) {
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index 08e59a168e..c1a22b6021 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -37,6 +37,7 @@
#include <grpc/support/port_platform.h>
#include <stdbool.h>
#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/support/mpscq.h"
struct grpc_closure;
typedef struct grpc_closure grpc_closure;
@@ -60,6 +61,14 @@ typedef void (*grpc_iomgr_cb_func)(grpc_exec_ctx *exec_ctx, void *arg,
/** A closure over a grpc_iomgr_cb_func. */
struct grpc_closure {
+ /** Once queued, next indicates the next queued closure; before then, scratch
+ * space */
+ union {
+ grpc_closure *next;
+ gpr_mpscq_node atm_next;
+ uintptr_t scratch;
+ } next_data;
+
/** Bound callback. */
grpc_iomgr_cb_func cb;
@@ -68,13 +77,6 @@ struct grpc_closure {
/** Once queued, the result of the closure. Before then: scratch space */
grpc_error *error;
-
- /** Once queued, next indicates the next queued closure; before then, scratch
- * space */
- union {
- grpc_closure *next;
- uintptr_t scratch;
- } next_data;
};
/** Initializes \a closure with \a cb and \a cb_arg. */
@@ -87,6 +89,8 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg);
#define GRPC_CLOSURE_LIST_INIT \
{ NULL, NULL }
+void grpc_closure_list_init(grpc_closure_list *list);
+
/** add \a closure to the end of \a list
and set \a closure's result to \a error */
void grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure,
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
new file mode 100644
index 0000000000..6b326facc3
--- /dev/null
+++ b/src/core/lib/iomgr/combiner.c
@@ -0,0 +1,221 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/workqueue.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+struct grpc_combiner {
+ grpc_workqueue *optional_workqueue;
+ gpr_mpscq queue;
+ // state is:
+ // lower bit - zero if orphaned
+ // other bits - number of items queued on the lock
+ gpr_atm state;
+ bool take_async_break_before_final_list;
+ grpc_closure_list final_list;
+ grpc_closure continue_finishing;
+};
+
+grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
+ grpc_combiner *lock = gpr_malloc(sizeof(*lock));
+ lock->optional_workqueue = GRPC_WORKQUEUE_REF(optional_workqueue, "combiner");
+ gpr_atm_no_barrier_store(&lock->state, 1);
+ gpr_mpscq_init(&lock->queue);
+ lock->take_async_break_before_final_list = false;
+ grpc_closure_list_init(&lock->final_list);
+ return lock;
+}
+
+static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
+ GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0);
+ gpr_mpscq_destroy(&lock->queue);
+ GRPC_WORKQUEUE_UNREF(exec_ctx, lock->optional_workqueue, "combiner");
+ gpr_free(lock);
+}
+
+void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
+ if (gpr_atm_full_fetch_add(&lock->state, -1) == 1) {
+ really_destroy(exec_ctx, lock);
+ }
+}
+
+static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
+static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
+
+static void continue_finishing_mainline(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_combiner *lock = arg;
+ GPR_ASSERT(exec_ctx->active_combiner == NULL);
+ exec_ctx->active_combiner = lock;
+ if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock);
+ GPR_ASSERT(exec_ctx->active_combiner == lock);
+ exec_ctx->active_combiner = NULL;
+}
+
+static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
+ grpc_closure *c = lock->final_list.head;
+ grpc_closure_list_init(&lock->final_list);
+ lock->take_async_break_before_final_list = false;
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error;
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
+ }
+}
+
+static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_combiner *lock = arg;
+ GPR_ASSERT(exec_ctx->active_combiner == NULL);
+ exec_ctx->active_combiner = lock;
+ // quick peek to see if new things have turned up on the queue: if so, go back
+ // to executing them before the final list
+ if ((gpr_atm_acq_load(&lock->state) >> 1) > 1) {
+ if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock);
+ } else {
+ execute_final(exec_ctx, lock);
+ finish(exec_ctx, lock);
+ }
+ GPR_ASSERT(exec_ctx->active_combiner == lock);
+ exec_ctx->active_combiner = NULL;
+}
+
+static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
+ GPR_ASSERT(exec_ctx->active_combiner == lock);
+ if (lock->take_async_break_before_final_list) {
+ grpc_closure_init(&lock->continue_finishing, continue_executing_final,
+ lock);
+ grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE,
+ GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched"));
+ return false;
+ } else {
+ execute_final(exec_ctx, lock);
+ return true;
+ }
+}
+
+static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
+ gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue);
+ GPR_ASSERT(exec_ctx->active_combiner == lock);
+ if (n == NULL) {
+ // queue is in an inconsistant state: use this as a cue that we should
+ // go off and do something else for a while (and come back later)
+ grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline,
+ lock);
+ grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE,
+ GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched"));
+ return false;
+ }
+ grpc_closure *cl = (grpc_closure *)n;
+ grpc_error *error = cl->error;
+ cl->cb(exec_ctx, cl->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ return true;
+}
+
+static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
+ bool (*executor)(grpc_exec_ctx * exec_ctx, grpc_combiner * lock) =
+ maybe_finish_one;
+ do {
+ switch (gpr_atm_full_fetch_add(&lock->state, -2)) {
+ case 5: // we're down to one queued item: if it's the final list we
+ case 4: // should do that
+ if (!grpc_closure_list_empty(lock->final_list)) {
+ executor = start_execute_final;
+ }
+ break;
+ case 3: // had one count, one unorphaned --> unlocked unorphaned
+ return;
+ case 2: // and one count, one orphaned --> unlocked and orphaned
+ really_destroy(exec_ctx, lock);
+ return;
+ case 1:
+ case 0:
+ // these values are illegal - representing an already unlocked or
+ // deleted lock
+ GPR_UNREACHABLE_CODE(return );
+ }
+ } while (executor(exec_ctx, lock));
+}
+
+void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
+ grpc_closure *cl, grpc_error *error) {
+ gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2);
+ GPR_ASSERT(last & 1); // ensure lock has not been destroyed
+ if (last == 1) {
+ exec_ctx->active_combiner = lock;
+ cl->cb(exec_ctx, cl->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ finish(exec_ctx, lock);
+ GPR_ASSERT(exec_ctx->active_combiner == lock);
+ exec_ctx->active_combiner = NULL;
+ } else {
+ cl->error = error;
+ gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
+ }
+}
+
+static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
+ grpc_error *error) {
+ grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
+ GRPC_ERROR_REF(error), true);
+}
+
+void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
+ grpc_closure *closure, grpc_error *error,
+ bool force_async_break) {
+ if (exec_ctx->active_combiner != lock) {
+ grpc_combiner_execute(exec_ctx, lock,
+ grpc_closure_create(enqueue_finally, closure), error);
+ return;
+ }
+
+ if (force_async_break) {
+ lock->take_async_break_before_final_list = true;
+ }
+ if (grpc_closure_list_empty(lock->final_list)) {
+ gpr_atm_full_fetch_add(&lock->state, 2);
+ }
+ grpc_closure_list_append(&lock->final_list, closure, error);
+}
+
+void grpc_combiner_force_async_finally(grpc_combiner *lock) {
+ lock->take_async_break_before_final_list = true;
+}
diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h
new file mode 100644
index 0000000000..5b94d5bd99
--- /dev/null
+++ b/src/core/lib/iomgr/combiner.h
@@ -0,0 +1,67 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_COMBINER_H
+#define GRPC_CORE_LIB_IOMGR_COMBINER_H
+
+#include <stddef.h>
+
+#include <grpc/support/atm.h>
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/support/mpscq.h"
+
+// Provides serialized access to some resource.
+// Each action queued on an aelock is executed serially in a borrowed thread.
+// The actual thread executing actions may change over time (but there will only
+// every be one at a time).
+
+// Initialize the lock, with an optional workqueue to shift load to when
+// necessary
+grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue);
+// Destroy the lock
+void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
+// Execute \a action within the lock.
+void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
+ grpc_closure *closure, grpc_error *error);
+// Execute \a action within the lock just prior to unlocking.
+// if \a hint_async_break is additionally set, the combiner is tries to trip
+// through the workqueue between finishing the primary queue of combined
+// closures and executing the finally list.
+// Takes a very slow and round-about path if not called from a
+// grpc_combiner_execute closure
+void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
+ grpc_closure *closure, grpc_error *error,
+ bool hint_async_break);
+void grpc_combiner_force_async_finally(grpc_combiner *lock);
+
+#endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 917f332f03..1895ee6245 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -40,8 +40,8 @@
/** A workqueue represents a list of work to be executed asynchronously.
Forward declared here to avoid a circular dependency with workqueue.h. */
-struct grpc_workqueue;
typedef struct grpc_workqueue grpc_workqueue;
+typedef struct grpc_combiner grpc_combiner;
#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
/** Execution context.
@@ -66,13 +66,15 @@ typedef struct grpc_workqueue grpc_workqueue;
*/
struct grpc_exec_ctx {
grpc_closure_list closure_list;
+ /** currently active combiner: updated only via combiner.c */
+ grpc_combiner *active_combiner;
bool cached_ready_to_finish;
void *check_ready_to_finish_arg;
bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
};
#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
- { GRPC_CLOSURE_LIST_INIT, false, finish_check_arg, finish_check }
+ { GRPC_CLOSURE_LIST_INIT, NULL, false, finish_check_arg, finish_check }
#else
struct grpc_exec_ctx {
bool cached_ready_to_finish;
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
index 9f7219ebf1..416618e258 100644
--- a/src/core/lib/iomgr/workqueue.h
+++ b/src/core/lib/iomgr/workqueue.h
@@ -50,8 +50,6 @@
/* grpc_workqueue is forward declared in exec_ctx.h */
-void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
-
//#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
#define GRPC_WORKQUEUE_REF(p, r) \
diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c
index e0d6dac230..49a100520e 100644
--- a/src/core/lib/iomgr/workqueue_posix.c
+++ b/src/core/lib/iomgr/workqueue_posix.c
@@ -52,8 +52,7 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
char name[32];
*workqueue = gpr_malloc(sizeof(grpc_workqueue));
gpr_ref_init(&(*workqueue)->refs, 1);
- gpr_mu_init(&(*workqueue)->mu);
- (*workqueue)->closure_list.head = (*workqueue)->closure_list.tail = NULL;
+ gpr_atm_no_barrier_store(&(*workqueue)->state, 1);
grpc_error *err = grpc_wakeup_fd_init(&(*workqueue)->wakeup_fd);
if (err != GRPC_ERROR_NONE) {
gpr_free(*workqueue);
@@ -62,6 +61,7 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
sprintf(name, "workqueue:%p", (void *)(*workqueue));
(*workqueue)->wakeup_read_fd = grpc_fd_create(
GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name);
+ gpr_mpscq_init(&(*workqueue)->queue);
grpc_closure_init(&(*workqueue)->read_closure, on_readable, *workqueue);
grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd,
&(*workqueue)->read_closure);
@@ -70,21 +70,30 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
static void workqueue_destroy(grpc_exec_ctx *exec_ctx,
grpc_workqueue *workqueue) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
grpc_fd_shutdown(exec_ctx, workqueue->wakeup_read_fd);
}
+static void workqueue_orphan(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue *workqueue) {
+ if (gpr_atm_full_fetch_add(&workqueue->state, -1) == 1) {
+ workqueue_destroy(exec_ctx, workqueue);
+ }
+}
+
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p ref %d -> %d %s",
workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count + 1,
reason);
+ gpr_ref(&workqueue->refs);
+}
#else
void grpc_workqueue_ref(grpc_workqueue *workqueue) {
-#endif
+ if (workqueue == NULL) return;
gpr_ref(&workqueue->refs);
}
+#endif
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
@@ -92,35 +101,43 @@ void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p unref %d -> %d %s",
workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count - 1,
reason);
+ if (gpr_unref(&workqueue->refs)) {
+ workqueue_orphan(exec_ctx, workqueue);
+ }
+}
#else
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
-#endif
+ if (workqueue == NULL) return;
if (gpr_unref(&workqueue->refs)) {
- workqueue_destroy(exec_ctx, workqueue);
+ workqueue_orphan(exec_ctx, workqueue);
}
}
+#endif
+
+static void drain(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
+ abort();
+}
-void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
- gpr_mu_lock(&workqueue->mu);
- grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
- gpr_mu_unlock(&workqueue->mu);
+static void wakeup(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
+ grpc_error *err = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
+ if (!GRPC_LOG_IF_ERROR("wakeupfd_wakeup", err)) {
+ drain(exec_ctx, workqueue);
+ }
}
static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_workqueue *workqueue = arg;
if (error != GRPC_ERROR_NONE) {
- gpr_mu_destroy(&workqueue->mu);
/* HACK: let wakeup_fd code know that we stole the fd */
workqueue->wakeup_fd.read_fd = 0;
grpc_wakeup_fd_destroy(&workqueue->wakeup_fd);
grpc_fd_orphan(exec_ctx, workqueue->wakeup_read_fd, NULL, NULL, "destroy");
+ GPR_ASSERT(gpr_atm_no_barrier_load(&workqueue->state) == 0);
gpr_free(workqueue);
} else {
- gpr_mu_lock(&workqueue->mu);
- grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
- gpr_mu_unlock(&workqueue->mu);
+ gpr_mpscq_node *n = gpr_mpscq_pop(&workqueue->queue);
if (error == GRPC_ERROR_NONE) {
grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
&workqueue->read_closure);
@@ -128,24 +145,42 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
/* recurse to get error handling */
on_readable(exec_ctx, arg, error);
}
+ if (n == NULL) {
+ /* try again - queue in an inconsistant state */
+ wakeup(exec_ctx, workqueue);
+ } else {
+ switch (gpr_atm_full_fetch_add(&workqueue->state, -2)) {
+ case 3: // had one count, one unorphaned --> done, unorphaned
+ break;
+ case 2: // had one count, one orphaned --> done, orphaned
+ workqueue_destroy(exec_ctx, workqueue);
+ break;
+ case 1:
+ case 0:
+ // these values are illegal - representing an already done or
+ // deleted workqueue
+ GPR_UNREACHABLE_CODE(break);
+ default:
+ // schedule a wakeup since there's more to do
+ wakeup(exec_ctx, workqueue);
+ }
+ grpc_closure *cl = (grpc_closure *)n;
+ grpc_error *clerr = cl->error;
+ cl->cb(exec_ctx, cl->cb_arg, clerr);
+ GRPC_ERROR_UNREF(clerr);
+ }
}
}
void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
grpc_closure *closure, grpc_error *error) {
- grpc_error *push_error = GRPC_ERROR_NONE;
- gpr_mu_lock(&workqueue->mu);
- if (grpc_closure_list_empty(workqueue->closure_list)) {
- push_error = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
- }
- grpc_closure_list_append(&workqueue->closure_list, closure, error);
- if (push_error != GRPC_ERROR_NONE) {
- const char *msg = grpc_error_string(push_error);
- gpr_log(GPR_ERROR, "Failed to push to workqueue: %s", msg);
- grpc_error_free_string(msg);
- grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
+ gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, 2);
+ GPR_ASSERT(last & 1);
+ closure->error = error;
+ gpr_mpscq_push(&workqueue->queue, &closure->next_data.atm_next);
+ if (last == 1) {
+ wakeup(exec_ctx, workqueue);
}
- gpr_mu_unlock(&workqueue->mu);
}
#endif /* GPR_POSIX_SOCKET */
diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h
index 2e8aca1816..c69ae8a941 100644
--- a/src/core/lib/iomgr/workqueue_posix.h
+++ b/src/core/lib/iomgr/workqueue_posix.h
@@ -35,14 +35,17 @@
#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
+#include "src/core/lib/support/mpscq.h"
struct grpc_fd;
struct grpc_workqueue {
gpr_refcount refs;
-
- gpr_mu mu;
- grpc_closure_list closure_list;
+ gpr_mpscq queue;
+ // state is:
+ // lower bit - zero if orphaned
+ // other bits - number of items enqueued
+ gpr_atm state;
grpc_wakeup_fd wakeup_fd;
struct grpc_fd *wakeup_read_fd;
diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c
index 23e2dea185..ee81dc248e 100644
--- a/src/core/lib/iomgr/workqueue_windows.c
+++ b/src/core/lib/iomgr/workqueue_windows.c
@@ -42,8 +42,6 @@
// context, which is at least correct, if not performant or in the spirit of
// workqueues.
-void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
-
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
const char *reason) {}
diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c
index bc50f9d1b0..0169ccd9ef 100644
--- a/src/core/lib/security/transport/secure_endpoint.c
+++ b/src/core/lib/security/transport/secure_endpoint.c
@@ -128,7 +128,7 @@ static void flush_read_staging_buffer(secure_endpoint *ep, uint8_t **cur,
static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep,
grpc_error *error) {
- if (false && grpc_trace_secure_endpoint) {
+ if (grpc_trace_secure_endpoint) {
size_t i;
for (i = 0; i < ep->read_buffer->count; i++) {
char *data = gpr_dump_slice(ep->read_buffer->slices[i],
@@ -256,7 +256,7 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
- if (false && grpc_trace_secure_endpoint) {
+ if (grpc_trace_secure_endpoint) {
for (i = 0; i < slices->count; i++) {
char *data =
gpr_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index 12e789bde9..1958a72b93 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -48,7 +48,7 @@ typedef struct call_data {
up-call on transport_op, and remember to call our on_done_recv member after
handling it. */
grpc_closure auth_on_recv;
- grpc_transport_stream_op transport_op;
+ grpc_transport_stream_op *transport_op;
grpc_metadata_array md;
const grpc_metadata *consumed_md;
size_t num_consumed_md;
@@ -106,6 +106,10 @@ static grpc_mdelem *remove_consumed_md(void *user_data, grpc_mdelem *md) {
return md;
}
+static void destroy_op(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ gpr_free(arg);
+}
+
/* called from application code */
static void on_md_processing_done(
void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
@@ -131,21 +135,22 @@ static void on_md_processing_done(
grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE, NULL);
} else {
gpr_slice message;
- grpc_transport_stream_op close_op;
- memset(&close_op, 0, sizeof(close_op));
+ grpc_transport_stream_op *close_op = gpr_malloc(sizeof(*close_op));
+ memset(close_op, 0, sizeof(*close_op));
grpc_metadata_array_destroy(&calld->md);
error_details = error_details != NULL
? error_details
: "Authentication metadata processing failed.";
message = gpr_slice_from_copied_string(error_details);
- calld->transport_op.send_initial_metadata = NULL;
- if (calld->transport_op.send_message != NULL) {
- grpc_byte_stream_destroy(&exec_ctx, calld->transport_op.send_message);
- calld->transport_op.send_message = NULL;
+ calld->transport_op->send_initial_metadata = NULL;
+ if (calld->transport_op->send_message != NULL) {
+ grpc_byte_stream_destroy(&exec_ctx, calld->transport_op->send_message);
+ calld->transport_op->send_message = NULL;
}
- calld->transport_op.send_trailing_metadata = NULL;
- grpc_transport_stream_op_add_close(&close_op, status, &message);
- grpc_call_next_op(&exec_ctx, elem, &close_op);
+ calld->transport_op->send_trailing_metadata = NULL;
+ close_op->on_complete = grpc_closure_create(destroy_op, close_op);
+ grpc_transport_stream_op_add_close(close_op, status, &message);
+ grpc_call_next_op(&exec_ctx, elem, close_op);
grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv,
grpc_error_set_int(GRPC_ERROR_CREATE(error_details),
GRPC_ERROR_INT_GRPC_STATUS, status),
@@ -182,7 +187,7 @@ static void set_recv_ops_md_callbacks(grpc_call_element *elem,
calld->recv_initial_metadata = op->recv_initial_metadata;
calld->on_done_recv = op->recv_initial_metadata_ready;
op->recv_initial_metadata_ready = &calld->auth_on_recv;
- calld->transport_op = *op;
+ calld->transport_op = op;
}
}
diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c
new file mode 100644
index 0000000000..25b055b172
--- /dev/null
+++ b/src/core/lib/support/mpscq.c
@@ -0,0 +1,80 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/support/mpscq.h"
+
+#include <grpc/support/log.h>
+
+void gpr_mpscq_init(gpr_mpscq *q) {
+ gpr_atm_no_barrier_store(&q->head, (gpr_atm)&q->stub);
+ q->tail = &q->stub;
+ gpr_atm_no_barrier_store(&q->stub.next, 0);
+}
+
+void gpr_mpscq_destroy(gpr_mpscq *q) {
+ GPR_ASSERT(gpr_atm_no_barrier_load(&q->head) == (gpr_atm)&q->stub);
+ GPR_ASSERT(q->tail == &q->stub);
+}
+
+void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) {
+ gpr_atm_no_barrier_store(&n->next, 0);
+ gpr_mpscq_node *prev =
+ (gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n);
+ gpr_atm_rel_store(&prev->next, (gpr_atm)n);
+}
+
+gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) {
+ gpr_mpscq_node *tail = q->tail;
+ gpr_mpscq_node *next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next);
+ if (tail == &q->stub) {
+ if (next == NULL) return NULL;
+ q->tail = next;
+ tail = next;
+ next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next);
+ }
+ if (next != NULL) {
+ q->tail = next;
+ return tail;
+ }
+ gpr_mpscq_node *head = (gpr_mpscq_node *)gpr_atm_acq_load(&q->head);
+ if (tail != head) {
+ return 0;
+ }
+ gpr_mpscq_push(q, &q->stub);
+ next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next);
+ if (next != NULL) {
+ q->tail = next;
+ return tail;
+ }
+ return NULL;
+}
diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h
new file mode 100644
index 0000000000..1201edceb1
--- /dev/null
+++ b/src/core/lib/support/mpscq.h
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_SUPPORT_MPSCQ_H
+#define GRPC_CORE_LIB_SUPPORT_MPSCQ_H
+
+#include <grpc/support/atm.h>
+#include <stddef.h>
+
+// Multiple-producer single-consumer lock free queue, based upon the
+// implementation from Dmitry Vyukov here:
+// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
+
+// List node (include this in a data structure and dangle the rest of the
+// interesting bits off the end)
+typedef struct gpr_mpscq_node { gpr_atm next; } gpr_mpscq_node;
+
+// Actual queue type
+typedef struct gpr_mpscq {
+ gpr_atm head;
+ // make sure head & tail don't share a cacheline
+ char padding[GPR_CACHELINE_SIZE];
+ gpr_mpscq_node *tail;
+ gpr_mpscq_node stub;
+} gpr_mpscq;
+
+void gpr_mpscq_init(gpr_mpscq *q);
+void gpr_mpscq_destroy(gpr_mpscq *q);
+// Push a node
+void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n);
+// Pop a node (returns NULL if no node is ready - which doesn't indicate that
+// the queue is empty!!)
+gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q);
+
+#endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index fc9df76dc1..36eec253c2 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -109,6 +109,10 @@ typedef struct batch_control {
uint8_t recv_message;
uint8_t recv_final_op;
uint8_t is_notify_tag_closure;
+
+ /* TODO(ctiller): now that this is inlined, figure out how much of the above
+ state can be eliminated */
+ grpc_transport_stream_op op;
} batch_control;
struct grpc_call {
@@ -762,6 +766,7 @@ typedef struct termination_closure {
grpc_error *error;
grpc_closure *op_closure;
enum { TC_CANCEL, TC_CLOSE } type;
+ grpc_transport_stream_op op;
} termination_closure;
static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
@@ -781,26 +786,24 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
}
static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
- grpc_transport_stream_op op;
termination_closure *tc = tcp;
- memset(&op, 0, sizeof(op));
- op.cancel_error = tc->error;
+ memset(&tc->op, 0, sizeof(tc->op));
+ tc->op.cancel_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc);
- op.on_complete = &tc->closure;
- execute_op(exec_ctx, tc->call, &op);
+ tc->op.on_complete = &tc->closure;
+ execute_op(exec_ctx, tc->call, &tc->op);
}
static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
- grpc_transport_stream_op op;
termination_closure *tc = tcp;
- memset(&op, 0, sizeof(op));
- op.close_error = tc->error;
+ memset(&tc->op, 0, sizeof(tc->op));
+ tc->op.close_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc);
- tc->op_closure = op.on_complete;
- op.on_complete = &tc->closure;
- execute_op(exec_ctx, tc->call, &op);
+ tc->op_closure = tc->op.on_complete;
+ tc->op.on_complete = &tc->closure;
+ execute_op(exec_ctx, tc->call, &tc->op);
}
static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx,
@@ -1354,7 +1357,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_call *call, const grpc_op *ops,
size_t nops, void *notify_tag,
int is_notify_tag_closure) {
- grpc_transport_stream_op stream_op;
size_t i;
const grpc_op *op;
batch_control *bctl;
@@ -1365,8 +1367,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
- memset(&stream_op, 0, sizeof(stream_op));
-
/* TODO(ctiller): this feels like it could be made lock-free */
gpr_mu_lock(&call->mu);
bctl = allocate_batch_control(call);
@@ -1375,6 +1375,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
bctl->notify_tag = notify_tag;
bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
+ grpc_transport_stream_op *stream_op = &bctl->op;
+ memset(stream_op, 0, sizeof(*stream_op));
+
if (nops == 0) {
GRPC_CALL_INTERNAL_REF(call, "completion");
bctl->error = GRPC_ERROR_NONE;
@@ -1453,9 +1456,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
/* TODO(ctiller): just make these the same variable? */
call->metadata_batch[0][0].deadline = call->send_deadline;
- stream_op.send_initial_metadata =
+ stream_op->send_initial_metadata =
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
- stream_op.send_initial_metadata_flags = op->flags;
+ stream_op->send_initial_metadata_flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
@@ -1475,7 +1478,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_stream_init(
&call->sending_stream,
&op->data.send_message->data.raw.slice_buffer, op->flags);
- stream_op.send_message = &call->sending_stream.base;
+ stream_op->send_message = &call->sending_stream.base;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
/* Flag validation: currently allow no flags */
@@ -1493,7 +1496,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
bctl->send_final_op = 1;
call->sent_final_op = 1;
- stream_op.send_trailing_metadata =
+ stream_op->send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
@@ -1540,7 +1543,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
- stream_op.send_trailing_metadata =
+ stream_op->send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
break;
case GRPC_OP_RECV_INITIAL_METADATA:
@@ -1558,9 +1561,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&call->receiving_initial_metadata_ready,
receiving_initial_metadata_ready, bctl);
bctl->recv_initial_metadata = 1;
- stream_op.recv_initial_metadata =
+ stream_op->recv_initial_metadata =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
- stream_op.recv_initial_metadata_ready =
+ stream_op->recv_initial_metadata_ready =
&call->receiving_initial_metadata_ready;
num_completion_callbacks_needed++;
break;
@@ -1577,10 +1580,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->receiving_message = 1;
bctl->recv_message = 1;
call->receiving_buffer = op->data.recv_message;
- stream_op.recv_message = &call->receiving_stream;
+ stream_op->recv_message = &call->receiving_stream;
grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready,
bctl);
- stream_op.recv_message_ready = &call->receiving_stream_ready;
+ stream_op->recv_message_ready = &call->receiving_stream_ready;
num_completion_callbacks_needed++;
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
@@ -1606,9 +1609,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->final_op.client.status_details_capacity =
op->data.recv_status_on_client.status_details_capacity;
bctl->recv_final_op = 1;
- stream_op.recv_trailing_metadata =
+ stream_op->recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op.collect_stats = &call->stats.transport_stream_stats;
+ stream_op->collect_stats = &call->stats.transport_stream_stats;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
/* Flag validation: currently allow no flags */
@@ -1628,9 +1631,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled;
bctl->recv_final_op = 1;
- stream_op.recv_trailing_metadata =
+ stream_op->recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op.collect_stats = &call->stats.transport_stream_stats;
+ stream_op->collect_stats = &call->stats.transport_stream_stats;
break;
}
}
@@ -1641,12 +1644,12 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
- stream_op.context = call->context;
+ stream_op->context = call->context;
grpc_closure_init(&bctl->finish_batch, finish_batch, bctl);
- stream_op.on_complete = &bctl->finish_batch;
+ stream_op->on_complete = &bctl->finish_batch;
gpr_mu_unlock(&call->mu);
- execute_op(exec_ctx, call, &stream_op);
+ execute_op(exec_ctx, call, stream_op);
done:
GPR_TIMER_END("grpc_call_start_batch", 0);
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index 6d2b1c4935..52e78567bd 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -334,14 +334,13 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg,
}
void grpc_channel_destroy(grpc_channel *channel) {
- grpc_transport_op op;
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
grpc_channel_element *elem;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel));
- memset(&op, 0, sizeof(op));
- op.disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed");
+ op->disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed");
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
- elem->filter->start_transport_op(&exec_ctx, elem, &op);
+ elem->filter->start_transport_op(&exec_ctx, elem, op);
GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, channel, "channel");
diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c
index 9818f9d2f2..5f9f07f89a 100644
--- a/src/core/lib/surface/channel_ping.c
+++ b/src/core/lib/surface/channel_ping.c
@@ -61,19 +61,18 @@ static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
void *tag, void *reserved) {
- grpc_transport_op op;
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
ping_result *pr = gpr_malloc(sizeof(*pr));
grpc_channel_element *top_elem =
grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_ASSERT(reserved == NULL);
- memset(&op, 0, sizeof(op));
pr->tag = tag;
pr->cq = cq;
grpc_closure_init(&pr->closure, ping_done, pr);
- op.send_ping = &pr->closure;
- op.bind_pollset = grpc_cq_pollset(cq);
+ op->send_ping = &pr->closure;
+ op->bind_pollset = grpc_cq_pollset(cq);
grpc_cq_begin_op(cq, tag);
- top_elem->filter->start_transport_op(&exec_ctx, top_elem, &op);
+ top_elem->filter->start_transport_op(&exec_ctx, top_elem, op);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index d4421be936..2641604901 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -273,22 +273,19 @@ static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
int send_goaway, grpc_error *send_disconnect) {
- grpc_transport_op op;
- struct shutdown_cleanup_args *sc;
+ struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc));
+ grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
+ grpc_transport_op *op = grpc_make_transport_op(&sc->closure);
grpc_channel_element *elem;
- memset(&op, 0, sizeof(op));
- op.send_goaway = send_goaway;
- sc = gpr_malloc(sizeof(*sc));
+ op->send_goaway = send_goaway;
sc->slice = gpr_slice_from_copied_string("Server shutdown");
- op.goaway_message = &sc->slice;
- op.goaway_status = GRPC_STATUS_OK;
- op.disconnect_with_error = send_disconnect;
- grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
- op.on_consumed = &sc->closure;
+ op->goaway_message = &sc->slice;
+ op->goaway_status = GRPC_STATUS_OK;
+ op->disconnect_with_error = send_disconnect;
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
- elem->filter->start_transport_op(exec_ctx, elem, &op);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
}
static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
@@ -440,14 +437,13 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
chand->finish_destroy_channel_closure.cb_arg = chand;
- grpc_transport_op op;
- memset(&op, 0, sizeof(op));
- op.set_accept_stream = true;
- op.on_consumed = &chand->finish_destroy_channel_closure;
+ grpc_transport_op *op =
+ grpc_make_transport_op(&chand->finish_destroy_channel_closure);
+ op->set_accept_stream = true;
grpc_channel_next_op(exec_ctx,
grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0),
- &op);
+ op);
}
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
@@ -840,14 +836,13 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
channel_data *chand = cd;
grpc_server *server = chand->server;
if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
- grpc_transport_op op;
- memset(&op, 0, sizeof(op));
- op.on_connectivity_state_change = &chand->channel_connectivity_changed,
- op.connectivity_state = &chand->connectivity_state;
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
+ op->on_connectivity_state_change = &chand->channel_connectivity_changed,
+ op->connectivity_state = &chand->connectivity_state;
grpc_channel_next_op(exec_ctx,
grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0),
- &op);
+ op);
} else {
gpr_mu_lock(&server->mu_global);
destroy_channel(exec_ctx, chand);
@@ -1111,7 +1106,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
size_t slots;
uint32_t probes;
uint32_t max_probes = 0;
- grpc_transport_op op;
+ grpc_transport_op *op = NULL;
channel =
grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport);
@@ -1171,16 +1166,16 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
gpr_mu_unlock(&s->mu_global);
GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
- memset(&op, 0, sizeof(op));
- op.set_accept_stream = true;
- op.set_accept_stream_fn = accept_stream;
- op.set_accept_stream_user_data = chand;
- op.on_connectivity_state_change = &chand->channel_connectivity_changed;
- op.connectivity_state = &chand->connectivity_state;
+ op = grpc_make_transport_op(NULL);
+ op->set_accept_stream = true;
+ op->set_accept_stream_fn = accept_stream;
+ op->set_accept_stream_user_data = chand;
+ op->on_connectivity_state_change = &chand->channel_connectivity_changed;
+ op->connectivity_state = &chand->connectivity_state;
if (gpr_atm_acq_load(&s->shutdown_flag) != 0) {
- op.disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown");
+ op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown");
}
- grpc_transport_perform_op(exec_ctx, transport, &op);
+ grpc_transport_perform_op(exec_ctx, transport, op);
}
void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg,
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 857c3909d2..08f9d7e8d9 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -32,10 +32,14 @@
*/
#include "src/core/lib/transport/transport.h"
+
+#include <string.h>
+
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
+
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/transport_impl.h"
@@ -247,3 +251,26 @@ void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status);
add_error(op, &op->close_error, error);
}
+
+typedef struct {
+ grpc_closure outer_on_complete;
+ grpc_closure *inner_on_complete;
+ grpc_transport_op op;
+} made_transport_op;
+
+static void destroy_made_transport_op(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ made_transport_op *op = arg;
+ grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error),
+ NULL);
+ gpr_free(op);
+}
+
+grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) {
+ made_transport_op *op = gpr_malloc(sizeof(*op));
+ grpc_closure_init(&op->outer_on_complete, destroy_made_transport_op, op);
+ op->inner_on_complete = on_complete;
+ memset(&op->op, 0, sizeof(op->op));
+ op->op.on_consumed = &op->outer_on_complete;
+ return &op->op;
+}
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 08c0a237c9..508b57a5b4 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -96,6 +96,11 @@ void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats *from,
void grpc_transport_move_stats(grpc_transport_stream_stats *from,
grpc_transport_stream_stats *to);
+typedef struct {
+ grpc_closure closure;
+ void *args[2];
+} grpc_transport_private_op_data;
+
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op {
@@ -144,6 +149,12 @@ typedef struct grpc_transport_stream_op {
/* Indexes correspond to grpc_context_index enum values */
grpc_call_context_element *context;
+
+ /***************************************************************************
+ * remaining fields are initialized and used at the discretion of the
+ * transport implementation */
+
+ grpc_transport_private_op_data transport_private;
} grpc_transport_stream_op;
/** Transport op: a set of operations to perform on a transport as a whole */
@@ -177,6 +188,12 @@ typedef struct grpc_transport_op {
grpc_pollset_set *bind_pollset_set;
/** send a ping, call this back if not NULL */
grpc_closure *send_ping;
+
+ /***************************************************************************
+ * remaining fields are initialized and used at the discretion of the
+ * transport implementation */
+
+ grpc_transport_private_op_data transport_private;
} grpc_transport_op;
/* Returns the amount of memory required to store a grpc_stream for this
@@ -268,4 +285,8 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport);
char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
grpc_transport *transport);
+/* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to
+ \a on_consumed and then delete the returned transport op */
+grpc_transport_op *grpc_make_transport_op(grpc_closure *on_consumed);
+
#endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H */
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index b37e27c27e..5441af4c66 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -50,6 +50,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/support/log_linux.c',
'src/core/lib/support/log_posix.c',
'src/core/lib/support/log_windows.c',
+ 'src/core/lib/support/mpscq.c',
'src/core/lib/support/murmur_hash.c',
'src/core/lib/support/slice.c',
'src/core/lib/support/slice_buffer.c',
@@ -90,6 +91,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/http/httpcli.c',
'src/core/lib/http/parser.c',
'src/core/lib/iomgr/closure.c',
+ 'src/core/lib/iomgr/combiner.c',
'src/core/lib/iomgr/endpoint.c',
'src/core/lib/iomgr/endpoint_pair_posix.c',
'src/core/lib/iomgr/endpoint_pair_windows.c',
diff --git a/test/core/iomgr/combiner_test.c b/test/core/iomgr/combiner_test.c
new file mode 100644
index 0000000000..7cf016d82c
--- /dev/null
+++ b/test/core/iomgr/combiner_test.c
@@ -0,0 +1,163 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/combiner.h"
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/useful.h>
+
+#include "test/core/util/test_config.h"
+
+static void test_no_op(void) {
+ gpr_log(GPR_DEBUG, "test_no_op");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_combiner_destroy(&exec_ctx, grpc_combiner_create(NULL));
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void set_bool_to_true(grpc_exec_ctx *exec_ctx, void *value,
+ grpc_error *error) {
+ *(bool *)value = true;
+}
+
+static void test_execute_one(void) {
+ gpr_log(GPR_DEBUG, "test_execute_one");
+
+ grpc_combiner *lock = grpc_combiner_create(NULL);
+ bool done = false;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_combiner_execute(&exec_ctx, lock,
+ grpc_closure_create(set_bool_to_true, &done),
+ GRPC_ERROR_NONE);
+ grpc_exec_ctx_flush(&exec_ctx);
+ GPR_ASSERT(done);
+ grpc_combiner_destroy(&exec_ctx, lock);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+typedef struct {
+ size_t ctr;
+ grpc_combiner *lock;
+} thd_args;
+
+typedef struct {
+ size_t *ctr;
+ size_t value;
+} ex_args;
+
+static void check_one(grpc_exec_ctx *exec_ctx, void *a, grpc_error *error) {
+ ex_args *args = a;
+ // gpr_log(GPR_DEBUG, "*%p=%d; step %d", args->ctr, *args->ctr, args->value);
+ GPR_ASSERT(*args->ctr == args->value - 1);
+ *args->ctr = args->value;
+ gpr_free(a);
+}
+
+static void execute_many_loop(void *a) {
+ thd_args *args = a;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ size_t n = 1;
+ for (size_t i = 0; i < 10; i++) {
+ for (size_t j = 0; j < 10000; j++) {
+ ex_args *c = gpr_malloc(sizeof(*c));
+ c->ctr = &args->ctr;
+ c->value = n++;
+ grpc_combiner_execute(&exec_ctx, args->lock,
+ grpc_closure_create(check_one, c), GRPC_ERROR_NONE);
+ grpc_exec_ctx_flush(&exec_ctx);
+ }
+ gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100));
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_execute_many(void) {
+ gpr_log(GPR_DEBUG, "test_execute_many");
+
+ grpc_combiner *lock = grpc_combiner_create(NULL);
+ gpr_thd_id thds[100];
+ thd_args ta[GPR_ARRAY_SIZE(thds)];
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
+ gpr_thd_options options = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&options);
+ ta[i].ctr = 0;
+ ta[i].lock = lock;
+ GPR_ASSERT(gpr_thd_new(&thds[i], execute_many_loop, &ta[i], &options));
+ }
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
+ gpr_thd_join(thds[i]);
+ }
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_combiner_destroy(&exec_ctx, lock);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static bool got_in_finally = false;
+
+static void in_finally(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ got_in_finally = true;
+}
+
+static void add_finally(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ grpc_combiner_execute_finally(exec_ctx, arg,
+ grpc_closure_create(in_finally, NULL),
+ GRPC_ERROR_NONE, false);
+}
+
+static void test_execute_finally(void) {
+ gpr_log(GPR_DEBUG, "test_execute_finally");
+
+ grpc_combiner *lock = grpc_combiner_create(NULL);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_combiner_execute(&exec_ctx, lock, grpc_closure_create(add_finally, lock),
+ GRPC_ERROR_NONE);
+ grpc_exec_ctx_flush(&exec_ctx);
+ GPR_ASSERT(got_in_finally);
+ grpc_combiner_destroy(&exec_ctx, lock);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+int main(int argc, char **argv) {
+ grpc_test_init(argc, argv);
+ grpc_init();
+ test_no_op();
+ test_execute_one();
+ test_execute_finally();
+ test_execute_many();
+ grpc_shutdown();
+
+ return 0;
+}
diff --git a/test/core/support/mpscq_test.c b/test/core/support/mpscq_test.c
new file mode 100644
index 0000000000..0b4bc254e2
--- /dev/null
+++ b/test/core/support/mpscq_test.c
@@ -0,0 +1,124 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/support/mpscq.h"
+
+#include <stdlib.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/useful.h>
+#include "test/core/util/test_config.h"
+
+typedef struct test_node {
+ gpr_mpscq_node node;
+ size_t i;
+ size_t *ctr;
+} test_node;
+
+static test_node *new_node(size_t i, size_t *ctr) {
+ test_node *n = gpr_malloc(sizeof(test_node));
+ n->i = i;
+ n->ctr = ctr;
+ return n;
+}
+
+static void test_serial(void) {
+ gpr_log(GPR_DEBUG, "test_serial");
+ gpr_mpscq q;
+ gpr_mpscq_init(&q);
+ for (size_t i = 0; i < 10000000; i++) {
+ gpr_mpscq_push(&q, &new_node(i, NULL)->node);
+ }
+ for (size_t i = 0; i < 10000000; i++) {
+ test_node *n = (test_node *)gpr_mpscq_pop(&q);
+ GPR_ASSERT(n);
+ GPR_ASSERT(n->i == i);
+ gpr_free(n);
+ }
+}
+
+typedef struct {
+ size_t ctr;
+ gpr_mpscq *q;
+} thd_args;
+
+#define THREAD_ITERATIONS 100000
+
+static void test_thread(void *args) {
+ thd_args *a = args;
+ for (size_t i = 1; i <= THREAD_ITERATIONS; i++) {
+ gpr_mpscq_push(a->q, &new_node(i, &a->ctr)->node);
+ }
+}
+
+static void test_mt(void) {
+ gpr_log(GPR_DEBUG, "test_mt");
+ gpr_thd_id thds[100];
+ thd_args ta[GPR_ARRAY_SIZE(thds)];
+ gpr_mpscq q;
+ gpr_mpscq_init(&q);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
+ gpr_thd_options options = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&options);
+ ta[i].ctr = 0;
+ ta[i].q = &q;
+ GPR_ASSERT(gpr_thd_new(&thds[i], test_thread, &ta[i], &options));
+ }
+ size_t num_done = 0;
+ size_t spins = 0;
+ while (num_done != GPR_ARRAY_SIZE(thds)) {
+ gpr_mpscq_node *n;
+ while ((n = gpr_mpscq_pop(&q)) == NULL) {
+ spins++;
+ }
+ test_node *tn = (test_node *)n;
+ GPR_ASSERT(*tn->ctr == tn->i - 1);
+ *tn->ctr = tn->i;
+ if (tn->i == THREAD_ITERATIONS) num_done++;
+ gpr_free(tn);
+ }
+ gpr_log(GPR_DEBUG, "spins: %" PRIdPTR, spins);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
+ gpr_thd_join(thds[i]);
+ }
+ gpr_mpscq_destroy(&q);
+}
+
+int main(int argc, char **argv) {
+ grpc_test_init(argc, argv);
+ test_serial();
+ test_mt();
+ return 0;
+}
diff --git a/test/core/surface/lame_client_test.c b/test/core/surface/lame_client_test.c
index f36f980575..cad45cc9fa 100644
--- a/test/core/surface/lame_client_test.c
+++ b/test/core/surface/lame_client_test.c
@@ -57,24 +57,23 @@ void verify_connectivity(grpc_exec_ctx *exec_ctx, void *arg,
void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
void test_transport_op(grpc_channel *channel) {
- grpc_transport_op op;
+ grpc_transport_op *op;
grpc_channel_element *elem;
grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- memset(&op, 0, sizeof(op));
grpc_closure_init(&transport_op_cb, verify_connectivity, &op);
- op.on_connectivity_state_change = &transport_op_cb;
- op.connectivity_state = &state;
+ op = grpc_make_transport_op(NULL);
+ op->on_connectivity_state_change = &transport_op_cb;
+ op->connectivity_state = &state;
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
- elem->filter->start_transport_op(&exec_ctx, elem, &op);
+ elem->filter->start_transport_op(&exec_ctx, elem, op);
grpc_exec_ctx_finish(&exec_ctx);
- memset(&op, 0, sizeof(op));
grpc_closure_init(&transport_op_cb, do_nothing, NULL);
- op.on_consumed = &transport_op_cb;
- elem->filter->start_transport_op(&exec_ctx, elem, &op);
+ op = grpc_make_transport_op(&transport_op_cb);
+ elem->filter->start_transport_op(&exec_ctx, elem, op);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/third_party/protobuf b/third_party/protobuf
-Subproject bdeb215cab2985195325fcd5e70c3fa751f46e0
+Subproject 3470b6895aa659b7559ed678e029a5338e535f1
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 8233da957d..73cdd8d04f 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -805,6 +805,7 @@ src/core/lib/http/format_request.h \
src/core/lib/http/httpcli.h \
src/core/lib/http/parser.h \
src/core/lib/iomgr/closure.h \
+src/core/lib/iomgr/combiner.h \
src/core/lib/iomgr/endpoint.h \
src/core/lib/iomgr/endpoint_pair.h \
src/core/lib/iomgr/error.h \
@@ -960,6 +961,7 @@ src/core/lib/http/format_request.c \
src/core/lib/http/httpcli.c \
src/core/lib/http/parser.c \
src/core/lib/iomgr/closure.c \
+src/core/lib/iomgr/combiner.c \
src/core/lib/iomgr/endpoint.c \
src/core/lib/iomgr/endpoint_pair_posix.c \
src/core/lib/iomgr/endpoint_pair_windows.c \
@@ -1177,6 +1179,7 @@ src/core/lib/profiling/timers.h \
src/core/lib/support/backoff.h \
src/core/lib/support/block_annotate.h \
src/core/lib/support/env.h \
+src/core/lib/support/mpscq.h \
src/core/lib/support/murmur_hash.h \
src/core/lib/support/stack_lockfree.h \
src/core/lib/support/string.h \
@@ -1204,6 +1207,7 @@ src/core/lib/support/log_android.c \
src/core/lib/support/log_linux.c \
src/core/lib/support/log_posix.c \
src/core/lib/support/log_windows.c \
+src/core/lib/support/mpscq.c \
src/core/lib/support/murmur_hash.c \
src/core/lib/support/slice.c \
src/core/lib/support/slice_buffer.c \
diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json
index e3cfd55cd6..6245f58528 100644
--- a/tools/run_tests/sources_and_headers.json
+++ b/tools/run_tests/sources_and_headers.json
@@ -245,6 +245,22 @@
],
"headers": [],
"language": "c",
+ "name": "combiner_test",
+ "src": [
+ "test/core/iomgr/combiner_test.c"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
+ {
+ "deps": [
+ "gpr",
+ "gpr_test_util",
+ "grpc",
+ "grpc_test_util"
+ ],
+ "headers": [],
+ "language": "c",
"name": "compression_test",
"src": [
"test/core/compression/compression_test.c"
@@ -604,6 +620,20 @@
],
"headers": [],
"language": "c",
+ "name": "gpr_mpscq_test",
+ "src": [
+ "test/core/support/mpscq_test.c"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
+ {
+ "deps": [
+ "gpr",
+ "gpr_test_util"
+ ],
+ "headers": [],
+ "language": "c",
"name": "gpr_slice_buffer_test",
"src": [
"test/core/support/slice_buffer_test.c"
@@ -5540,6 +5570,7 @@
"src/core/lib/support/backoff.h",
"src/core/lib/support/block_annotate.h",
"src/core/lib/support/env.h",
+ "src/core/lib/support/mpscq.h",
"src/core/lib/support/murmur_hash.h",
"src/core/lib/support/stack_lockfree.h",
"src/core/lib/support/string.h",
@@ -5603,6 +5634,8 @@
"src/core/lib/support/log_linux.c",
"src/core/lib/support/log_posix.c",
"src/core/lib/support/log_windows.c",
+ "src/core/lib/support/mpscq.c",
+ "src/core/lib/support/mpscq.h",
"src/core/lib/support/murmur_hash.c",
"src/core/lib/support/murmur_hash.h",
"src/core/lib/support/slice.c",
@@ -5705,6 +5738,7 @@
"src/core/lib/http/httpcli.h",
"src/core/lib/http/parser.h",
"src/core/lib/iomgr/closure.h",
+ "src/core/lib/iomgr/combiner.h",
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
"src/core/lib/iomgr/error.h",
@@ -5807,6 +5841,8 @@
"src/core/lib/http/parser.h",
"src/core/lib/iomgr/closure.c",
"src/core/lib/iomgr/closure.h",
+ "src/core/lib/iomgr/combiner.c",
+ "src/core/lib/iomgr/combiner.h",
"src/core/lib/iomgr/endpoint.c",
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json
index 898e96dde4..5d2c48d7eb 100644
--- a/tools/run_tests/tests.json
+++ b/tools/run_tests/tests.json
@@ -282,6 +282,27 @@
"posix",
"windows"
],
+ "cpu_cost": 30,
+ "exclude_configs": [],
+ "flaky": false,
+ "gtest": false,
+ "language": "c",
+ "name": "combiner_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ]
+ },
+ {
+ "args": [],
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
"cpu_cost": 1.0,
"exclude_configs": [],
"flaky": false,
@@ -684,6 +705,27 @@
"posix",
"windows"
],
+ "cpu_cost": 30,
+ "exclude_configs": [],
+ "flaky": false,
+ "gtest": false,
+ "language": "c",
+ "name": "gpr_mpscq_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ]
+ },
+ {
+ "args": [],
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
"cpu_cost": 1.0,
"exclude_configs": [],
"flaky": false,
diff --git a/vsprojects/buildtests_c.sln b/vsprojects/buildtests_c.sln
index 7232440ab7..c28ad2fc31 100644
--- a/vsprojects/buildtests_c.sln
+++ b/vsprojects/buildtests_c.sln
@@ -164,6 +164,17 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "chttp2_varint_test", "vcxpr
{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} = {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}
EndProjectSection
EndProject
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "combiner_test", "vcxproj\test\combiner_test\combiner_test.vcxproj", "{C237D1E4-8825-80BA-1FC3-5E147E53E96E}"
+ ProjectSection(myProperties) = preProject
+ lib = "False"
+ EndProjectSection
+ ProjectSection(ProjectDependencies) = postProject
+ {17BCAFC0-5FDC-4C94-AEB9-95F3E220614B} = {17BCAFC0-5FDC-4C94-AEB9-95F3E220614B}
+ {29D16885-7228-4C31-81ED-5F9187C7F2A9} = {29D16885-7228-4C31-81ED-5F9187C7F2A9}
+ {EAB0A629-17A9-44DB-B5FF-E91A721FE037} = {EAB0A629-17A9-44DB-B5FF-E91A721FE037}
+ {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} = {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}
+ EndProjectSection
+EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "compression_test", "vcxproj\test\compression_test\compression_test.vcxproj", "{5AFE7D17-A4A7-D68E-4491-CBC852F9D2A0}"
ProjectSection(myProperties) = preProject
lib = "False"
@@ -366,6 +377,15 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "gpr_log_test", "vcxproj\tes
{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} = {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}
EndProjectSection
EndProject
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "gpr_mpscq_test", "vcxproj\test\gpr_mpscq_test\gpr_mpscq_test.vcxproj", "{B3D7760B-8BEA-2EF6-F1D4-9F9020E166D6}"
+ ProjectSection(myProperties) = preProject
+ lib = "False"
+ EndProjectSection
+ ProjectSection(ProjectDependencies) = postProject
+ {EAB0A629-17A9-44DB-B5FF-E91A721FE037} = {EAB0A629-17A9-44DB-B5FF-E91A721FE037}
+ {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} = {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}
+ EndProjectSection
+EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "gpr_slice_buffer_test", "vcxproj\test\gpr_slice_buffer_test\gpr_slice_buffer_test.vcxproj", "{E679773D-DE89-AEBB-9787-59019989B825}"
ProjectSection(myProperties) = preProject
lib = "False"
@@ -1679,6 +1699,22 @@ Global
{6B29F634-1277-74B8-47F6-78756190BA7B}.Release-DLL|Win32.Build.0 = Release|Win32
{6B29F634-1277-74B8-47F6-78756190BA7B}.Release-DLL|x64.ActiveCfg = Release|x64
{6B29F634-1277-74B8-47F6-78756190BA7B}.Release-DLL|x64.Build.0 = Release|x64
+ {C237D1E4-8825-80BA-1FC3-5E147E53E96E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {C237D1E4-8825-80BA-1FC3-5E147E53E96E}.Debug|x64.ActiveCfg = Debug|x64
+ {C237D1E4-8825-80BA-1FC3-5E147E53E96E}.Release|Win32.ActiveCfg = Release|Win32
+ {C237D1E4-8825-80BA-1FC3-5E147E53E96E}.Release|x64.ActiveCfg = Release|x64
+ {C237D1E4-8825-80BA-1FC3-5E147E53E96E}.Debug|Win32.Build.0 = Debug|Win32
+ {C237D1E4-8825-80BA-1FC3-5E147E53E96E}.Debug|x64.Build.0 = Debug|x64
+ {C237D1E4-8825-80BA-1FC3-5E147E53E96E}.Release|Win32.Build.0 = Release|Win32
+ {C237D1E4-8825-80BA-1FC3-5E147E53E96E}.Release|x64.Build.0 = Release|x64
+ {C237D1E4-8825-80BA-1FC3-5E147E53E96E}.Debug-DLL|Win32.ActiveCfg = Debug|Win32
+ {C237D1E4-8825-80BA-1FC3-5E147E53E96E}.Debug-DLL|Win32.Build.0 = Debug|Win32
+ {C237D1E4-8825-80BA-1FC3-5E147E53E96E}.Debug-DLL|x64.ActiveCfg = Debug|x64
+ {C237D1E4-8825-80BA-1FC3-5E147E53E96E}.Debug-DLL|x64.Build.0 = Debug|x64
+ {C237D1E4-8825-80BA-1FC3-5E147E53E96E}.Release-DLL|Win32.ActiveCfg = Release|Win32
+ {C237D1E4-8825-80BA-1FC3-5E147E53E96E}.Release-DLL|Win32.Build.0 = Release|Win32
+ {C237D1E4-8825-80BA-1FC3-5E147E53E96E}.Release-DLL|x64.ActiveCfg = Release|x64
+ {C237D1E4-8825-80BA-1FC3-5E147E53E96E}.Release-DLL|x64.Build.0 = Release|x64
{5AFE7D17-A4A7-D68E-4491-CBC852F9D2A0}.Debug|Win32.ActiveCfg = Debug|Win32
{5AFE7D17-A4A7-D68E-4491-CBC852F9D2A0}.Debug|x64.ActiveCfg = Debug|x64
{5AFE7D17-A4A7-D68E-4491-CBC852F9D2A0}.Release|Win32.ActiveCfg = Release|Win32
@@ -2015,6 +2051,22 @@ Global
{38797EE3-62CC-3CBF-18D5-009ED6DD0BEC}.Release-DLL|Win32.Build.0 = Release|Win32
{38797EE3-62CC-3CBF-18D5-009ED6DD0BEC}.Release-DLL|x64.ActiveCfg = Release|x64
{38797EE3-62CC-3CBF-18D5-009ED6DD0BEC}.Release-DLL|x64.Build.0 = Release|x64
+ {B3D7760B-8BEA-2EF6-F1D4-9F9020E166D6}.Debug|Win32.ActiveCfg = Debug|Win32
+ {B3D7760B-8BEA-2EF6-F1D4-9F9020E166D6}.Debug|x64.ActiveCfg = Debug|x64
+ {B3D7760B-8BEA-2EF6-F1D4-9F9020E166D6}.Release|Win32.ActiveCfg = Release|Win32
+ {B3D7760B-8BEA-2EF6-F1D4-9F9020E166D6}.Release|x64.ActiveCfg = Release|x64
+ {B3D7760B-8BEA-2EF6-F1D4-9F9020E166D6}.Debug|Win32.Build.0 = Debug|Win32
+ {B3D7760B-8BEA-2EF6-F1D4-9F9020E166D6}.Debug|x64.Build.0 = Debug|x64
+ {B3D7760B-8BEA-2EF6-F1D4-9F9020E166D6}.Release|Win32.Build.0 = Release|Win32
+ {B3D7760B-8BEA-2EF6-F1D4-9F9020E166D6}.Release|x64.Build.0 = Release|x64
+ {B3D7760B-8BEA-2EF6-F1D4-9F9020E166D6}.Debug-DLL|Win32.ActiveCfg = Debug|Win32
+ {B3D7760B-8BEA-2EF6-F1D4-9F9020E166D6}.Debug-DLL|Win32.Build.0 = Debug|Win32
+ {B3D7760B-8BEA-2EF6-F1D4-9F9020E166D6}.Debug-DLL|x64.ActiveCfg = Debug|x64
+ {B3D7760B-8BEA-2EF6-F1D4-9F9020E166D6}.Debug-DLL|x64.Build.0 = Debug|x64
+ {B3D7760B-8BEA-2EF6-F1D4-9F9020E166D6}.Release-DLL|Win32.ActiveCfg = Release|Win32
+ {B3D7760B-8BEA-2EF6-F1D4-9F9020E166D6}.Release-DLL|Win32.Build.0 = Release|Win32
+ {B3D7760B-8BEA-2EF6-F1D4-9F9020E166D6}.Release-DLL|x64.ActiveCfg = Release|x64
+ {B3D7760B-8BEA-2EF6-F1D4-9F9020E166D6}.Release-DLL|x64.Build.0 = Release|x64
{E679773D-DE89-AEBB-9787-59019989B825}.Debug|Win32.ActiveCfg = Debug|Win32
{E679773D-DE89-AEBB-9787-59019989B825}.Debug|x64.ActiveCfg = Debug|x64
{E679773D-DE89-AEBB-9787-59019989B825}.Release|Win32.ActiveCfg = Release|Win32
diff --git a/vsprojects/vcxproj/gpr/gpr.vcxproj b/vsprojects/vcxproj/gpr/gpr.vcxproj
index db8594e745..f565d80b72 100644
--- a/vsprojects/vcxproj/gpr/gpr.vcxproj
+++ b/vsprojects/vcxproj/gpr/gpr.vcxproj
@@ -195,6 +195,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\support\backoff.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\support\block_annotate.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\support\env.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\support\mpscq.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\support\murmur_hash.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\support\stack_lockfree.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\support\string.h" />
@@ -244,6 +245,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\support\log_windows.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\support\mpscq.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\support\murmur_hash.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\support\slice.c">
diff --git a/vsprojects/vcxproj/gpr/gpr.vcxproj.filters b/vsprojects/vcxproj/gpr/gpr.vcxproj.filters
index 9bab373513..0dc13e3e99 100644
--- a/vsprojects/vcxproj/gpr/gpr.vcxproj.filters
+++ b/vsprojects/vcxproj/gpr/gpr.vcxproj.filters
@@ -61,6 +61,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\support\log_windows.c">
<Filter>src\core\lib\support</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\support\mpscq.c">
+ <Filter>src\core\lib\support</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\support\murmur_hash.c">
<Filter>src\core\lib\support</Filter>
</ClCompile>
@@ -278,6 +281,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\support\env.h">
<Filter>src\core\lib\support</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\support\mpscq.h">
+ <Filter>src\core\lib\support</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\support\murmur_hash.h">
<Filter>src\core\lib\support</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj
index a9e96dab46..e44299007e 100644
--- a/vsprojects/vcxproj/grpc/grpc.vcxproj
+++ b/vsprojects/vcxproj/grpc/grpc.vcxproj
@@ -314,6 +314,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\http\httpcli.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\http\parser.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\closure.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\combiner.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\error.h" />
@@ -486,6 +487,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\closure.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\combiner.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair_posix.c">
diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
index be77e53f45..82a8ffb40a 100644
--- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
@@ -46,6 +46,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\closure.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\combiner.c">
+ <Filter>src\core\lib\iomgr</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@@ -698,6 +701,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\closure.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\combiner.h">
+ <Filter>src\core\lib\iomgr</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
index 1cfe06aec6..b993c9bdf7 100644
--- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
+++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
@@ -303,6 +303,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\http\httpcli.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\http\parser.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\closure.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\combiner.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\error.h" />
@@ -453,6 +454,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\closure.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\combiner.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair_posix.c">
diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
index c33c6650e7..4b0a7a01cd 100644
--- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
@@ -49,6 +49,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\closure.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\combiner.c">
+ <Filter>src\core\lib\iomgr</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@@ -605,6 +608,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\closure.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\combiner.h">
+ <Filter>src\core\lib\iomgr</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/test/combiner_test/combiner_test.vcxproj b/vsprojects/vcxproj/test/combiner_test/combiner_test.vcxproj
new file mode 100644
index 0000000000..f889d21e2d
--- /dev/null
+++ b/vsprojects/vcxproj/test/combiner_test/combiner_test.vcxproj
@@ -0,0 +1,199 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\1.0.204.1.props')" />
+ <ItemGroup Label="ProjectConfigurations">
+ <ProjectConfiguration Include="Debug|Win32">
+ <Configuration>Debug</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Debug|x64">
+ <Configuration>Debug</Configuration>
+ <Platform>x64</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|Win32">
+ <Configuration>Release</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|x64">
+ <Configuration>Release</Configuration>
+ <Platform>x64</Platform>
+ </ProjectConfiguration>
+ </ItemGroup>
+ <PropertyGroup Label="Globals">
+ <ProjectGuid>{C237D1E4-8825-80BA-1FC3-5E147E53E96E}</ProjectGuid>
+ <IgnoreWarnIntDirInTempDetected>true</IgnoreWarnIntDirInTempDetected>
+ <IntDir>$(SolutionDir)IntDir\$(MSBuildProjectName)\</IntDir>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '10.0'" Label="Configuration">
+ <PlatformToolset>v100</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '11.0'" Label="Configuration">
+ <PlatformToolset>v110</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '12.0'" Label="Configuration">
+ <PlatformToolset>v120</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '14.0'" Label="Configuration">
+ <PlatformToolset>v140</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Debug'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>true</UseDebugLibraries>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Release'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>false</UseDebugLibraries>
+ <WholeProgramOptimization>true</WholeProgramOptimization>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
+ <ImportGroup Label="ExtensionSettings">
+ </ImportGroup>
+ <ImportGroup Label="PropertySheets">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ <Import Project="$(SolutionDir)\..\vsprojects\global.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\openssl.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\winsock.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\zlib.props" />
+ </ImportGroup>
+ <PropertyGroup Label="UserMacros" />
+ <PropertyGroup Condition="'$(Configuration)'=='Debug'">
+ <TargetName>combiner_test</TargetName>
+ <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
+ <Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib>
+ <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
+ <Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Release'">
+ <TargetName>combiner_test</TargetName>
+ <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
+ <Configuration-grpc_dependencies_zlib>Release</Configuration-grpc_dependencies_zlib>
+ <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
+ <Configuration-grpc_dependencies_openssl>Release</Configuration-grpc_dependencies_openssl>
+ </PropertyGroup>
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>Disabled</Optimization>
+ <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>Disabled</Optimization>
+ <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>MaxSpeed</Optimization>
+ <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>MaxSpeed</Optimization>
+ <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemGroup>
+ <ClCompile Include="$(SolutionDir)\..\test\core\iomgr\combiner_test.c">
+ </ClCompile>
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc_test_util\grpc_test_util.vcxproj">
+ <Project>{17BCAFC0-5FDC-4C94-AEB9-95F3E220614B}</Project>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc\grpc.vcxproj">
+ <Project>{29D16885-7228-4C31-81ED-5F9187C7F2A9}</Project>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr_test_util\gpr_test_util.vcxproj">
+ <Project>{EAB0A629-17A9-44DB-B5FF-E91A721FE037}</Project>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr\gpr.vcxproj">
+ <Project>{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}</Project>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="packages.config" />
+ </ItemGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
+ <ImportGroup Label="ExtensionTargets">
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
+ </ImportGroup>
+ <Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
+ <PropertyGroup>
+ <ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
+ </PropertyGroup>
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" />
+ </Target>
+</Project>
+
diff --git a/vsprojects/vcxproj/test/combiner_test/combiner_test.vcxproj.filters b/vsprojects/vcxproj/test/combiner_test/combiner_test.vcxproj.filters
new file mode 100644
index 0000000000..e8ebb09898
--- /dev/null
+++ b/vsprojects/vcxproj/test/combiner_test/combiner_test.vcxproj.filters
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup>
+ <ClCompile Include="$(SolutionDir)\..\test\core\iomgr\combiner_test.c">
+ <Filter>test\core\iomgr</Filter>
+ </ClCompile>
+ </ItemGroup>
+
+ <ItemGroup>
+ <Filter Include="test">
+ <UniqueIdentifier>{82bca2af-d499-b405-fd05-4d345372496c}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="test\core">
+ <UniqueIdentifier>{c32d8e20-b719-532d-ba23-bd9d523fac15}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="test\core\iomgr">
+ <UniqueIdentifier>{b4fa8ca1-e6c7-dec5-6d62-8a62396825c6}</UniqueIdentifier>
+ </Filter>
+ </ItemGroup>
+</Project>
+
diff --git a/vsprojects/vcxproj/test/gpr_mpscq_test/gpr_mpscq_test.vcxproj b/vsprojects/vcxproj/test/gpr_mpscq_test/gpr_mpscq_test.vcxproj
new file mode 100644
index 0000000000..01342868b0
--- /dev/null
+++ b/vsprojects/vcxproj/test/gpr_mpscq_test/gpr_mpscq_test.vcxproj
@@ -0,0 +1,193 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\1.0.204.1.props')" />
+ <ItemGroup Label="ProjectConfigurations">
+ <ProjectConfiguration Include="Debug|Win32">
+ <Configuration>Debug</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Debug|x64">
+ <Configuration>Debug</Configuration>
+ <Platform>x64</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|Win32">
+ <Configuration>Release</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|x64">
+ <Configuration>Release</Configuration>
+ <Platform>x64</Platform>
+ </ProjectConfiguration>
+ </ItemGroup>
+ <PropertyGroup Label="Globals">
+ <ProjectGuid>{B3D7760B-8BEA-2EF6-F1D4-9F9020E166D6}</ProjectGuid>
+ <IgnoreWarnIntDirInTempDetected>true</IgnoreWarnIntDirInTempDetected>
+ <IntDir>$(SolutionDir)IntDir\$(MSBuildProjectName)\</IntDir>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '10.0'" Label="Configuration">
+ <PlatformToolset>v100</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '11.0'" Label="Configuration">
+ <PlatformToolset>v110</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '12.0'" Label="Configuration">
+ <PlatformToolset>v120</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '14.0'" Label="Configuration">
+ <PlatformToolset>v140</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Debug'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>true</UseDebugLibraries>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Release'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>false</UseDebugLibraries>
+ <WholeProgramOptimization>true</WholeProgramOptimization>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
+ <ImportGroup Label="ExtensionSettings">
+ </ImportGroup>
+ <ImportGroup Label="PropertySheets">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ <Import Project="$(SolutionDir)\..\vsprojects\global.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\openssl.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\winsock.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\zlib.props" />
+ </ImportGroup>
+ <PropertyGroup Label="UserMacros" />
+ <PropertyGroup Condition="'$(Configuration)'=='Debug'">
+ <TargetName>gpr_mpscq_test</TargetName>
+ <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
+ <Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib>
+ <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
+ <Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Release'">
+ <TargetName>gpr_mpscq_test</TargetName>
+ <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
+ <Configuration-grpc_dependencies_zlib>Release</Configuration-grpc_dependencies_zlib>
+ <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
+ <Configuration-grpc_dependencies_openssl>Release</Configuration-grpc_dependencies_openssl>
+ </PropertyGroup>
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>Disabled</Optimization>
+ <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>Disabled</Optimization>
+ <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>MaxSpeed</Optimization>
+ <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>MaxSpeed</Optimization>
+ <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemGroup>
+ <ClCompile Include="$(SolutionDir)\..\test\core\support\mpscq_test.c">
+ </ClCompile>
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr_test_util\gpr_test_util.vcxproj">
+ <Project>{EAB0A629-17A9-44DB-B5FF-E91A721FE037}</Project>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr\gpr.vcxproj">
+ <Project>{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}</Project>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="packages.config" />
+ </ItemGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
+ <ImportGroup Label="ExtensionTargets">
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
+ </ImportGroup>
+ <Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
+ <PropertyGroup>
+ <ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
+ </PropertyGroup>
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" />
+ </Target>
+</Project>
+
diff --git a/vsprojects/vcxproj/test/gpr_mpscq_test/gpr_mpscq_test.vcxproj.filters b/vsprojects/vcxproj/test/gpr_mpscq_test/gpr_mpscq_test.vcxproj.filters
new file mode 100644
index 0000000000..9cceb9f3e2
--- /dev/null
+++ b/vsprojects/vcxproj/test/gpr_mpscq_test/gpr_mpscq_test.vcxproj.filters
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup>
+ <ClCompile Include="$(SolutionDir)\..\test\core\support\mpscq_test.c">
+ <Filter>test\core\support</Filter>
+ </ClCompile>
+ </ItemGroup>
+
+ <ItemGroup>
+ <Filter Include="test">
+ <UniqueIdentifier>{e82fb80c-10ba-2959-55d6-8653715f1e4f}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="test\core">
+ <UniqueIdentifier>{32120233-25e6-f3e4-f828-c6408d47ec04}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="test\core\support">
+ <UniqueIdentifier>{aa3a22bc-229a-c00a-dd4a-924c818c6a49}</UniqueIdentifier>
+ </Filter>
+ </ItemGroup>
+</Project>
+