aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--BUILD2
-rw-r--r--CMakeLists.txt31
-rw-r--r--INSTALL.md4
-rw-r--r--Makefile37
-rw-r--r--binding.gyp1
-rw-r--r--build.yaml11
-rw-r--r--config.m41
-rw-r--r--config.w321
-rw-r--r--doc/interop-test-descriptions.md33
-rw-r--r--examples/cpp/helloworld/README.md2
-rw-r--r--examples/node/README.md2
-rw-r--r--examples/objective-c/helloworld/README.md2
-rw-r--r--examples/php/README.md2
-rw-r--r--gRPC-Core.podspec3
-rwxr-xr-xgrpc.gemspec2
-rw-r--r--include/grpc++/alarm.h2
-rw-r--r--include/grpc++/channel.h12
-rw-r--r--include/grpc++/impl/codegen/async_stream.h443
-rw-r--r--include/grpc++/impl/codegen/async_unary_call.h66
-rw-r--r--include/grpc++/impl/codegen/call.h10
-rw-r--r--include/grpc++/impl/codegen/call_hook.h2
-rw-r--r--include/grpc++/impl/codegen/channel_interface.h34
-rw-r--r--include/grpc++/impl/codegen/client_context.h28
-rw-r--r--include/grpc++/impl/codegen/client_unary_call.h4
-rw-r--r--include/grpc++/impl/codegen/completion_queue.h56
-rw-r--r--include/grpc++/impl/codegen/completion_queue_tag.h2
-rw-r--r--include/grpc++/impl/codegen/metadata_map.h2
-rw-r--r--include/grpc++/impl/codegen/method_handler_impl.h2
-rw-r--r--include/grpc++/impl/codegen/rpc_method.h3
-rw-r--r--include/grpc++/impl/codegen/rpc_service_method.h3
-rw-r--r--include/grpc++/impl/codegen/server_context.h27
-rw-r--r--include/grpc++/impl/codegen/server_interface.h40
-rw-r--r--include/grpc++/impl/codegen/service_type.h46
-rw-r--r--include/grpc++/impl/codegen/sync_stream.h398
-rw-r--r--include/grpc++/impl/codegen/time.h6
-rw-r--r--include/grpc++/server.h3
-rw-r--r--include/grpc++/server_builder.h6
-rw-r--r--package.xml2
-rw-r--r--src/compiler/cpp_generator.cc199
-rw-r--r--src/core/lib/iomgr/resolve_address_uv.c4
-rw-r--r--src/core/lib/iomgr/tcp_client_uv.c3
-rw-r--r--src/core/lib/iomgr/tcp_server_uv.c1
-rw-r--r--src/core/lib/iomgr/tcp_uv.c2
-rw-r--r--src/core/lib/support/mpscq.c25
-rw-r--r--src/core/lib/support/mpscq.h26
-rw-r--r--src/core/lib/support/stack_lockfree.c137
-rw-r--r--src/core/lib/support/stack_lockfree.h38
-rw-r--r--src/core/lib/surface/server.c112
-rw-r--r--src/core/tsi/ssl_transport_security.c10
-rw-r--r--src/cpp/README.md2
-rw-r--r--src/cpp/client/channel_cc.cc12
-rw-r--r--src/cpp/client/generic_stub.cc6
-rw-r--r--src/cpp/common/completion_queue_cc.cc2
-rw-r--r--src/cpp/server/health/default_health_check_service.cc9
-rw-r--r--src/cpp/server/health/default_health_check_service.h2
-rw-r--r--src/cpp/server/server_cc.cc69
-rw-r--r--src/cpp/server/server_context.cc8
-rw-r--r--src/php/README.md4
-rw-r--r--src/python/grpcio/README.rst2
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi1
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py1
-rw-r--r--test/core/support/BUILD10
-rw-r--r--test/core/support/stack_lockfree_test.c140
-rw-r--r--test/cpp/codegen/codegen_test_full.cc4
-rw-r--r--test/cpp/codegen/compiler_test_golden17
-rw-r--r--test/cpp/microbenchmarks/bm_cq.cc2
-rw-r--r--test/cpp/qps/server_sync.cc8
-rw-r--r--tools/distrib/python/grpcio_tools/README.rst2
-rw-r--r--tools/doxygen/Doxyfile.core.internal2
-rw-r--r--tools/run_tests/generated/sources_and_headers.json18
-rw-r--r--tools/run_tests/generated/tests.json22
-rw-r--r--vsprojects/buildtests_c.sln25
-rw-r--r--vsprojects/vcxproj/gpr/gpr.vcxproj3
-rw-r--r--vsprojects/vcxproj/gpr/gpr.vcxproj.filters6
-rw-r--r--vsprojects/vcxproj/test/gpr_stack_lockfree_test/gpr_stack_lockfree_test.vcxproj193
-rw-r--r--vsprojects/vcxproj/test/gpr_stack_lockfree_test/gpr_stack_lockfree_test.vcxproj.filters21
76 files changed, 1697 insertions, 782 deletions
diff --git a/BUILD b/BUILD
index fa42326787..495bb668eb 100644
--- a/BUILD
+++ b/BUILD
@@ -335,6 +335,7 @@ grpc_cc_library(
"src/core/lib/support/log_windows.c",
"src/core/lib/support/mpscq.c",
"src/core/lib/support/murmur_hash.c",
+ "src/core/lib/support/stack_lockfree.c",
"src/core/lib/support/string.c",
"src/core/lib/support/string_posix.c",
"src/core/lib/support/string_util_windows.c",
@@ -370,6 +371,7 @@ grpc_cc_library(
"src/core/lib/support/mpscq.h",
"src/core/lib/support/murmur_hash.h",
"src/core/lib/support/spinlock.h",
+ "src/core/lib/support/stack_lockfree.h",
"src/core/lib/support/string.h",
"src/core/lib/support/string_windows.h",
"src/core/lib/support/thd_internal.h",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6f4869d022..b535d7f546 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -444,6 +444,7 @@ add_dependencies(buildtests_c gpr_host_port_test)
add_dependencies(buildtests_c gpr_log_test)
add_dependencies(buildtests_c gpr_mpscq_test)
add_dependencies(buildtests_c gpr_spinlock_test)
+add_dependencies(buildtests_c gpr_stack_lockfree_test)
add_dependencies(buildtests_c gpr_string_test)
add_dependencies(buildtests_c gpr_sync_test)
add_dependencies(buildtests_c gpr_thd_test)
@@ -789,6 +790,7 @@ add_library(gpr
src/core/lib/support/log_windows.c
src/core/lib/support/mpscq.c
src/core/lib/support/murmur_hash.c
+ src/core/lib/support/stack_lockfree.c
src/core/lib/support/string.c
src/core/lib/support/string_posix.c
src/core/lib/support/string_util_windows.c
@@ -6016,6 +6018,35 @@ target_link_libraries(gpr_spinlock_test
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
+add_executable(gpr_stack_lockfree_test
+ test/core/support/stack_lockfree_test.c
+)
+
+
+target_include_directories(gpr_stack_lockfree_test
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
+ PRIVATE ${BORINGSSL_ROOT_DIR}/include
+ PRIVATE ${PROTOBUF_ROOT_DIR}/src
+ PRIVATE ${BENCHMARK_ROOT_DIR}/include
+ PRIVATE ${ZLIB_ROOT_DIR}
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib
+ PRIVATE ${CARES_BUILD_INCLUDE_DIR}
+ PRIVATE ${CARES_INCLUDE_DIR}
+ PRIVATE ${CARES_PLATFORM_INCLUDE_DIR}
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include
+)
+
+target_link_libraries(gpr_stack_lockfree_test
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ gpr_test_util
+ gpr
+)
+
+endif (gRPC_BUILD_TESTS)
+if (gRPC_BUILD_TESTS)
+
add_executable(gpr_string_test
test/core/support/string_test.c
)
diff --git a/INSTALL.md b/INSTALL.md
index 9526a8637b..5ae02f22e7 100644
--- a/INSTALL.md
+++ b/INSTALL.md
@@ -77,7 +77,7 @@ For developers who are interested to contribute, here is how to compile the
gRPC C Core library.
```sh
- $ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc
+ $ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc
$ cd grpc
$ git submodule update --init
$ make
@@ -95,7 +95,7 @@ on experience with the tools involved.
Builds gRPC C and C++ with boringssl.
- Install [CMake](https://cmake.org/download/).
-- Install [Active State Perl](http://www.activestate.com/activeperl/) (`choco install activeperl`)
+- Install [Active State Perl](https://www.activestate.com/activeperl/) (`choco install activeperl`)
- Install [Ninja](https://ninja-build.org/) (`choco install ninja`)
- Install [Go](https://golang.org/dl/) (`choco install golang`)
- Install [yasm](http://yasm.tortall.net/) and add it to `PATH` (`choco install yasm`)
diff --git a/Makefile b/Makefile
index b0b29c8d79..23f8706e9f 100644
--- a/Makefile
+++ b/Makefile
@@ -995,6 +995,7 @@ gpr_host_port_test: $(BINDIR)/$(CONFIG)/gpr_host_port_test
gpr_log_test: $(BINDIR)/$(CONFIG)/gpr_log_test
gpr_mpscq_test: $(BINDIR)/$(CONFIG)/gpr_mpscq_test
gpr_spinlock_test: $(BINDIR)/$(CONFIG)/gpr_spinlock_test
+gpr_stack_lockfree_test: $(BINDIR)/$(CONFIG)/gpr_stack_lockfree_test
gpr_string_test: $(BINDIR)/$(CONFIG)/gpr_string_test
gpr_sync_test: $(BINDIR)/$(CONFIG)/gpr_sync_test
gpr_thd_test: $(BINDIR)/$(CONFIG)/gpr_thd_test
@@ -1376,6 +1377,7 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/gpr_log_test \
$(BINDIR)/$(CONFIG)/gpr_mpscq_test \
$(BINDIR)/$(CONFIG)/gpr_spinlock_test \
+ $(BINDIR)/$(CONFIG)/gpr_stack_lockfree_test \
$(BINDIR)/$(CONFIG)/gpr_string_test \
$(BINDIR)/$(CONFIG)/gpr_sync_test \
$(BINDIR)/$(CONFIG)/gpr_thd_test \
@@ -1803,6 +1805,8 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/gpr_mpscq_test || ( echo test gpr_mpscq_test failed ; exit 1 )
$(E) "[RUN] Testing gpr_spinlock_test"
$(Q) $(BINDIR)/$(CONFIG)/gpr_spinlock_test || ( echo test gpr_spinlock_test failed ; exit 1 )
+ $(E) "[RUN] Testing gpr_stack_lockfree_test"
+ $(Q) $(BINDIR)/$(CONFIG)/gpr_stack_lockfree_test || ( echo test gpr_stack_lockfree_test failed ; exit 1 )
$(E) "[RUN] Testing gpr_string_test"
$(Q) $(BINDIR)/$(CONFIG)/gpr_string_test || ( echo test gpr_string_test failed ; exit 1 )
$(E) "[RUN] Testing gpr_sync_test"
@@ -2748,6 +2752,7 @@ LIBGPR_SRC = \
src/core/lib/support/log_windows.c \
src/core/lib/support/mpscq.c \
src/core/lib/support/murmur_hash.c \
+ src/core/lib/support/stack_lockfree.c \
src/core/lib/support/string.c \
src/core/lib/support/string_posix.c \
src/core/lib/support/string_util_windows.c \
@@ -9692,6 +9697,38 @@ endif
endif
+GPR_STACK_LOCKFREE_TEST_SRC = \
+ test/core/support/stack_lockfree_test.c \
+
+GPR_STACK_LOCKFREE_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GPR_STACK_LOCKFREE_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/gpr_stack_lockfree_test: openssl_dep_error
+
+else
+
+
+
+$(BINDIR)/$(CONFIG)/gpr_stack_lockfree_test: $(GPR_STACK_LOCKFREE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LD) $(LDFLAGS) $(GPR_STACK_LOCKFREE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/gpr_stack_lockfree_test
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/core/support/stack_lockfree_test.o: $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_gpr_stack_lockfree_test: $(GPR_STACK_LOCKFREE_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(GPR_STACK_LOCKFREE_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
GPR_STRING_TEST_SRC = \
test/core/support/string_test.c \
diff --git a/binding.gyp b/binding.gyp
index d85cf0be14..70dfd102b1 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -604,6 +604,7 @@
'src/core/lib/support/log_windows.c',
'src/core/lib/support/mpscq.c',
'src/core/lib/support/murmur_hash.c',
+ 'src/core/lib/support/stack_lockfree.c',
'src/core/lib/support/string.c',
'src/core/lib/support/string_posix.c',
'src/core/lib/support/string_util_windows.c',
diff --git a/build.yaml b/build.yaml
index 940ca8902d..e55c4ca301 100644
--- a/build.yaml
+++ b/build.yaml
@@ -99,6 +99,7 @@ filegroups:
- src/core/lib/support/mpscq.h
- src/core/lib/support/murmur_hash.h
- src/core/lib/support/spinlock.h
+ - src/core/lib/support/stack_lockfree.h
- src/core/lib/support/string.h
- src/core/lib/support/string_windows.h
- src/core/lib/support/thd_internal.h
@@ -129,6 +130,7 @@ filegroups:
- src/core/lib/support/log_windows.c
- src/core/lib/support/mpscq.c
- src/core/lib/support/murmur_hash.c
+ - src/core/lib/support/stack_lockfree.c
- src/core/lib/support/string.c
- src/core/lib/support/string_posix.c
- src/core/lib/support/string_util_windows.c
@@ -2121,6 +2123,15 @@ targets:
deps:
- gpr_test_util
- gpr
+- name: gpr_stack_lockfree_test
+ cpu_cost: 7
+ build: test
+ language: c
+ src:
+ - test/core/support/stack_lockfree_test.c
+ deps:
+ - gpr_test_util
+ - gpr
- name: gpr_string_test
build: test
language: c
diff --git a/config.m4 b/config.m4
index eb0df98acd..d3a5dc440e 100644
--- a/config.m4
+++ b/config.m4
@@ -63,6 +63,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/support/log_windows.c \
src/core/lib/support/mpscq.c \
src/core/lib/support/murmur_hash.c \
+ src/core/lib/support/stack_lockfree.c \
src/core/lib/support/string.c \
src/core/lib/support/string_posix.c \
src/core/lib/support/string_util_windows.c \
diff --git a/config.w32 b/config.w32
index cca3604045..a1f6d03b87 100644
--- a/config.w32
+++ b/config.w32
@@ -40,6 +40,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\support\\log_windows.c " +
"src\\core\\lib\\support\\mpscq.c " +
"src\\core\\lib\\support\\murmur_hash.c " +
+ "src\\core\\lib\\support\\stack_lockfree.c " +
"src\\core\\lib\\support\\string.c " +
"src\\core\\lib\\support\\string_posix.c " +
"src\\core\\lib\\support\\string_util_windows.c " +
diff --git a/doc/interop-test-descriptions.md b/doc/interop-test-descriptions.md
index b040621f88..0ee2cae2bd 100644
--- a/doc/interop-test-descriptions.md
+++ b/doc/interop-test-descriptions.md
@@ -183,7 +183,8 @@ the `response_compressed` boolean.
Whether compression was actually performed is determined by the compression bit
in the response's message flags. *Note that some languages may not have access
-to the message flags*.
+to the message flags, in which case the client will be unable to verify that
+the `response_compressed` boolean is obeyed by the server*.
Server features:
@@ -218,10 +219,10 @@ Procedure:
```
Client asserts:
* call was successful
- * when `response_compressed` is true, the response MUST have the
- compressed message flag set.
- * when `response_compressed` is false, the response MUST NOT have
- the compressed message flag set.
+ * if supported by the implementation, when `response_compressed` is true,
+ the response MUST have the compressed message flag set.
+ * if supported by the implementation, when `response_compressed` is false,
+ the response MUST NOT have the compressed message flag set.
* response payload body is 314159 bytes in size in both cases.
* clients are free to assert that the response payload body contents are
zero and comparing the entire response message against a golden response
@@ -304,8 +305,8 @@ Procedure:
}
}
```
- If the call fails with `INVALID_ARGUMENT`, the test fails. Otherwise, we
- continue.
+ If the call does not fail with `INVALID_ARGUMENT`, the test fails.
+ Otherwise, we continue.
1. Client calls `StreamingInputCall` again, sending the *compressed* message
@@ -377,7 +378,13 @@ Client asserts:
### server_compressed_streaming
This test verifies that the server can compress streaming messages and disable
-compression on individual messages.
+compression on individual messages, expecting the server's response to be
+compressed or not according to the `response_compressed` boolean.
+
+Whether compression was actually performed is determined by the compression bit
+in the response's message flags. *Note that some languages may not have access
+to the message flags, in which case the client will be unable to verify that the
+`response_compressed` boolean is obeyed by the server*.
Server features:
* [StreamingOutputCall][]
@@ -407,15 +414,14 @@ Procedure:
Client asserts:
* call was successful
* exactly two responses
- * when `response_compressed` is false, the response's messages MUST
- NOT have the compressed message flag set.
- * when `response_compressed` is true, the response's messages MUST
- have the compressed message flag set.
+ * if supported by the implementation, when `response_compressed` is false,
+ the response's messages MUST NOT have the compressed message flag set.
+ * if supported by the implementation, when `response_compressed` is true,
+ the response's messages MUST have the compressed message flag set.
* response payload bodies are sized (in order): 31415, 92653
* clients are free to assert that the response payload body contents are
zero and comparing the entire response messages against golden responses
-
### ping_pong
This test verifies that full duplex bidi is supported.
@@ -1095,4 +1101,3 @@ Discussion:
Ideally, this would be communicated via metadata and not in the
request/response, but we want to use this test in code paths that don't yet
fully communicate metadata.
-
diff --git a/examples/cpp/helloworld/README.md b/examples/cpp/helloworld/README.md
index db953f5362..18d3d79dcc 100644
--- a/examples/cpp/helloworld/README.md
+++ b/examples/cpp/helloworld/README.md
@@ -12,7 +12,7 @@ following command:
```sh
-$ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc
+$ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc
```
Change your current directory to examples/cpp/helloworld
diff --git a/examples/node/README.md b/examples/node/README.md
index f29236c62a..4730766d59 100644
--- a/examples/node/README.md
+++ b/examples/node/README.md
@@ -12,7 +12,7 @@ INSTALL
```sh
$ # Get the gRPC repository
$ export REPO_ROOT=grpc # REPO root can be any directory of your choice
- $ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc $REPO_ROOT
+ $ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc $REPO_ROOT
$ cd $REPO_ROOT
$ cd examples/node
diff --git a/examples/objective-c/helloworld/README.md b/examples/objective-c/helloworld/README.md
index 365bea1422..27c9f14040 100644
--- a/examples/objective-c/helloworld/README.md
+++ b/examples/objective-c/helloworld/README.md
@@ -16,7 +16,7 @@ this repository to your local machine by running the following commands:
```sh
-$ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc
+$ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc
$ cd grpc
$ git submodule update --init
```
diff --git a/examples/php/README.md b/examples/php/README.md
index 54cc97d8c2..e497fb07ff 100644
--- a/examples/php/README.md
+++ b/examples/php/README.md
@@ -17,7 +17,7 @@ INSTALL
- Clone this repository
```sh
- $ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc
+ $ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc
```
- Install composer
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 2095c0f529..c7afbf2d0a 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -192,6 +192,7 @@ Pod::Spec.new do |s|
'src/core/lib/support/mpscq.h',
'src/core/lib/support/murmur_hash.h',
'src/core/lib/support/spinlock.h',
+ 'src/core/lib/support/stack_lockfree.h',
'src/core/lib/support/string.h',
'src/core/lib/support/string_windows.h',
'src/core/lib/support/thd_internal.h',
@@ -221,6 +222,7 @@ Pod::Spec.new do |s|
'src/core/lib/support/log_windows.c',
'src/core/lib/support/mpscq.c',
'src/core/lib/support/murmur_hash.c',
+ 'src/core/lib/support/stack_lockfree.c',
'src/core/lib/support/string.c',
'src/core/lib/support/string_posix.c',
'src/core/lib/support/string_util_windows.c',
@@ -713,6 +715,7 @@ Pod::Spec.new do |s|
'src/core/lib/support/mpscq.h',
'src/core/lib/support/murmur_hash.h',
'src/core/lib/support/spinlock.h',
+ 'src/core/lib/support/stack_lockfree.h',
'src/core/lib/support/string.h',
'src/core/lib/support/string_windows.h',
'src/core/lib/support/thd_internal.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 18ef37a077..663915b75e 100755
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -92,6 +92,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/support/mpscq.h )
s.files += %w( src/core/lib/support/murmur_hash.h )
s.files += %w( src/core/lib/support/spinlock.h )
+ s.files += %w( src/core/lib/support/stack_lockfree.h )
s.files += %w( src/core/lib/support/string.h )
s.files += %w( src/core/lib/support/string_windows.h )
s.files += %w( src/core/lib/support/thd_internal.h )
@@ -121,6 +122,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/support/log_windows.c )
s.files += %w( src/core/lib/support/mpscq.c )
s.files += %w( src/core/lib/support/murmur_hash.c )
+ s.files += %w( src/core/lib/support/stack_lockfree.c )
s.files += %w( src/core/lib/support/string.c )
s.files += %w( src/core/lib/support/string_posix.c )
s.files += %w( src/core/lib/support/string_util_windows.c )
diff --git a/include/grpc++/alarm.h b/include/grpc++/alarm.h
index ed8dacbc94..00a9306d9d 100644
--- a/include/grpc++/alarm.h
+++ b/include/grpc++/alarm.h
@@ -77,7 +77,7 @@ class Alarm : private GrpcLibraryCodegen {
void Cancel() { grpc_alarm_cancel(alarm_); }
private:
- class AlarmEntry : public CompletionQueueTag {
+ class AlarmEntry : public internal::CompletionQueueTag {
public:
AlarmEntry(void* tag) : tag_(tag) {}
bool FinalizeResult(void** tag, bool* status) override {
diff --git a/include/grpc++/channel.h b/include/grpc++/channel.h
index c50091d6ac..5ba3c591f0 100644
--- a/include/grpc++/channel.h
+++ b/include/grpc++/channel.h
@@ -32,7 +32,7 @@ struct grpc_channel;
namespace grpc {
/// Channels represent a connection to an endpoint. Created by \a CreateChannel.
class Channel final : public ChannelInterface,
- public CallHook,
+ public internal::CallHook,
public std::enable_shared_from_this<Channel>,
private GrpcLibraryCodegen {
public:
@@ -52,7 +52,7 @@ class Channel final : public ChannelInterface,
private:
template <class InputMessage, class OutputMessage>
friend Status BlockingUnaryCall(ChannelInterface* channel,
- const RpcMethod& method,
+ const internal::RpcMethod& method,
ClientContext* context,
const InputMessage& request,
OutputMessage* result);
@@ -60,9 +60,11 @@ class Channel final : public ChannelInterface,
const grpc::string& host, grpc_channel* c_channel);
Channel(const grpc::string& host, grpc_channel* c_channel);
- Call CreateCall(const RpcMethod& method, ClientContext* context,
- CompletionQueue* cq) override;
- void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) override;
+ internal::Call CreateCall(const internal::RpcMethod& method,
+ ClientContext* context,
+ CompletionQueue* cq) override;
+ void PerformOpsOnCall(internal::CallOpSetInterface* ops,
+ internal::Call* call) override;
void* RegisterMethod(const char* method) override;
void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h
index 9cf7ac30dd..ddbf3e655e 100644
--- a/include/grpc++/impl/codegen/async_stream.h
+++ b/include/grpc++/impl/codegen/async_stream.h
@@ -30,6 +30,7 @@ namespace grpc {
class CompletionQueue;
+namespace internal {
/// Common interface for all client side asynchronous streaming.
class ClientAsyncStreamingInterface {
public:
@@ -146,9 +147,41 @@ class AsyncWriterInterface {
}
};
+} // namespace internal
+
template <class R>
-class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface,
- public AsyncReaderInterface<R> {};
+class ClientAsyncReaderInterface
+ : public internal::ClientAsyncStreamingInterface,
+ public internal::AsyncReaderInterface<R> {};
+
+/// Common interface for client side asynchronous writing.
+template <class W>
+class ClientAsyncWriterInterface
+ : public internal::ClientAsyncStreamingInterface,
+ public internal::AsyncWriterInterface<W> {
+ public:
+ /// Signal the client is done with the writes (half-close the client stream).
+ /// Thread-safe with respect to \a AsyncReaderInterface::Read
+ ///
+ /// \param[in] tag The tag identifying the operation.
+ virtual void WritesDone(void* tag) = 0;
+};
+
+/// Async client-side interface for bi-directional streaming,
+/// where the client-to-server message stream has messages of type \a W,
+/// and the server-to-client message stream has messages of type \a R.
+template <class W, class R>
+class ClientAsyncReaderWriterInterface
+ : public internal::ClientAsyncStreamingInterface,
+ public internal::AsyncWriterInterface<W>,
+ public internal::AsyncReaderInterface<R> {
+ public:
+ /// Signal the client is done with the writes (half-close the client stream).
+ /// Thread-safe with respect to \a AsyncReaderInterface::Read
+ ///
+ /// \param[in] tag The tag identifying the operation.
+ virtual void WritesDone(void* tag) = 0;
+};
/// Async client-side API for doing server-streaming RPCs,
/// where the incoming message stream coming from the server has
@@ -156,21 +189,24 @@ class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface,
template <class R>
class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
public:
- /// Create a stream and write the first request out.
- /// \a tag will be notified on \a cq when the call has been started and
- /// \a request has been written out.
- /// Note that \a context will be used to fill in custom initial metadata
- /// used to send to the server when starting the call.
- template <class W>
- static ClientAsyncReader* Create(ChannelInterface* channel,
- CompletionQueue* cq, const RpcMethod& method,
- ClientContext* context, const W& request,
- void* tag) {
- Call call = channel->CreateCall(method, context, cq);
- return new (g_core_codegen_interface->grpc_call_arena_alloc(
- call.call(), sizeof(ClientAsyncReader)))
- ClientAsyncReader(call, context, request, tag);
- }
+ struct internal {
+ /// Create a stream and write the first request out.
+ /// \a tag will be notified on \a cq when the call has been started and
+ /// \a request has been written out.
+ /// Note that \a context will be used to fill in custom initial metadata
+ /// used to send to the server when starting the call.
+ template <class W>
+ static ClientAsyncReader* Create(::grpc::ChannelInterface* channel,
+ CompletionQueue* cq,
+ const ::grpc::internal::RpcMethod& method,
+ ClientContext* context, const W& request,
+ void* tag) {
+ ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
+ return new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call.call(), sizeof(ClientAsyncReader)))
+ ClientAsyncReader(call, context, request, tag);
+ }
+ };
// always allocated against a call arena, no memory free required
static void operator delete(void* ptr, std::size_t size) {
@@ -218,8 +254,8 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
private:
template <class W>
- ClientAsyncReader(Call call, ClientContext* context, const W& request,
- void* tag)
+ ClientAsyncReader(::grpc::internal::Call call, ClientContext* context,
+ const W& request, void* tag)
: context_(context), call_(call) {
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_,
@@ -231,24 +267,19 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
}
ClientContext* context_;
- Call call_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ ::grpc::internal::Call call_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpClientSendClose>
init_ops_;
- CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
-};
-
-/// Common interface for client side asynchronous writing.
-template <class W>
-class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface,
- public AsyncWriterInterface<W> {
- public:
- /// Signal the client is done with the writes (half-close the client stream).
- /// Thread-safe with respect to \a AsyncReaderInterface::Read
- ///
- /// \param[in] tag The tag identifying the operation.
- virtual void WritesDone(void* tag) = 0;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
+ meta_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpRecvMessage<R>>
+ read_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpClientRecvStatus>
+ finish_ops_;
};
/// Async API on the client side for doing client-streaming RPCs,
@@ -257,24 +288,27 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface,
template <class W>
class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
public:
- /// Create a stream and write the first request out.
- /// \a tag will be notified on \a cq when the call has been started (i.e.
- /// intitial metadata sent) and \a request has been written out.
- /// Note that \a context will be used to fill in custom initial metadata
- /// used to send to the server when starting the call.
- /// \a response will be filled in with the single expected response
- /// message from the server upon a successful call to the \a Finish
- /// method of this instance.
- template <class R>
- static ClientAsyncWriter* Create(ChannelInterface* channel,
- CompletionQueue* cq, const RpcMethod& method,
- ClientContext* context, R* response,
- void* tag) {
- Call call = channel->CreateCall(method, context, cq);
- return new (g_core_codegen_interface->grpc_call_arena_alloc(
- call.call(), sizeof(ClientAsyncWriter)))
- ClientAsyncWriter(call, context, response, tag);
- }
+ struct internal {
+ /// Create a stream and write the first request out.
+ /// \a tag will be notified on \a cq when the call has been started (i.e.
+ /// intitial metadata sent) and \a request has been written out.
+ /// Note that \a context will be used to fill in custom initial metadata
+ /// used to send to the server when starting the call.
+ /// \a response will be filled in with the single expected response
+ /// message from the server upon a successful call to the \a Finish
+ /// method of this instance.
+ template <class R>
+ static ClientAsyncWriter* Create(::grpc::ChannelInterface* channel,
+ CompletionQueue* cq,
+ const ::grpc::internal::RpcMethod& method,
+ ClientContext* context, R* response,
+ void* tag) {
+ ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
+ return new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call.call(), sizeof(ClientAsyncWriter)))
+ ClientAsyncWriter(call, context, response, tag);
+ }
+ };
// always allocated against a call arena, no memory free required
static void operator delete(void* ptr, std::size_t size) {
@@ -338,7 +372,8 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
private:
template <class R>
- ClientAsyncWriter(Call call, ClientContext* context, R* response, void* tag)
+ ClientAsyncWriter(::grpc::internal::Call call, ClientContext* context,
+ R* response, void* tag)
: context_(context), call_(call) {
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
@@ -356,31 +391,20 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
}
ClientContext* context_;
- Call call_;
- CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ ::grpc::internal::Call call_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
+ meta_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpClientSendClose>
write_ops_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
- CallOpClientRecvStatus>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpGenericRecvMessage,
+ ::grpc::internal::CallOpClientRecvStatus>
finish_ops_;
};
/// Async client-side interface for bi-directional streaming,
-/// where the client-to-server message stream has messages of type \a W,
-/// and the server-to-client message stream has messages of type \a R.
-template <class W, class R>
-class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface,
- public AsyncWriterInterface<W>,
- public AsyncReaderInterface<R> {
- public:
- /// Signal the client is done with the writes (half-close the client stream).
- /// Thread-safe with respect to \a AsyncReaderInterface::Read
- ///
- /// \param[in] tag The tag identifying the operation.
- virtual void WritesDone(void* tag) = 0;
-};
-
-/// Async client-side interface for bi-directional streaming,
/// where the outgoing message stream going to the server
/// has messages of type \a W, and the incoming message stream coming
/// from the server has messages of type \a R.
@@ -388,21 +412,23 @@ template <class W, class R>
class ClientAsyncReaderWriter final
: public ClientAsyncReaderWriterInterface<W, R> {
public:
- /// Create a stream and write the first request out.
- /// \a tag will be notified on \a cq when the call has been started (i.e.
- /// intitial metadata sent).
- /// Note that \a context will be used to fill in custom initial metadata
- /// used to send to the server when starting the call.
- static ClientAsyncReaderWriter* Create(ChannelInterface* channel,
- CompletionQueue* cq,
- const RpcMethod& method,
- ClientContext* context, void* tag) {
- Call call = channel->CreateCall(method, context, cq);
-
- return new (g_core_codegen_interface->grpc_call_arena_alloc(
- call.call(), sizeof(ClientAsyncReaderWriter)))
- ClientAsyncReaderWriter(call, context, tag);
- }
+ struct internal {
+ /// Create a stream and write the first request out.
+ /// \a tag will be notified on \a cq when the call has been started (i.e.
+ /// intitial metadata sent).
+ /// Note that \a context will be used to fill in custom initial metadata
+ /// used to send to the server when starting the call.
+ static ClientAsyncReaderWriter* Create(
+ ::grpc::ChannelInterface* channel, CompletionQueue* cq,
+ const ::grpc::internal::RpcMethod& method, ClientContext* context,
+ void* tag) {
+ ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
+
+ return new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call.call(), sizeof(ClientAsyncReaderWriter)))
+ ClientAsyncReaderWriter(call, context, tag);
+ }
+ };
// always allocated against a call arena, no memory free required
static void operator delete(void* ptr, std::size_t size) {
@@ -471,7 +497,8 @@ class ClientAsyncReaderWriter final
}
private:
- ClientAsyncReaderWriter(Call call, ClientContext* context, void* tag)
+ ClientAsyncReaderWriter(::grpc::internal::Call call, ClientContext* context,
+ void* tag)
: context_(context), call_(call) {
if (context_->initial_metadata_corked_) {
// if corked bit is set in context, we buffer up the initial metadata to
@@ -487,17 +514,25 @@ class ClientAsyncReaderWriter final
}
ClientContext* context_;
- Call call_;
- CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ ::grpc::internal::Call call_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
+ meta_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpRecvMessage<R>>
+ read_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpClientSendClose>
write_ops_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpClientRecvStatus>
+ finish_ops_;
};
template <class W, class R>
-class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface,
- public AsyncReaderInterface<R> {
+class ServerAsyncReaderInterface
+ : public internal::ServerAsyncStreamingInterface,
+ public internal::AsyncReaderInterface<R> {
public:
/// Indicate that the stream is to be finished with a certain status code
/// and also send out \a msg response to the client.
@@ -541,6 +576,89 @@ class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface,
virtual void FinishWithError(const Status& status, void* tag) = 0;
};
+template <class W>
+class ServerAsyncWriterInterface
+ : public internal::ServerAsyncStreamingInterface,
+ public internal::AsyncWriterInterface<W> {
+ public:
+ /// Indicate that the stream is to be finished with a certain status code.
+ /// Request notification for when the server has sent the appropriate
+ /// signals to the client to end the call.
+ /// Should not be used concurrently with other operations.
+ ///
+ /// It is appropriate to call this method when either:
+ /// * all messages from the client have been received (either known
+ /// implictly, or explicitly because a previous \a
+ /// AsyncReaderInterface::Read operation with a non-ok
+ /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'.
+ /// * it is desired to end the call early with some non-OK status code.
+ ///
+ /// This operation will end when the server has finished sending out initial
+ /// metadata (if not sent already), response message, and status, or if
+ /// some failure occurred when trying to do so.
+ ///
+ /// \param[in] tag Tag identifying this request.
+ /// \param[in] status To be sent to the client as the result of this call.
+ virtual void Finish(const Status& status, void* tag) = 0;
+
+ /// Request the writing of \a msg and coalesce it with trailing metadata which
+ /// contains \a status, using WriteOptions options with
+ /// identifying tag \a tag.
+ ///
+ /// WriteAndFinish is equivalent of performing WriteLast and Finish
+ /// in a single step.
+ ///
+ /// \param[in] msg The message to be written.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ /// \param[in] status The Status that server returns to client.
+ /// \param[in] tag The tag identifying the operation.
+ virtual void WriteAndFinish(const W& msg, WriteOptions options,
+ const Status& status, void* tag) = 0;
+};
+
+/// Server-side interface for asynchronous bi-directional streaming.
+template <class W, class R>
+class ServerAsyncReaderWriterInterface
+ : public internal::ServerAsyncStreamingInterface,
+ public internal::AsyncWriterInterface<W>,
+ public internal::AsyncReaderInterface<R> {
+ public:
+ /// Indicate that the stream is to be finished with a certain status code.
+ /// Request notification for when the server has sent the appropriate
+ /// signals to the client to end the call.
+ /// Should not be used concurrently with other operations.
+ ///
+ /// It is appropriate to call this method when either:
+ /// * all messages from the client have been received (either known
+ /// implictly, or explicitly because a previous \a
+ /// AsyncReaderInterface::Read operation
+ /// with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok'
+ /// with 'false'.
+ /// * it is desired to end the call early with some non-OK status code.
+ ///
+ /// This operation will end when the server has finished sending out initial
+ /// metadata (if not sent already), response message, and status, or if some
+ /// failure occurred when trying to do so.
+ ///
+ /// \param[in] tag Tag identifying this request.
+ /// \param[in] status To be sent to the client as the result of this call.
+ virtual void Finish(const Status& status, void* tag) = 0;
+
+ /// Request the writing of \a msg and coalesce it with trailing metadata which
+ /// contains \a status, using WriteOptions options with
+ /// identifying tag \a tag.
+ ///
+ /// WriteAndFinish is equivalent of performing WriteLast and Finish in a
+ /// single step.
+ ///
+ /// \param[in] msg The message to be written.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ /// \param[in] status The Status that server returns to client.
+ /// \param[in] tag The tag identifying the operation.
+ virtual void WriteAndFinish(const W& msg, WriteOptions options,
+ const Status& status, void* tag) = 0;
+};
+
/// Async server-side API for doing client-streaming RPCs,
/// where the incoming message stream from the client has messages of type \a R,
/// and the single response message sent from the server is type \a W.
@@ -624,56 +742,19 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> {
}
private:
- void BindCall(Call* call) override { call_ = *call; }
+ void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
- Call call_;
+ ::grpc::internal::Call call_;
ServerContext* ctx_;
- CallOpSet<CallOpSendInitialMetadata> meta_ops_;
- CallOpSet<CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpServerSendStatus>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
+ meta_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpServerSendStatus>
finish_ops_;
};
-template <class W>
-class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface,
- public AsyncWriterInterface<W> {
- public:
- /// Indicate that the stream is to be finished with a certain status code.
- /// Request notification for when the server has sent the appropriate
- /// signals to the client to end the call.
- /// Should not be used concurrently with other operations.
- ///
- /// It is appropriate to call this method when either:
- /// * all messages from the client have been received (either known
- /// implictly, or explicitly because a previous \a
- /// AsyncReaderInterface::Read operation with a non-ok
- /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'.
- /// * it is desired to end the call early with some non-OK status code.
- ///
- /// This operation will end when the server has finished sending out initial
- /// metadata (if not sent already), response message, and status, or if
- /// some failure occurred when trying to do so.
- ///
- /// \param[in] tag Tag identifying this request.
- /// \param[in] status To be sent to the client as the result of this call.
- virtual void Finish(const Status& status, void* tag) = 0;
-
- /// Request the writing of \a msg and coalesce it with trailing metadata which
- /// contains \a status, using WriteOptions options with
- /// identifying tag \a tag.
- ///
- /// WriteAndFinish is equivalent of performing WriteLast and Finish
- /// in a single step.
- ///
- /// \param[in] msg The message to be written.
- /// \param[in] options The WriteOptions to be used to write this message.
- /// \param[in] status The Status that server returns to client.
- /// \param[in] tag The tag identifying the operation.
- virtual void WriteAndFinish(const W& msg, WriteOptions options,
- const Status& status, void* tag) = 0;
-};
-
/// Async server-side API for doing server streaming RPCs,
/// where the outgoing message stream from the server has messages of type \a W.
template <class W>
@@ -755,7 +836,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
}
private:
- void BindCall(Call* call) override { call_ = *call; }
+ void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
template <class T>
void EnsureInitialMetadataSent(T* ops) {
@@ -769,55 +850,17 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
}
}
- Call call_;
+ ::grpc::internal::Call call_;
ServerContext* ctx_;
- CallOpSet<CallOpSendInitialMetadata> meta_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpServerSendStatus>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
+ meta_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpServerSendStatus>
write_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
-};
-
-/// Server-side interface for asynchronous bi-directional streaming.
-template <class W, class R>
-class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface,
- public AsyncWriterInterface<W>,
- public AsyncReaderInterface<R> {
- public:
- /// Indicate that the stream is to be finished with a certain status code.
- /// Request notification for when the server has sent the appropriate
- /// signals to the client to end the call.
- /// Should not be used concurrently with other operations.
- ///
- /// It is appropriate to call this method when either:
- /// * all messages from the client have been received (either known
- /// implictly, or explicitly because a previous \a
- /// AsyncReaderInterface::Read operation
- /// with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok'
- /// with 'false'.
- /// * it is desired to end the call early with some non-OK status code.
- ///
- /// This operation will end when the server has finished sending out initial
- /// metadata (if not sent already), response message, and status, or if some
- /// failure occurred when trying to do so.
- ///
- /// \param[in] tag Tag identifying this request.
- /// \param[in] status To be sent to the client as the result of this call.
- virtual void Finish(const Status& status, void* tag) = 0;
-
- /// Request the writing of \a msg and coalesce it with trailing metadata which
- /// contains \a status, using WriteOptions options with
- /// identifying tag \a tag.
- ///
- /// WriteAndFinish is equivalent of performing WriteLast and Finish in a
- /// single step.
- ///
- /// \param[in] msg The message to be written.
- /// \param[in] options The WriteOptions to be used to write this message.
- /// \param[in] status The Status that server returns to client.
- /// \param[in] tag The tag identifying the operation.
- virtual void WriteAndFinish(const W& msg, WriteOptions options,
- const Status& status, void* tag) = 0;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpServerSendStatus>
+ finish_ops_;
};
/// Async server-side API for doing bidirectional streaming RPCs,
@@ -912,7 +955,7 @@ class ServerAsyncReaderWriter final
private:
friend class ::grpc::Server;
- void BindCall(Call* call) override { call_ = *call; }
+ void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
template <class T>
void EnsureInitialMetadataSent(T* ops) {
@@ -926,14 +969,18 @@ class ServerAsyncReaderWriter final
}
}
- Call call_;
+ ::grpc::internal::Call call_;
ServerContext* ctx_;
- CallOpSet<CallOpSendInitialMetadata> meta_ops_;
- CallOpSet<CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpServerSendStatus>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
+ meta_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpServerSendStatus>
write_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpServerSendStatus>
+ finish_ops_;
};
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h
index 41b3ae3f28..45a8e8ee6a 100644
--- a/include/grpc++/impl/codegen/async_unary_call.h
+++ b/include/grpc++/impl/codegen/async_unary_call.h
@@ -75,17 +75,18 @@ class ClientAsyncResponseReader final
/// intitial metadata sent) and \a request has been written out.
/// Note that \a context will be used to fill in custom initial metadata
/// used to send to the server when starting the call.
- template <class W>
- static ClientAsyncResponseReader* Create(ChannelInterface* channel,
- CompletionQueue* cq,
- const RpcMethod& method,
- ClientContext* context,
- const W& request) {
- Call call = channel->CreateCall(method, context, cq);
- return new (g_core_codegen_interface->grpc_call_arena_alloc(
- call.call(), sizeof(ClientAsyncResponseReader)))
- ClientAsyncResponseReader(call, context, request);
- }
+ struct internal {
+ template <class W>
+ static ClientAsyncResponseReader* Create(
+ ::grpc::ChannelInterface* channel, CompletionQueue* cq,
+ const ::grpc::internal::RpcMethod& method, ClientContext* context,
+ const W& request) {
+ ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
+ return new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call.call(), sizeof(ClientAsyncResponseReader)))
+ ClientAsyncResponseReader(call, context, request);
+ }
+ };
/// TODO(vjpai): Delete the below constructor
/// PLEASE DO NOT USE THIS CONSTRUCTOR IN NEW CODE
@@ -94,9 +95,10 @@ class ClientAsyncResponseReader final
/// created this struct rather than properly using a stub.
/// This code will not remain a valid public constructor for long.
template <class W>
- ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
- const RpcMethod& method, ClientContext* context,
- const W& request)
+ ClientAsyncResponseReader(::grpc::ChannelInterface* channel,
+ CompletionQueue* cq,
+ const ::grpc::internal::RpcMethod& method,
+ ClientContext* context, const W& request)
: context_(context),
call_(channel->CreateCall(method, context, cq)),
collection_(std::make_shared<Ops>()) {
@@ -164,10 +166,11 @@ class ClientAsyncResponseReader final
private:
ClientContext* const context_;
- Call call_;
+ ::grpc::internal::Call call_;
template <class W>
- ClientAsyncResponseReader(Call call, ClientContext* context, const W& request)
+ ClientAsyncResponseReader(::grpc::internal::Call call, ClientContext* context,
+ const W& request)
: context_(context), call_(call) {
ops_.init_buf.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
@@ -183,13 +186,17 @@ class ClientAsyncResponseReader final
// TODO(vjpai): Remove the reference to CallOpSetCollectionInterface
// as soon as the related workaround (public constructor) is deleted
- struct Ops : public CallOpSetCollectionInterface {
- SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpClientSendClose>
+ struct Ops : public ::grpc::internal::CallOpSetCollectionInterface {
+ ::grpc::internal::SneakyCallOpSet<
+ ::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpClientSendClose>
init_buf;
- CallOpSet<CallOpRecvInitialMetadata> meta_buf;
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
- CallOpClientRecvStatus>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
+ meta_buf;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpRecvMessage<R>,
+ ::grpc::internal::CallOpClientRecvStatus>
finish_buf;
} ops_;
@@ -201,7 +208,8 @@ class ClientAsyncResponseReader final
/// Async server-side API for handling unary calls, where the single
/// response message sent to the client is of type \a W.
template <class W>
-class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
+class ServerAsyncResponseWriter final
+ : public internal::ServerAsyncStreamingInterface {
public:
explicit ServerAsyncResponseWriter(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
@@ -289,13 +297,15 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
}
private:
- void BindCall(Call* call) override { call_ = *call; }
+ void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
- Call call_;
+ ::grpc::internal::Call call_;
ServerContext* ctx_;
- CallOpSet<CallOpSendInitialMetadata> meta_buf_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpServerSendStatus>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
+ meta_buf_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpServerSendStatus>
finish_buf_;
};
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index 342ea46203..32bd2ad5c7 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -44,11 +44,13 @@ struct grpc_byte_buffer;
namespace grpc {
class ByteBuffer;
-class Call;
-class CallHook;
class CompletionQueue;
extern CoreCodegenInterface* g_core_codegen_interface;
+namespace internal {
+class Call;
+class CallHook;
+
const char kBinaryErrorDetailsKey[] = "grpc-status-details-bin";
// TODO(yangg) if the map is changed before we send, the pointers will be a
@@ -76,6 +78,7 @@ inline grpc_metadata* FillMetadataArray(
}
return metadata_array;
}
+} // namespace internal
/// Per-message write options.
class WriteOptions {
@@ -191,6 +194,7 @@ class WriteOptions {
bool last_message_;
};
+namespace internal {
/// Default argument for CallOpSet. I is unused by the class, but can be
/// used for generating multiple names for the same thing.
template <int I>
@@ -673,7 +677,7 @@ class Call final {
grpc_call* call_;
int max_receive_message_size_;
};
-
+} // namespace internal
} // namespace grpc
#endif // GRPCXX_IMPL_CODEGEN_CALL_H
diff --git a/include/grpc++/impl/codegen/call_hook.h b/include/grpc++/impl/codegen/call_hook.h
index d026cc8b58..44e9de220e 100644
--- a/include/grpc++/impl/codegen/call_hook.h
+++ b/include/grpc++/impl/codegen/call_hook.h
@@ -21,6 +21,7 @@
namespace grpc {
+namespace internal {
class CallOpSetInterface;
class Call;
@@ -31,6 +32,7 @@ class CallHook {
virtual ~CallHook() {}
virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0;
};
+} // namespace internal
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/channel_interface.h b/include/grpc++/impl/codegen/channel_interface.h
index 1b7590bf0c..cf1d77e905 100644
--- a/include/grpc++/impl/codegen/channel_interface.h
+++ b/include/grpc++/impl/codegen/channel_interface.h
@@ -24,10 +24,8 @@
#include <grpc/impl/codegen/connectivity_state.h>
namespace grpc {
-class Call;
+class ChannelInterface;
class ClientContext;
-class RpcMethod;
-class CallOpSetInterface;
class CompletionQueue;
template <class R>
@@ -45,6 +43,16 @@ class ClientAsyncReaderWriter;
template <class R>
class ClientAsyncResponseReader;
+namespace internal {
+class Call;
+class CallOpSetInterface;
+class RpcMethod;
+template <class InputMessage, class OutputMessage>
+Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context, const InputMessage& request,
+ OutputMessage* result);
+} // namespace internal
+
/// Codegen interface for \a grpc::Channel.
class ChannelInterface {
public:
@@ -96,15 +104,16 @@ class ChannelInterface {
template <class R>
friend class ::grpc::ClientAsyncResponseReader;
template <class InputMessage, class OutputMessage>
- friend Status BlockingUnaryCall(ChannelInterface* channel,
- const RpcMethod& method,
- ClientContext* context,
- const InputMessage& request,
- OutputMessage* result);
- friend class ::grpc::RpcMethod;
- virtual Call CreateCall(const RpcMethod& method, ClientContext* context,
- CompletionQueue* cq) = 0;
- virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0;
+ friend Status(::grpc::internal::BlockingUnaryCall)(
+ ChannelInterface* channel, const internal::RpcMethod& method,
+ ClientContext* context, const InputMessage& request,
+ OutputMessage* result);
+ friend class ::grpc::internal::RpcMethod;
+ virtual internal::Call CreateCall(const internal::RpcMethod& method,
+ ClientContext* context,
+ CompletionQueue* cq) = 0;
+ virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops,
+ internal::Call* call) = 0;
virtual void* RegisterMethod(const char* method) = 0;
virtual void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline,
@@ -112,7 +121,6 @@ class ChannelInterface {
virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) = 0;
};
-
} // namespace grpc
#endif // GRPCXX_IMPL_CODEGEN_CHANNEL_INTERFACE_H
diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h
index c2a44e41ce..22d0069afa 100644
--- a/include/grpc++/impl/codegen/client_context.h
+++ b/include/grpc++/impl/codegen/client_context.h
@@ -60,7 +60,18 @@ class Channel;
class ChannelInterface;
class CompletionQueue;
class CallCredentials;
+class ClientContext;
+
+namespace internal {
class RpcMethod;
+class CallOpClientRecvStatus;
+class CallOpRecvInitialMetadata;
+template <class InputMessage, class OutputMessage>
+Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context, const InputMessage& request,
+ OutputMessage* result);
+} // namespace internal
+
template <class R>
class ClientReader;
template <class W>
@@ -345,8 +356,8 @@ class ClientContext {
ClientContext& operator=(const ClientContext&);
friend class ::grpc::testing::InteropClientContextInspector;
- friend class CallOpClientRecvStatus;
- friend class CallOpRecvInitialMetadata;
+ friend class ::grpc::internal::CallOpClientRecvStatus;
+ friend class ::grpc::internal::CallOpRecvInitialMetadata;
friend class Channel;
template <class R>
friend class ::grpc::ClientReader;
@@ -363,11 +374,10 @@ class ClientContext {
template <class R>
friend class ::grpc::ClientAsyncResponseReader;
template <class InputMessage, class OutputMessage>
- friend Status BlockingUnaryCall(ChannelInterface* channel,
- const RpcMethod& method,
- ClientContext* context,
- const InputMessage& request,
- OutputMessage* result);
+ friend Status(::grpc::internal::BlockingUnaryCall)(
+ ChannelInterface* channel, const internal::RpcMethod& method,
+ ClientContext* context, const InputMessage& request,
+ OutputMessage* result);
grpc_call* call() const { return call_; }
void set_call(grpc_call* call, const std::shared_ptr<Channel>& channel);
@@ -399,8 +409,8 @@ class ClientContext {
mutable std::shared_ptr<const AuthContext> auth_context_;
struct census_context* census_context_;
std::multimap<grpc::string, grpc::string> send_initial_metadata_;
- MetadataMap recv_initial_metadata_;
- MetadataMap trailing_metadata_;
+ internal::MetadataMap recv_initial_metadata_;
+ internal::MetadataMap trailing_metadata_;
grpc_call* propagate_from_call_;
PropagationOptions propagation_options_;
diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h
index 7c540fade9..8fef3ab353 100644
--- a/include/grpc++/impl/codegen/client_unary_call.h
+++ b/include/grpc++/impl/codegen/client_unary_call.h
@@ -30,8 +30,9 @@ namespace grpc {
class Channel;
class ClientContext;
class CompletionQueue;
-class RpcMethod;
+namespace internal {
+class RpcMethod;
/// Wrapper that performs a blocking unary call
template <class InputMessage, class OutputMessage>
Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
@@ -67,6 +68,7 @@ Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
return status;
}
+} // namespace internal
} // namespace grpc
#endif // GRPCXX_IMPL_CODEGEN_CLIENT_UNARY_CALL_H
diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h
index ca757e2a9c..a04778aa72 100644
--- a/include/grpc++/impl/codegen/completion_queue.h
+++ b/include/grpc++/impl/codegen/completion_queue.h
@@ -56,7 +56,19 @@ class ServerWriter;
namespace internal {
template <class W, class R>
class ServerReaderWriterBody;
-}
+} // namespace internal
+
+class Channel;
+class ChannelInterface;
+class ClientContext;
+class CompletionQueue;
+class Server;
+class ServerBuilder;
+class ServerContext;
+
+namespace internal {
+class CompletionQueueTag;
+class RpcMethod;
template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
@@ -66,16 +78,13 @@ class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
class UnknownMethodHandler;
-
-class Channel;
-class ChannelInterface;
-class ClientContext;
-class CompletionQueueTag;
-class CompletionQueue;
-class RpcMethod;
-class Server;
-class ServerBuilder;
-class ServerContext;
+template <class Streamer, bool WriteNeeded>
+class TemplatedBidiStreamingHandler;
+template <class InputMessage, class OutputMessage>
+Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context, const InputMessage& request,
+ OutputMessage* result);
+} // namespace internal
extern CoreCodegenInterface* g_core_codegen_interface;
@@ -196,28 +205,27 @@ class CompletionQueue : private GrpcLibraryCodegen {
template <class W, class R>
friend class ::grpc::internal::ServerReaderWriterBody;
template <class ServiceType, class RequestType, class ResponseType>
- friend class RpcMethodHandler;
+ friend class ::grpc::internal::RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
- friend class ClientStreamingHandler;
+ friend class ::grpc::internal::ClientStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
- friend class ServerStreamingHandler;
+ friend class ::grpc::internal::ServerStreamingHandler;
template <class Streamer, bool WriteNeeded>
- friend class TemplatedBidiStreamingHandler;
- friend class UnknownMethodHandler;
+ friend class ::grpc::internal::TemplatedBidiStreamingHandler;
+ friend class ::grpc::internal::UnknownMethodHandler;
friend class ::grpc::Server;
friend class ::grpc::ServerContext;
template <class InputMessage, class OutputMessage>
- friend Status BlockingUnaryCall(ChannelInterface* channel,
- const RpcMethod& method,
- ClientContext* context,
- const InputMessage& request,
- OutputMessage* result);
+ friend Status(::grpc::internal::BlockingUnaryCall)(
+ ChannelInterface* channel, const internal::RpcMethod& method,
+ ClientContext* context, const InputMessage& request,
+ OutputMessage* result);
NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);
/// Wraps \a grpc_completion_queue_pluck.
/// \warning Must not be mixed with calls to \a Next.
- bool Pluck(CompletionQueueTag* tag) {
+ bool Pluck(internal::CompletionQueueTag* tag) {
auto deadline =
g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME);
auto ev = g_core_codegen_interface->grpc_completion_queue_pluck(
@@ -238,7 +246,7 @@ class CompletionQueue : private GrpcLibraryCodegen {
/// implementation to simple call the other TryPluck function with a zero
/// timeout. i.e:
/// TryPluck(tag, gpr_time_0(GPR_CLOCK_REALTIME))
- void TryPluck(CompletionQueueTag* tag) {
+ void TryPluck(internal::CompletionQueueTag* tag) {
auto deadline = g_core_codegen_interface->gpr_time_0(GPR_CLOCK_REALTIME);
auto ev = g_core_codegen_interface->grpc_completion_queue_pluck(
cq_, tag, deadline, nullptr);
@@ -254,7 +262,7 @@ class CompletionQueue : private GrpcLibraryCodegen {
///
/// This exects tag->FinalizeResult (if called) to return 'false' i.e expects
/// that the tag is internal not something that is returned to the user.
- void TryPluck(CompletionQueueTag* tag, gpr_timespec deadline) {
+ void TryPluck(internal::CompletionQueueTag* tag, gpr_timespec deadline) {
auto ev = g_core_codegen_interface->grpc_completion_queue_pluck(
cq_, tag, deadline, nullptr);
if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) {
diff --git a/include/grpc++/impl/codegen/completion_queue_tag.h b/include/grpc++/impl/codegen/completion_queue_tag.h
index 4d7d3a98dd..cb16bcf9ff 100644
--- a/include/grpc++/impl/codegen/completion_queue_tag.h
+++ b/include/grpc++/impl/codegen/completion_queue_tag.h
@@ -21,6 +21,7 @@
namespace grpc {
+namespace internal {
/// An interface allowing implementors to process and filter event tags.
class CompletionQueueTag {
public:
@@ -31,6 +32,7 @@ class CompletionQueueTag {
/// queue
virtual bool FinalizeResult(void** tag, bool* status) = 0;
};
+} // namespace internal
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/metadata_map.h b/include/grpc++/impl/codegen/metadata_map.h
index b73985967d..fd4750efdd 100644
--- a/include/grpc++/impl/codegen/metadata_map.h
+++ b/include/grpc++/impl/codegen/metadata_map.h
@@ -23,6 +23,7 @@
namespace grpc {
+namespace internal {
class MetadataMap {
public:
MetadataMap() { memset(&arr_, 0, sizeof(arr_)); }
@@ -50,6 +51,7 @@ class MetadataMap {
grpc_metadata_array arr_;
std::multimap<grpc::string_ref, grpc::string_ref> map_;
};
+} // namespace internal
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h
index 15e24bdcdc..87e9e5e952 100644
--- a/include/grpc++/impl/codegen/method_handler_impl.h
+++ b/include/grpc++/impl/codegen/method_handler_impl.h
@@ -25,6 +25,7 @@
namespace grpc {
+namespace internal {
/// A wrapper class of an application provided rpc method handler.
template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler : public MethodHandler {
@@ -265,6 +266,7 @@ class UnknownMethodHandler : public MethodHandler {
}
};
+} // namespace internal
} // namespace grpc
#endif // GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
diff --git a/include/grpc++/impl/codegen/rpc_method.h b/include/grpc++/impl/codegen/rpc_method.h
index ac13ac56c7..54e52364ef 100644
--- a/include/grpc++/impl/codegen/rpc_method.h
+++ b/include/grpc++/impl/codegen/rpc_method.h
@@ -24,7 +24,7 @@
#include <grpc++/impl/codegen/channel_interface.h>
namespace grpc {
-
+namespace internal {
/// Descriptor of an RPC method
class RpcMethod {
public:
@@ -55,6 +55,7 @@ class RpcMethod {
void* const channel_tag_;
};
+} // namespace internal
} // namespace grpc
#endif // GRPCXX_IMPL_CODEGEN_RPC_METHOD_H
diff --git a/include/grpc++/impl/codegen/rpc_service_method.h b/include/grpc++/impl/codegen/rpc_service_method.h
index 7165774172..635e40469b 100644
--- a/include/grpc++/impl/codegen/rpc_service_method.h
+++ b/include/grpc++/impl/codegen/rpc_service_method.h
@@ -35,8 +35,8 @@ struct grpc_byte_buffer;
namespace grpc {
class ServerContext;
-class StreamContextInterface;
+namespace internal {
/// Base class for running an RPC handler.
class MethodHandler {
public:
@@ -71,6 +71,7 @@ class RpcServiceMethod : public RpcMethod {
void* server_tag_;
std::unique_ptr<MethodHandler> handler_;
};
+} // namespace internal
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h
index b5e37fd12b..a2d6967bf8 100644
--- a/include/grpc++/impl/codegen/server_context.h
+++ b/include/grpc++/impl/codegen/server_context.h
@@ -55,7 +55,6 @@ class ServerWriter;
namespace internal {
template <class W, class R>
class ServerReaderWriterBody;
-}
template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
@@ -65,9 +64,11 @@ class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
class UnknownMethodHandler;
-
+template <class Streamer, bool WriteNeeded>
+class TemplatedBidiStreamingHandler;
class Call;
-class CallOpBuffer;
+} // namespace internal
+
class CompletionQueue;
class Server;
class ServerInterface;
@@ -247,14 +248,14 @@ class ServerContext {
template <class W, class R>
friend class ::grpc::internal::ServerReaderWriterBody;
template <class ServiceType, class RequestType, class ResponseType>
- friend class RpcMethodHandler;
+ friend class ::grpc::internal::RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
- friend class ClientStreamingHandler;
+ friend class ::grpc::internal::ClientStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
- friend class ServerStreamingHandler;
+ friend class ::grpc::internal::ServerStreamingHandler;
template <class Streamer, bool WriteNeeded>
- friend class TemplatedBidiStreamingHandler;
- friend class UnknownMethodHandler;
+ friend class ::grpc::internal::TemplatedBidiStreamingHandler;
+ friend class ::grpc::internal::UnknownMethodHandler;
friend class ::grpc::ClientContext;
/// Prevent copying.
@@ -263,9 +264,9 @@ class ServerContext {
class CompletionOp;
- void BeginCompletionOp(Call* call);
+ void BeginCompletionOp(internal::Call* call);
/// Return the tag queued by BeginCompletionOp()
- CompletionQueueTag* GetCompletionOpTag();
+ internal::CompletionQueueTag* GetCompletionOpTag();
ServerContext(gpr_timespec deadline, grpc_metadata_array* arr);
@@ -282,7 +283,7 @@ class ServerContext {
CompletionQueue* cq_;
bool sent_initial_metadata_;
mutable std::shared_ptr<const AuthContext> auth_context_;
- MetadataMap client_metadata_;
+ internal::MetadataMap client_metadata_;
std::multimap<grpc::string, grpc::string> initial_metadata_;
std::multimap<grpc::string, grpc::string> trailing_metadata_;
@@ -290,7 +291,9 @@ class ServerContext {
grpc_compression_level compression_level_;
grpc_compression_algorithm compression_algorithm_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> pending_ops_;
+ internal::CallOpSet<internal::CallOpSendInitialMetadata,
+ internal::CallOpSendMessage>
+ pending_ops_;
bool has_pending_ops_;
};
diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h
index 87bd085a37..9d120031ca 100644
--- a/include/grpc++/impl/codegen/server_interface.h
+++ b/include/grpc++/impl/codegen/server_interface.h
@@ -29,20 +29,21 @@ namespace grpc {
class AsyncGenericService;
class GenericServerContext;
-class RpcService;
-class ServerAsyncStreamingInterface;
class ServerCompletionQueue;
class ServerContext;
class ServerCredentials;
class Service;
-class ThreadPoolInterface;
extern CoreCodegenInterface* g_core_codegen_interface;
/// Models a gRPC server.
///
/// Servers are configured and started via \a grpc::ServerBuilder.
-class ServerInterface : public CallHook {
+namespace internal {
+class ServerAsyncStreamingInterface;
+} // namespace internal
+
+class ServerInterface : public internal::CallHook {
public:
virtual ~ServerInterface() {}
@@ -77,7 +78,7 @@ class ServerInterface : public CallHook {
virtual void Wait() = 0;
protected:
- friend class Service;
+ friend class ::grpc::Service;
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance.
@@ -115,12 +116,13 @@ class ServerInterface : public CallHook {
virtual grpc_server* server() = 0;
- virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0;
+ virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops,
+ internal::Call* call) = 0;
- class BaseAsyncRequest : public CompletionQueueTag {
+ class BaseAsyncRequest : public internal::CompletionQueueTag {
public:
BaseAsyncRequest(ServerInterface* server, ServerContext* context,
- ServerAsyncStreamingInterface* stream,
+ internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq, void* tag,
bool delete_on_finalize);
virtual ~BaseAsyncRequest();
@@ -130,7 +132,7 @@ class ServerInterface : public CallHook {
protected:
ServerInterface* const server_;
ServerContext* const context_;
- ServerAsyncStreamingInterface* const stream_;
+ internal::ServerAsyncStreamingInterface* const stream_;
CompletionQueue* const call_cq_;
void* const tag_;
const bool delete_on_finalize_;
@@ -140,7 +142,7 @@ class ServerInterface : public CallHook {
class RegisteredAsyncRequest : public BaseAsyncRequest {
public:
RegisteredAsyncRequest(ServerInterface* server, ServerContext* context,
- ServerAsyncStreamingInterface* stream,
+ internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq, void* tag);
// uses BaseAsyncRequest::FinalizeResult
@@ -154,7 +156,7 @@ class ServerInterface : public CallHook {
public:
NoPayloadAsyncRequest(void* registered_method, ServerInterface* server,
ServerContext* context,
- ServerAsyncStreamingInterface* stream,
+ internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag)
: RegisteredAsyncRequest(server, context, stream, call_cq, tag) {
@@ -169,7 +171,7 @@ class ServerInterface : public CallHook {
public:
PayloadAsyncRequest(void* registered_method, ServerInterface* server,
ServerContext* context,
- ServerAsyncStreamingInterface* stream,
+ internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* request)
@@ -195,7 +197,7 @@ class ServerInterface : public CallHook {
class GenericAsyncRequest : public BaseAsyncRequest {
public:
GenericAsyncRequest(ServerInterface* server, GenericServerContext* context,
- ServerAsyncStreamingInterface* stream,
+ internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
bool delete_on_finalize);
@@ -207,8 +209,9 @@ class ServerInterface : public CallHook {
};
template <class Message>
- void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context,
- ServerAsyncStreamingInterface* stream,
+ void RequestAsyncCall(internal::RpcServiceMethod* method,
+ ServerContext* context,
+ internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* message) {
@@ -218,8 +221,9 @@ class ServerInterface : public CallHook {
message);
}
- void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context,
- ServerAsyncStreamingInterface* stream,
+ void RequestAsyncCall(internal::RpcServiceMethod* method,
+ ServerContext* context,
+ internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
GPR_CODEGEN_ASSERT(method);
@@ -228,7 +232,7 @@ class ServerInterface : public CallHook {
}
void RequestAsyncGenericCall(GenericServerContext* context,
- ServerAsyncStreamingInterface* stream,
+ internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
diff --git a/include/grpc++/impl/codegen/service_type.h b/include/grpc++/impl/codegen/service_type.h
index 2dc4ea0ea6..71c3d99d5c 100644
--- a/include/grpc++/impl/codegen/service_type.h
+++ b/include/grpc++/impl/codegen/service_type.h
@@ -28,13 +28,14 @@
namespace grpc {
-class Call;
class CompletionQueue;
class Server;
class ServerInterface;
class ServerCompletionQueue;
class ServerContext;
+namespace internal {
+class Call;
class ServerAsyncStreamingInterface {
public:
virtual ~ServerAsyncStreamingInterface() {}
@@ -48,9 +49,10 @@ class ServerAsyncStreamingInterface {
virtual void SendInitialMetadata(void* tag) = 0;
private:
- friend class ServerInterface;
+ friend class ::grpc::ServerInterface;
virtual void BindCall(Call* call) = 0;
};
+} // namespace internal
/// Desriptor of an RPC service and its various RPC methods
class Service {
@@ -88,40 +90,38 @@ class Service {
protected:
template <class Message>
void RequestAsyncUnary(int index, ServerContext* context, Message* request,
- ServerAsyncStreamingInterface* stream,
+ internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq,
notification_cq, tag, request);
}
- void RequestAsyncClientStreaming(int index, ServerContext* context,
- ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) {
+ void RequestAsyncClientStreaming(
+ int index, ServerContext* context,
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag) {
server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq,
notification_cq, tag);
}
template <class Message>
- void RequestAsyncServerStreaming(int index, ServerContext* context,
- Message* request,
- ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) {
+ void RequestAsyncServerStreaming(
+ int index, ServerContext* context, Message* request,
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag) {
server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq,
notification_cq, tag, request);
}
- void RequestAsyncBidiStreaming(int index, ServerContext* context,
- ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) {
+ void RequestAsyncBidiStreaming(
+ int index, ServerContext* context,
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag) {
server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq,
notification_cq, tag);
}
- void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); }
+ void AddMethod(internal::RpcServiceMethod* method) {
+ methods_.emplace_back(method);
+ }
void MarkMethodAsync(int index) {
GPR_CODEGEN_ASSERT(
@@ -139,7 +139,7 @@ class Service {
methods_[index].reset();
}
- void MarkMethodStreamed(int index, MethodHandler* streamed_method) {
+ void MarkMethodStreamed(int index, internal::MethodHandler* streamed_method) {
GPR_CODEGEN_ASSERT(methods_[index] && methods_[index]->handler() &&
"Cannot mark an async or generic method Streamed");
methods_[index]->SetHandler(streamed_method);
@@ -148,14 +148,14 @@ class Service {
// case of BIDI_STREAMING that has 1 read and 1 write, in that order,
// and split server-side streaming is BIDI_STREAMING with 1 read and
// any number of writes, in that order.
- methods_[index]->SetMethodType(::grpc::RpcMethod::BIDI_STREAMING);
+ methods_[index]->SetMethodType(internal::RpcMethod::BIDI_STREAMING);
}
private:
friend class Server;
friend class ServerInterface;
ServerInterface* server_;
- std::vector<std::unique_ptr<RpcServiceMethod>> methods_;
+ std::vector<std::unique_ptr<internal::RpcServiceMethod>> methods_;
};
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h
index 3fa208963d..f9c8303000 100644
--- a/include/grpc++/impl/codegen/sync_stream.h
+++ b/include/grpc++/impl/codegen/sync_stream.h
@@ -30,6 +30,7 @@
namespace grpc {
+namespace internal {
/// Common interface for all synchronous client side streaming.
class ClientStreamingInterface {
public:
@@ -62,20 +63,6 @@ class ClientStreamingInterface {
virtual Status Finish() = 0;
};
-/// Common interface for all synchronous server side streaming.
-class ServerStreamingInterface {
- public:
- virtual ~ServerStreamingInterface() {}
-
- /// Block to send initial metadata to client.
- /// This call is optional, but if it is used, it cannot be used concurrently
- /// with or after the \a Finish method.
- ///
- /// The initial metadata that will be sent to the client will be
- /// taken from the \a ServerContext associated with the call.
- virtual void SendInitialMetadata() = 0;
-};
-
/// An interface that yields a sequence of messages of type \a R.
template <class R>
class ReaderInterface {
@@ -141,16 +128,55 @@ class WriterInterface {
}
};
+} // namespace internal
+
/// Client-side interface for streaming reads of message of type \a R.
template <class R>
-class ClientReaderInterface : public ClientStreamingInterface,
- public ReaderInterface<R> {
+class ClientReaderInterface : public internal::ClientStreamingInterface,
+ public internal::ReaderInterface<R> {
+ public:
+ /// Block to wait for initial metadata from server. The received metadata
+ /// can only be accessed after this call returns. Should only be called before
+ /// the first read. Calling this method is optional, and if it is not called
+ /// the metadata will be available in ClientContext after the first read.
+ virtual void WaitForInitialMetadata() = 0;
+};
+
+/// Client-side interface for streaming writes of message type \a W.
+template <class W>
+class ClientWriterInterface : public internal::ClientStreamingInterface,
+ public internal::WriterInterface<W> {
+ public:
+ /// Half close writing from the client. (signal that the stream of messages
+ /// coming from the clinet is complete).
+ /// Blocks until currently-pending writes are completed.
+ /// Thread safe with respect to \a ReaderInterface::Read operations only
+ ///
+ /// \return Whether the writes were successful.
+ virtual bool WritesDone() = 0;
+};
+
+/// Client-side interface for bi-directional streaming with
+/// client-to-server stream messages of type \a W and
+/// server-to-client stream messages of type \a R.
+template <class W, class R>
+class ClientReaderWriterInterface : public internal::ClientStreamingInterface,
+ public internal::WriterInterface<W>,
+ public internal::ReaderInterface<R> {
public:
/// Block to wait for initial metadata from server. The received metadata
/// can only be accessed after this call returns. Should only be called before
/// the first read. Calling this method is optional, and if it is not called
/// the metadata will be available in ClientContext after the first read.
virtual void WaitForInitialMetadata() = 0;
+
+ /// Half close writing from the client. (signal that the stream of messages
+ /// coming from the clinet is complete).
+ /// Blocks until currently-pending writes are completed.
+ /// Thread-safe with respect to \a ReaderInterface::Read
+ ///
+ /// \return Whether the writes were successful.
+ virtual bool WritesDone() = 0;
};
/// Synchronous (blocking) client-side API for doing server-streaming RPCs,
@@ -159,28 +185,14 @@ class ClientReaderInterface : public ClientStreamingInterface,
template <class R>
class ClientReader final : public ClientReaderInterface<R> {
public:
- /// Block to create a stream and write the initial metadata and \a request
- /// out. Note that \a context will be used to fill in custom initial
- /// metadata used to send to the server when starting the call.
- template <class W>
- ClientReader(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context, const W& request)
- : context_(context),
- cq_(grpc_completion_queue_attributes{
- GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
- GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq
- call_(channel->CreateCall(method, context, &cq_)) {
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpClientSendClose>
- ops;
- ops.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- // TODO(ctiller): don't assert
- GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok());
- ops.ClientSendClose();
- call_.PerformOps(&ops);
- cq_.Pluck(&ops);
- }
+ struct internal {
+ template <class W>
+ static ClientReader* Create(::grpc::ChannelInterface* channel,
+ const ::grpc::internal::RpcMethod& method,
+ ClientContext* context, const W& request) {
+ return new ClientReader(channel, method, context, request);
+ }
+ };
/// See the \a ClientStreamingInterface.WaitForInitialMetadata method for
/// semantics.
@@ -192,7 +204,8 @@ class ClientReader final : public ClientReaderInterface<R> {
void WaitForInitialMetadata() override {
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
- CallOpSet<CallOpRecvInitialMetadata> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
+ ops;
ops.RecvInitialMetadata(context_);
call_.PerformOps(&ops);
cq_.Pluck(&ops); /// status ignored
@@ -209,7 +222,9 @@ class ClientReader final : public ClientReaderInterface<R> {
/// already received (if initial metadata is received, it can be then
/// accessed through the \a ClientContext associated with this call).
bool Read(R* msg) override {
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpRecvMessage<R>>
+ ops;
if (!context_->initial_metadata_received_) {
ops.RecvInitialMetadata(context_);
}
@@ -224,7 +239,7 @@ class ClientReader final : public ClientReaderInterface<R> {
/// The \a ClientContext associated with this call is updated with
/// possible metadata received from the server.
Status Finish() override {
- CallOpSet<CallOpClientRecvStatus> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientRecvStatus> ops;
Status status;
ops.ClientRecvStatus(context_, &status);
call_.PerformOps(&ops);
@@ -235,53 +250,48 @@ class ClientReader final : public ClientReaderInterface<R> {
private:
ClientContext* context_;
CompletionQueue cq_;
- Call call_;
-};
+ ::grpc::internal::Call call_;
-/// Client-side interface for streaming writes of message type \a W.
-template <class W>
-class ClientWriterInterface : public ClientStreamingInterface,
- public WriterInterface<W> {
- public:
- /// Half close writing from the client. (signal that the stream of messages
- /// coming from the clinet is complete).
- /// Blocks until currently-pending writes are completed.
- /// Thread safe with respect to \a ReaderInterface::Read operations only
- ///
- /// \return Whether the writes were successful.
- virtual bool WritesDone() = 0;
+ /// Block to create a stream and write the initial metadata and \a request
+ /// out. Note that \a context will be used to fill in custom initial
+ /// metadata used to send to the server when starting the call.
+ template <class W>
+ ClientReader(::grpc::ChannelInterface* channel,
+ const ::grpc::internal::RpcMethod& method,
+ ClientContext* context, const W& request)
+ : context_(context),
+ cq_(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
+ GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq
+ call_(channel->CreateCall(method, context, &cq_)) {
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpClientSendClose>
+ ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ // TODO(ctiller): don't assert
+ GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok());
+ ops.ClientSendClose();
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
+ }
};
/// Synchronous (blocking) client-side API for doing client-streaming RPCs,
/// where the outgoing message stream coming from the client has messages of
/// type \a W.
template <class W>
-class ClientWriter : public ClientWriterInterface<W> {
+class ClientWriter final : public ClientWriterInterface<W> {
public:
- /// Block to create a stream (i.e. send request headers and other initial
- /// metadata to the server). Note that \a context will be used to fill
- /// in custom initial metadata. \a response will be filled in with the
- /// single expected response message from the server upon a successful
- /// call to the \a Finish method of this instance.
- template <class R>
- ClientWriter(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context, R* response)
- : context_(context),
- cq_(grpc_completion_queue_attributes{
- GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
- GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq
- call_(channel->CreateCall(method, context, &cq_)) {
- finish_ops_.RecvMessage(response);
- finish_ops_.AllowNoMessage();
-
- if (!context_->initial_metadata_corked_) {
- CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&ops);
- cq_.Pluck(&ops);
+ struct internal {
+ template <class R>
+ static ClientWriter* Create(::grpc::ChannelInterface* channel,
+ const ::grpc::internal::RpcMethod& method,
+ ClientContext* context, R* response) {
+ return new ClientWriter(channel, method, context, response);
}
- }
+ };
/// See the \a ClientStreamingInterface.WaitForInitialMetadata method for
/// semantics.
@@ -292,7 +302,8 @@ class ClientWriter : public ClientWriterInterface<W> {
void WaitForInitialMetadata() {
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
- CallOpSet<CallOpRecvInitialMetadata> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
+ ops;
ops.RecvInitialMetadata(context_);
call_.PerformOps(&ops);
cq_.Pluck(&ops); // status ignored
@@ -304,10 +315,11 @@ class ClientWriter : public ClientWriterInterface<W> {
/// Side effect:
/// Also sends initial metadata if not already sent (using the
/// \a ClientContext associated with this call).
- using WriterInterface<W>::Write;
+ using ::grpc::internal::WriterInterface<W>::Write;
bool Write(const W& msg, WriteOptions options) override {
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpClientSendClose>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpClientSendClose>
ops;
if (options.is_last_message()) {
@@ -328,7 +340,7 @@ class ClientWriter : public ClientWriterInterface<W> {
}
bool WritesDone() override {
- CallOpSet<CallOpClientSendClose> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops;
ops.ClientSendClose();
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
@@ -353,61 +365,55 @@ class ClientWriter : public ClientWriterInterface<W> {
private:
ClientContext* context_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
- CallOpClientRecvStatus>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpGenericRecvMessage,
+ ::grpc::internal::CallOpClientRecvStatus>
finish_ops_;
CompletionQueue cq_;
- Call call_;
-};
+ ::grpc::internal::Call call_;
-/// Client-side interface for bi-directional streaming with
-/// client-to-server stream messages of type \a W and
-/// server-to-client stream messages of type \a R.
-template <class W, class R>
-class ClientReaderWriterInterface : public ClientStreamingInterface,
- public WriterInterface<W>,
- public ReaderInterface<R> {
- public:
- /// Block to wait for initial metadata from server. The received metadata
- /// can only be accessed after this call returns. Should only be called before
- /// the first read. Calling this method is optional, and if it is not called
- /// the metadata will be available in ClientContext after the first read.
- virtual void WaitForInitialMetadata() = 0;
-
- /// Half close writing from the client. (signal that the stream of messages
- /// coming from the clinet is complete).
- /// Blocks until currently-pending writes are completed.
- /// Thread-safe with respect to \a ReaderInterface::Read
- ///
- /// \return Whether the writes were successful.
- virtual bool WritesDone() = 0;
-};
-
-/// Synchronous (blocking) client-side API for bi-directional streaming RPCs,
-/// where the outgoing message stream coming from the client has messages of
-/// type \a W, and the incoming messages stream coming from the server has
-/// messages of type \a R.
-template <class W, class R>
-class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
- public:
- /// Block to create a stream and write the initial metadata and \a request
- /// out. Note that \a context will be used to fill in custom initial metadata
- /// used to send to the server when starting the call.
- ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context)
+ /// Block to create a stream (i.e. send request headers and other initial
+ /// metadata to the server). Note that \a context will be used to fill
+ /// in custom initial metadata. \a response will be filled in with the
+ /// single expected response message from the server upon a successful
+ /// call to the \a Finish method of this instance.
+ template <class R>
+ ClientWriter(::grpc::ChannelInterface* channel,
+ const ::grpc::internal::RpcMethod& method,
+ ClientContext* context, R* response)
: context_(context),
cq_(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq
call_(channel->CreateCall(method, context, &cq_)) {
+ finish_ops_.RecvMessage(response);
+ finish_ops_.AllowNoMessage();
+
if (!context_->initial_metadata_corked_) {
- CallOpSet<CallOpSendInitialMetadata> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
+ ops;
ops.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
}
+};
+
+/// Synchronous (blocking) client-side API for bi-directional streaming RPCs,
+/// where the outgoing message stream coming from the client has messages of
+/// type \a W, and the incoming messages stream coming from the server has
+/// messages of type \a R.
+template <class W, class R>
+class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
+ public:
+ struct internal {
+ static ClientReaderWriter* Create(::grpc::ChannelInterface* channel,
+ const ::grpc::internal::RpcMethod& method,
+ ClientContext* context) {
+ return new ClientReaderWriter(channel, method, context);
+ }
+ };
/// Block waiting to read initial metadata from the server.
/// This call is optional, but if it is used, it cannot be used concurrently
@@ -418,7 +424,8 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
void WaitForInitialMetadata() override {
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
- CallOpSet<CallOpRecvInitialMetadata> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
+ ops;
ops.RecvInitialMetadata(context_);
call_.PerformOps(&ops);
cq_.Pluck(&ops); // status ignored
@@ -434,7 +441,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
/// Also receives initial metadata if not already received (updates the \a
/// ClientContext associated with this call in that case).
bool Read(R* msg) override {
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpRecvMessage<R>>
+ ops;
if (!context_->initial_metadata_received_) {
ops.RecvInitialMetadata(context_);
}
@@ -448,10 +457,11 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
/// Side effect:
/// Also sends initial metadata if not already sent (using the
/// \a ClientContext associated with this call to fill in values).
- using WriterInterface<W>::Write;
+ using ::grpc::internal::WriterInterface<W>::Write;
bool Write(const W& msg, WriteOptions options) override {
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpClientSendClose>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpClientSendClose>
ops;
if (options.is_last_message()) {
@@ -472,7 +482,7 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
}
bool WritesDone() override {
- CallOpSet<CallOpClientSendClose> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops;
ops.ClientSendClose();
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
@@ -484,7 +494,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
/// - the \a ClientContext associated with this call is updated with
/// possible trailing metadata sent from the server.
Status Finish() override {
- CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpClientRecvStatus>
+ ops;
if (!context_->initial_metadata_received_) {
ops.RecvInitialMetadata(context_);
}
@@ -498,13 +510,61 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
private:
ClientContext* context_;
CompletionQueue cq_;
- Call call_;
+ ::grpc::internal::Call call_;
+
+ /// Block to create a stream and write the initial metadata and \a request
+ /// out. Note that \a context will be used to fill in custom initial metadata
+ /// used to send to the server when starting the call.
+ ClientReaderWriter(::grpc::ChannelInterface* channel,
+ const ::grpc::internal::RpcMethod& method,
+ ClientContext* context)
+ : context_(context),
+ cq_(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
+ GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq
+ call_(channel->CreateCall(method, context, &cq_)) {
+ if (!context_->initial_metadata_corked_) {
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
+ ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
+ }
+ }
};
+namespace internal {
+/// Common interface for all synchronous server side streaming.
+class ServerStreamingInterface {
+ public:
+ virtual ~ServerStreamingInterface() {}
+
+ /// Block to send initial metadata to client.
+ /// This call is optional, but if it is used, it cannot be used concurrently
+ /// with or after the \a Finish method.
+ ///
+ /// The initial metadata that will be sent to the client will be
+ /// taken from the \a ServerContext associated with the call.
+ virtual void SendInitialMetadata() = 0;
+};
+} // namespace internal
+
/// Server-side interface for streaming reads of message of type \a R.
template <class R>
-class ServerReaderInterface : public ServerStreamingInterface,
- public ReaderInterface<R> {};
+class ServerReaderInterface : public internal::ServerStreamingInterface,
+ public internal::ReaderInterface<R> {};
+
+/// Server-side interface for streaming writes of message of type \a W.
+template <class W>
+class ServerWriterInterface : public internal::ServerStreamingInterface,
+ public internal::WriterInterface<W> {};
+
+/// Server-side interface for bi-directional streaming.
+template <class W, class R>
+class ServerReaderWriterInterface : public internal::ServerStreamingInterface,
+ public internal::WriterInterface<W>,
+ public internal::ReaderInterface<R> {};
/// Synchronous (blocking) server-side API for doing client-streaming RPCs,
/// where the incoming message stream coming from the client has messages of
@@ -512,15 +572,13 @@ class ServerReaderInterface : public ServerStreamingInterface,
template <class R>
class ServerReader final : public ServerReaderInterface<R> {
public:
- ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
-
/// See the \a ServerStreamingInterface.SendInitialMetadata method
/// for semantics. Note that initial metadata will be affected by the
/// \a ServerContext associated with this call.
void SendInitialMetadata() override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
- CallOpSet<CallOpSendInitialMetadata> ops;
+ internal::CallOpSet<internal::CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
@@ -537,21 +595,22 @@ class ServerReader final : public ServerReaderInterface<R> {
}
bool Read(R* msg) override {
- CallOpSet<CallOpRecvMessage<R>> ops;
+ internal::CallOpSet<internal::CallOpRecvMessage<R>> ops;
ops.RecvMessage(msg);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops) && ops.got_message;
}
private:
- Call* const call_;
+ internal::Call* const call_;
ServerContext* const ctx_;
-};
-/// Server-side interface for streaming writes of message of type \a W.
-template <class W>
-class ServerWriterInterface : public ServerStreamingInterface,
- public WriterInterface<W> {};
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class internal::ClientStreamingHandler;
+
+ ServerReader(internal::Call* call, ServerContext* ctx)
+ : call_(call), ctx_(ctx) {}
+};
/// Synchronous (blocking) server-side API for doing for doing a
/// server-streaming RPCs, where the outgoing message stream coming from the
@@ -559,8 +618,6 @@ class ServerWriterInterface : public ServerStreamingInterface,
template <class W>
class ServerWriter final : public ServerWriterInterface<W> {
public:
- ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
-
/// See the \a ServerStreamingInterface.SendInitialMetadata method
/// for semantics.
/// Note that initial metadata will be affected by the
@@ -568,7 +625,7 @@ class ServerWriter final : public ServerWriterInterface<W> {
void SendInitialMetadata() override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
- CallOpSet<CallOpSendInitialMetadata> ops;
+ internal::CallOpSet<internal::CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
@@ -584,11 +641,12 @@ class ServerWriter final : public ServerWriterInterface<W> {
/// Side effect:
/// Also sends initial metadata if not already sent (using the
/// \a ClientContext associated with this call to fill in values).
- using WriterInterface<W>::Write;
+ using internal::WriterInterface<W>::Write;
bool Write(const W& msg, WriteOptions options) override {
if (options.is_last_message()) {
options.set_buffer_hint();
}
+
if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) {
return false;
}
@@ -613,15 +671,15 @@ class ServerWriter final : public ServerWriterInterface<W> {
}
private:
- Call* const call_;
+ internal::Call* const call_;
ServerContext* const ctx_;
-};
-/// Server-side interface for bi-directional streaming.
-template <class W, class R>
-class ServerReaderWriterInterface : public ServerStreamingInterface,
- public WriterInterface<W>,
- public ReaderInterface<R> {};
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class internal::ServerStreamingHandler;
+
+ ServerWriter(internal::Call* call, ServerContext* ctx)
+ : call_(call), ctx_(ctx) {}
+};
/// Actual implementation of bi-directional streaming
namespace internal {
@@ -688,6 +746,7 @@ class ServerReaderWriterBody final {
Call* const call_;
ServerContext* const ctx_;
};
+
} // namespace internal
/// Synchronous (blocking) server-side API for a bidirectional
@@ -697,8 +756,6 @@ class ServerReaderWriterBody final {
template <class W, class R>
class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> {
public:
- ServerReaderWriter(Call* call, ServerContext* ctx) : body_(call, ctx) {}
-
/// See the \a ServerStreamingInterface.SendInitialMetadata method
/// for semantics. Note that initial metadata will be affected by the
/// \a ServerContext associated with this call.
@@ -715,13 +772,18 @@ class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> {
/// Side effect:
/// Also sends initial metadata if not already sent (using the \a
/// ServerContext associated with this call).
- using WriterInterface<W>::Write;
+ using internal::WriterInterface<W>::Write;
bool Write(const W& msg, WriteOptions options) override {
return body_.Write(msg, options);
}
private:
internal::ServerReaderWriterBody<W, R> body_;
+
+ friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>,
+ false>;
+ ServerReaderWriter(internal::Call* call, ServerContext* ctx)
+ : body_(call, ctx) {}
};
/// A class to represent a flow-controlled unary call. This is something
@@ -736,9 +798,6 @@ template <class RequestType, class ResponseType>
class ServerUnaryStreamer final
: public ServerReaderWriterInterface<ResponseType, RequestType> {
public:
- ServerUnaryStreamer(Call* call, ServerContext* ctx)
- : body_(call, ctx), read_done_(false), write_done_(false) {}
-
/// Block to send initial metadata to client.
/// Implicit input parameter:
/// - the \a ServerContext associated with this call will be used for
@@ -775,7 +834,7 @@ class ServerUnaryStreamer final
/// \param options The WriteOptions affecting the write operation.
///
/// \return \a true on success, \a false when the stream has been closed.
- using WriterInterface<ResponseType>::Write;
+ using internal::WriterInterface<ResponseType>::Write;
bool Write(const ResponseType& response, WriteOptions options) override {
if (write_done_ || !read_done_) {
return false;
@@ -788,6 +847,11 @@ class ServerUnaryStreamer final
internal::ServerReaderWriterBody<ResponseType, RequestType> body_;
bool read_done_;
bool write_done_;
+
+ friend class internal::TemplatedBidiStreamingHandler<
+ ServerUnaryStreamer<RequestType, ResponseType>, true>;
+ ServerUnaryStreamer(internal::Call* call, ServerContext* ctx)
+ : body_(call, ctx), read_done_(false), write_done_(false) {}
};
/// A class to represent a flow-controlled server-side streaming call.
@@ -799,9 +863,6 @@ template <class RequestType, class ResponseType>
class ServerSplitStreamer final
: public ServerReaderWriterInterface<ResponseType, RequestType> {
public:
- ServerSplitStreamer(Call* call, ServerContext* ctx)
- : body_(call, ctx), read_done_(false) {}
-
/// Block to send initial metadata to client.
/// Implicit input parameter:
/// - the \a ServerContext associated with this call will be used for
@@ -838,7 +899,7 @@ class ServerSplitStreamer final
/// \param options The WriteOptions affecting the write operation.
///
/// \return \a true on success, \a false when the stream has been closed.
- using WriterInterface<ResponseType>::Write;
+ using internal::WriterInterface<ResponseType>::Write;
bool Write(const ResponseType& response, WriteOptions options) override {
return read_done_ && body_.Write(response, options);
}
@@ -846,6 +907,11 @@ class ServerSplitStreamer final
private:
internal::ServerReaderWriterBody<ResponseType, RequestType> body_;
bool read_done_;
+
+ friend class internal::TemplatedBidiStreamingHandler<
+ ServerSplitStreamer<RequestType, ResponseType>, false>;
+ ServerSplitStreamer(internal::Call* call, ServerContext* ctx)
+ : body_(call, ctx), read_done_(false) {}
};
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/time.h b/include/grpc++/impl/codegen/time.h
index 589deb4f03..d464d6ea13 100644
--- a/include/grpc++/impl/codegen/time.h
+++ b/include/grpc++/impl/codegen/time.h
@@ -19,6 +19,8 @@
#ifndef GRPCXX_IMPL_CODEGEN_TIME_H
#define GRPCXX_IMPL_CODEGEN_TIME_H
+#include <chrono>
+
#include <grpc++/impl/codegen/config.h>
#include <grpc/impl/codegen/grpc_types.h>
@@ -59,10 +61,6 @@ class TimePoint<gpr_timespec> {
} // namespace grpc
-#include <chrono>
-
-#include <grpc/impl/codegen/grpc_types.h>
-
namespace grpc {
// from and to should be absolute time.
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index d76a745ac9..baf0ded9ab 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -172,7 +172,8 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen {
/// \param num_cqs How many completion queues does \a cqs hold.
void Start(ServerCompletionQueue** cqs, size_t num_cqs) override;
- void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) override;
+ void PerformOpsOnCall(internal::CallOpSetInterface* ops,
+ internal::Call* call) override;
void ShutdownInternal(gpr_timespec deadline) override;
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index 50f947d4c7..5bd53bd90f 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -40,7 +40,6 @@ namespace grpc {
class AsyncGenericService;
class ResourceQuota;
class CompletionQueue;
-class RpcService;
class Server;
class ServerCompletionQueue;
class ServerCredentials;
@@ -196,10 +195,7 @@ class ServerBuilder {
struct SyncServerSettings {
SyncServerSettings()
- : num_cqs(GPR_MAX(1, gpr_cpu_num_cores())),
- min_pollers(1),
- max_pollers(2),
- cq_timeout_msec(10000) {}
+ : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {}
/// Number of server completion queues to create to listen to incoming RPCs.
int num_cqs;
diff --git a/package.xml b/package.xml
index 10cb2e63ff..cfa45f06d7 100644
--- a/package.xml
+++ b/package.xml
@@ -106,6 +106,7 @@
<file baseinstalldir="/" name="src/core/lib/support/mpscq.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/murmur_hash.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/spinlock.h" role="src" />
+ <file baseinstalldir="/" name="src/core/lib/support/stack_lockfree.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/string.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/string_windows.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/thd_internal.h" role="src" />
@@ -135,6 +136,7 @@
<file baseinstalldir="/" name="src/core/lib/support/log_windows.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/mpscq.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/murmur_hash.c" role="src" />
+ <file baseinstalldir="/" name="src/core/lib/support/stack_lockfree.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/string.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/string_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/string_util_windows.c" role="src" />
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index b09bf99677..f8c6fda340 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -140,7 +140,6 @@ grpc::string GetHeaderIncludes(grpc_generator::File *file,
printer->Print(vars, "namespace grpc {\n");
printer->Print(vars, "class CompletionQueue;\n");
printer->Print(vars, "class Channel;\n");
- printer->Print(vars, "class RpcService;\n");
printer->Print(vars, "class ServerCompletionQueue;\n");
printer->Print(vars, "class ServerContext;\n");
printer->Print(vars, "} // namespace grpc\n\n");
@@ -187,19 +186,21 @@ void PrintHeaderClientMethodInterfaces(
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
- "std::unique_ptr< ::grpc::ClientWriterInterface< $Request$>>"
+ "std::unique_ptr< ::grpc::ClientWriterInterface< "
+ "$Request$>>"
" $Method$("
"::grpc::ClientContext* context, $Response$* response) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< ::grpc::ClientWriterInterface< $Request$>>"
- "($Method$Raw(context, response));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientWriterInterface< $Request$>>"
+ "($Method$Raw(context, response));\n");
printer->Outdent();
printer->Print("}\n");
printer->Print(
*vars,
- "std::unique_ptr< ::grpc::ClientAsyncWriterInterface< $Request$>>"
+ "std::unique_ptr< ::grpc::ClientAsyncWriterInterface< "
+ "$Request$>>"
" Async$Method$(::grpc::ClientContext* context, $Response$* "
"response, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
@@ -213,19 +214,21 @@ void PrintHeaderClientMethodInterfaces(
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
- "std::unique_ptr< ::grpc::ClientReaderInterface< $Response$>>"
+ "std::unique_ptr< ::grpc::ClientReaderInterface< "
+ "$Response$>>"
" $Method$(::grpc::ClientContext* context, const $Request$& request)"
" {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< ::grpc::ClientReaderInterface< $Response$>>"
- "($Method$Raw(context, request));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientReaderInterface< $Response$>>"
+ "($Method$Raw(context, request));\n");
printer->Outdent();
printer->Print("}\n");
printer->Print(
*vars,
- "std::unique_ptr< ::grpc::ClientAsyncReaderInterface< $Response$>> "
+ "std::unique_ptr< ::grpc::ClientAsyncReaderInterface< "
+ "$Response$>> "
"Async$Method$("
"::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
@@ -242,36 +245,37 @@ void PrintHeaderClientMethodInterfaces(
"$Request$, $Response$>> "
"$Method$(::grpc::ClientContext* context) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< "
- "::grpc::ClientReaderWriterInterface< $Request$, $Response$>>("
- "$Method$Raw(context));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientReaderWriterInterface< "
+ "$Request$, $Response$>>("
+ "$Method$Raw(context));\n");
printer->Outdent();
printer->Print("}\n");
- printer->Print(
- *vars,
- "std::unique_ptr< "
- "::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>> "
- "Async$Method$(::grpc::ClientContext* context, "
- "::grpc::CompletionQueue* cq, void* tag) {\n");
+ printer->Print(*vars,
+ "std::unique_ptr< "
+ "::grpc::ClientAsyncReaderWriterInterface< "
+ "$Request$, $Response$>> "
+ "Async$Method$(::grpc::ClientContext* context, "
+ "::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< "
- "::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>>("
- "Async$Method$Raw(context, cq, tag));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientAsyncReaderWriterInterface< "
+ "$Request$, $Response$>>("
+ "Async$Method$Raw(context, cq, tag));\n");
printer->Outdent();
printer->Print("}\n");
}
} else {
if (method->NoStreaming()) {
- printer->Print(
- *vars,
- "virtual ::grpc::ClientAsyncResponseReaderInterface< $Response$>* "
- "Async$Method$Raw(::grpc::ClientContext* context, "
- "const $Request$& request, "
- "::grpc::CompletionQueue* cq) = 0;\n");
+ printer->Print(*vars,
+ "virtual "
+ "::grpc::ClientAsyncResponseReaderInterface< "
+ "$Response$>* "
+ "Async$Method$Raw(::grpc::ClientContext* context, "
+ "const $Request$& request, "
+ "::grpc::CompletionQueue* cq) = 0;\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
@@ -286,7 +290,8 @@ void PrintHeaderClientMethodInterfaces(
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
- "virtual ::grpc::ClientReaderInterface< $Response$>* $Method$Raw("
+ "virtual ::grpc::ClientReaderInterface< $Response$>* "
+ "$Method$Raw("
"::grpc::ClientContext* context, const $Request$& request) = 0;\n");
printer->Print(
*vars,
@@ -451,7 +456,8 @@ void PrintHeaderClientMethodData(grpc_generator::Printer *printer,
const grpc_generator::Method *method,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["Method"] = method->name();
- printer->Print(*vars, "const ::grpc::RpcMethod rpcmethod_$Method$_;\n");
+ printer->Print(*vars,
+ "const ::grpc::internal::RpcMethod rpcmethod_$Method$_;\n");
}
void PrintHeaderServerMethodSync(grpc_generator::Printer *printer,
@@ -623,7 +629,7 @@ void PrintHeaderServerMethodStreamedUnary(
printer->Print(*vars,
"WithStreamedUnaryMethod_$Method$() {\n"
" ::grpc::Service::MarkMethodStreamed($Idx$,\n"
- " new ::grpc::StreamedUnaryHandler< $Request$, "
+ " new ::grpc::internal::StreamedUnaryHandler< $Request$, "
"$Response$>(std::bind"
"(&WithStreamedUnaryMethod_$Method$<BaseClass>::"
"Streamed$Method$, this, std::placeholders::_1, "
@@ -671,15 +677,16 @@ void PrintHeaderServerMethodSplitStreaming(
"{}\n");
printer->Print(" public:\n");
printer->Indent();
- printer->Print(*vars,
- "WithSplitStreamingMethod_$Method$() {\n"
- " ::grpc::Service::MarkMethodStreamed($Idx$,\n"
- " new ::grpc::SplitServerStreamingHandler< $Request$, "
- "$Response$>(std::bind"
- "(&WithSplitStreamingMethod_$Method$<BaseClass>::"
- "Streamed$Method$, this, std::placeholders::_1, "
- "std::placeholders::_2)));\n"
- "}\n");
+ printer->Print(
+ *vars,
+ "WithSplitStreamingMethod_$Method$() {\n"
+ " ::grpc::Service::MarkMethodStreamed($Idx$,\n"
+ " new ::grpc::internal::SplitServerStreamingHandler< $Request$, "
+ "$Response$>(std::bind"
+ "(&WithSplitStreamingMethod_$Method$<BaseClass>::"
+ "Streamed$Method$, this, std::placeholders::_1, "
+ "std::placeholders::_2)));\n"
+ "}\n");
printer->Print(*vars,
"~WithSplitStreamingMethod_$Method$() override {\n"
" BaseClassMustBeDerivedFromService(this);\n"
@@ -819,7 +826,8 @@ void PrintHeaderService(grpc_generator::Printer *printer,
" {\n public:\n");
printer->Indent();
printer->Print(
- "Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);\n");
+ "Stub(const std::shared_ptr< ::grpc::ChannelInterface>& "
+ "channel);\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderClientMethod(printer, service->method(i).get(), vars, true);
}
@@ -1082,11 +1090,12 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::Status $ns$$Service$::Stub::$Method$("
"::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response) {\n");
- printer->Print(*vars,
- " return ::grpc::BlockingUnaryCall(channel_.get(), "
- "rpcmethod_$Method$_, "
- "context, request, response);\n"
- "}\n\n");
+ printer->Print(
+ *vars,
+ " return ::grpc::internal::BlockingUnaryCall(channel_.get(), "
+ "rpcmethod_$Method$_, "
+ "context, request, response);\n"
+ "}\n\n");
printer->Print(
*vars,
"::grpc::ClientAsyncResponseReader< $Response$>* "
@@ -1095,7 +1104,8 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::CompletionQueue* cq) {\n");
printer->Print(*vars,
" return "
- "::grpc::ClientAsyncResponseReader< $Response$>::Create("
+ "::grpc::ClientAsyncResponseReader< $Response$>::"
+ "internal::Create("
"channel_.get(), cq, "
"rpcmethod_$Method$_, "
"context, request);\n"
@@ -1105,19 +1115,21 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::ClientWriter< $Request$>* "
"$ns$$Service$::Stub::$Method$Raw("
"::grpc::ClientContext* context, $Response$* response) {\n");
- printer->Print(*vars,
- " return new ::grpc::ClientWriter< $Request$>("
- "channel_.get(), "
- "rpcmethod_$Method$_, "
- "context, response);\n"
- "}\n\n");
+ printer->Print(
+ *vars,
+ " return ::grpc::ClientWriter< $Request$>::internal::Create("
+ "channel_.get(), "
+ "rpcmethod_$Method$_, "
+ "context, response);\n"
+ "}\n\n");
printer->Print(*vars,
"::grpc::ClientAsyncWriter< $Request$>* "
"$ns$$Service$::Stub::Async$Method$Raw("
"::grpc::ClientContext* context, $Response$* response, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
- " return ::grpc::ClientAsyncWriter< $Request$>::Create("
+ " return ::grpc::ClientAsyncWriter< $Request$>::"
+ "internal::Create("
"channel_.get(), cq, "
"rpcmethod_$Method$_, "
"context, response, tag);\n"
@@ -1128,19 +1140,21 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::ClientReader< $Response$>* "
"$ns$$Service$::Stub::$Method$Raw("
"::grpc::ClientContext* context, const $Request$& request) {\n");
- printer->Print(*vars,
- " return new ::grpc::ClientReader< $Response$>("
- "channel_.get(), "
- "rpcmethod_$Method$_, "
- "context, request);\n"
- "}\n\n");
+ printer->Print(
+ *vars,
+ " return ::grpc::ClientReader< $Response$>::internal::Create("
+ "channel_.get(), "
+ "rpcmethod_$Method$_, "
+ "context, request);\n"
+ "}\n\n");
printer->Print(*vars,
"::grpc::ClientAsyncReader< $Response$>* "
"$ns$$Service$::Stub::Async$Method$Raw("
"::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
- " return ::grpc::ClientAsyncReader< $Response$>::Create("
+ " return ::grpc::ClientAsyncReader< $Response$>::"
+ "internal::Create("
"channel_.get(), cq, "
"rpcmethod_$Method$_, "
"context, request, tag);\n"
@@ -1151,8 +1165,8 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::ClientReaderWriter< $Request$, $Response$>* "
"$ns$$Service$::Stub::$Method$Raw(::grpc::ClientContext* context) {\n");
printer->Print(*vars,
- " return new ::grpc::ClientReaderWriter< "
- "$Request$, $Response$>("
+ " return ::grpc::ClientReaderWriter< "
+ "$Request$, $Response$>::internal::Create("
"channel_.get(), "
"rpcmethod_$Method$_, "
"context);\n"
@@ -1162,14 +1176,14 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* "
"$ns$$Service$::Stub::Async$Method$Raw(::grpc::ClientContext* context, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
- printer->Print(
- *vars,
- " return "
- "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::Create("
- "channel_.get(), cq, "
- "rpcmethod_$Method$_, "
- "context, tag);\n"
- "}\n\n");
+ printer->Print(*vars,
+ " return "
+ "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::"
+ "internal::Create("
+ "channel_.get(), cq, "
+ "rpcmethod_$Method$_, "
+ "context, tag);\n"
+ "}\n\n");
}
}
@@ -1279,7 +1293,7 @@ void PrintSourceService(grpc_generator::Printer *printer,
printer->Print(*vars,
", rpcmethod_$Method$_("
"$prefix$$Service$_method_names[$Idx$], "
- "::grpc::RpcMethod::$StreamingType$, "
+ "::grpc::internal::RpcMethod::$StreamingType$, "
"channel"
")\n");
}
@@ -1302,38 +1316,38 @@ void PrintSourceService(grpc_generator::Printer *printer,
if (method->NoStreaming()) {
printer->Print(
*vars,
- "AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
- " ::grpc::RpcMethod::NORMAL_RPC,\n"
- " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, "
+ " ::grpc::internal::RpcMethod::NORMAL_RPC,\n"
+ " new ::grpc::internal::RpcMethodHandler< $ns$$Service$::Service, "
"$Request$, "
"$Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ClientOnlyStreaming(method.get())) {
printer->Print(
*vars,
- "AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
- " ::grpc::RpcMethod::CLIENT_STREAMING,\n"
- " new ::grpc::ClientStreamingHandler< "
+ " ::grpc::internal::RpcMethod::CLIENT_STREAMING,\n"
+ " new ::grpc::internal::ClientStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ServerOnlyStreaming(method.get())) {
printer->Print(
*vars,
- "AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
- " ::grpc::RpcMethod::SERVER_STREAMING,\n"
- " new ::grpc::ServerStreamingHandler< "
+ " ::grpc::internal::RpcMethod::SERVER_STREAMING,\n"
+ " new ::grpc::internal::ServerStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (method->BidiStreaming()) {
printer->Print(
*vars,
- "AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
- " ::grpc::RpcMethod::BIDI_STREAMING,\n"
- " new ::grpc::BidiStreamingHandler< "
+ " ::grpc::internal::RpcMethod::BIDI_STREAMING,\n"
+ " new ::grpc::internal::BidiStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
}
@@ -1501,7 +1515,8 @@ void PrintMockClientMethods(grpc_generator::Printer *printer,
printer->Print(
*vars,
"MOCK_METHOD3(Async$Method$Raw, "
- "::grpc::ClientAsyncReaderWriterInterface<$Request$, $Response$>*"
+ "::grpc::ClientAsyncReaderWriterInterface<$Request$, "
+ "$Response$>*"
"(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, "
"void* tag));\n");
}
diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c
index 45de289e45..a98b8e62db 100644
--- a/src/core/lib/iomgr/resolve_address_uv.c
+++ b/src/core/lib/iomgr/resolve_address_uv.c
@@ -54,7 +54,7 @@ static int retry_named_port_failure(int status, request *r,
int retry_status;
uv_getaddrinfo_t *req = gpr_malloc(sizeof(uv_getaddrinfo_t));
req->data = r;
- r->port = svc[i][1];
+ r->port = gpr_strdup(svc[i][1]);
retry_status = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_cb,
r->host, r->port, r->hints);
if (retry_status < 0 || getaddrinfo_cb == NULL) {
@@ -127,6 +127,8 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status,
GRPC_CLOSURE_SCHED(&exec_ctx, r->on_done, error);
grpc_exec_ctx_finish(&exec_ctx);
gpr_free(r->hints);
+ gpr_free(r->host);
+ gpr_free(r->port);
gpr_free(r);
uv_freeaddrinfo(res);
}
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
index ab6832932f..2f1d237c07 100644
--- a/src/core/lib/iomgr/tcp_client_uv.c
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -48,6 +48,7 @@ typedef struct grpc_uv_tcp_connect {
static void uv_tcp_connect_cleanup(grpc_exec_ctx *exec_ctx,
grpc_uv_tcp_connect *connect) {
grpc_resource_quota_unref_internal(exec_ctx, connect->resource_quota);
+ gpr_free(connect->addr_name);
gpr_free(connect);
}
@@ -105,6 +106,7 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) {
}
done = (--connect->refs == 0);
if (done) {
+ grpc_exec_ctx_flush(&exec_ctx);
uv_tcp_connect_cleanup(&exec_ctx, connect);
}
GRPC_CLOSURE_SCHED(&exec_ctx, closure, error);
@@ -140,6 +142,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
connect->resource_quota = resource_quota;
uv_tcp_init(uv_default_loop(), connect->tcp_handle);
connect->connect_req.data = connect;
+ connect->refs = 1;
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting",
diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c
index 2de0ea90e7..2ab836cc34 100644
--- a/src/core/lib/iomgr/tcp_server_uv.c
+++ b/src/core/lib/iomgr/tcp_server_uv.c
@@ -234,6 +234,7 @@ static void on_connect(uv_stream_t *server, int status) {
sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
acceptor);
grpc_exec_ctx_finish(&exec_ctx);
+ gpr_free(peer_name_string);
}
}
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index 213952d5ec..ff5fd3edc8 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -67,6 +67,8 @@ typedef struct {
static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_slice_unref_internal(exec_ctx, tcp->read_slice);
grpc_resource_user_unref(exec_ctx, tcp->resource_user);
+ gpr_free(tcp->handle);
+ gpr_free(tcp->peer_string);
gpr_free(tcp);
}
diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c
index 58c4c435d3..e9f893988d 100644
--- a/src/core/lib/support/mpscq.c
+++ b/src/core/lib/support/mpscq.c
@@ -31,12 +31,11 @@ void gpr_mpscq_destroy(gpr_mpscq *q) {
GPR_ASSERT(q->tail == &q->stub);
}
-bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) {
+void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) {
gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL);
gpr_mpscq_node *prev =
(gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n);
gpr_atm_rel_store(&prev->next, (gpr_atm)n);
- return prev == &q->stub;
}
gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) {
@@ -78,25 +77,3 @@ gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty) {
*empty = false;
return NULL;
}
-
-void gpr_locked_mpscq_init(gpr_locked_mpscq *q) {
- gpr_mpscq_init(&q->queue);
- q->read_lock = GPR_SPINLOCK_INITIALIZER;
-}
-
-void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q) {
- gpr_mpscq_destroy(&q->queue);
-}
-
-bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n) {
- return gpr_mpscq_push(&q->queue, n);
-}
-
-gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q) {
- if (gpr_spinlock_trylock(&q->read_lock)) {
- gpr_mpscq_node *n = gpr_mpscq_pop(&q->queue);
- gpr_spinlock_unlock(&q->read_lock);
- return n;
- }
- return NULL;
-}
diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h
index 2f4739d7f8..daa51768f7 100644
--- a/src/core/lib/support/mpscq.h
+++ b/src/core/lib/support/mpscq.h
@@ -22,7 +22,6 @@
#include <grpc/support/atm.h>
#include <stdbool.h>
#include <stddef.h>
-#include "src/core/lib/support/spinlock.h"
// Multiple-producer single-consumer lock free queue, based upon the
// implementation from Dmitry Vyukov here:
@@ -44,34 +43,11 @@ typedef struct gpr_mpscq {
void gpr_mpscq_init(gpr_mpscq *q);
void gpr_mpscq_destroy(gpr_mpscq *q);
// Push a node
-// Thread safe - can be called from multiple threads concurrently
-// Returns true if this was possibly the first node (may return true
-// sporadically, will not return false sporadically)
-bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n);
+void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n);
// Pop a node (returns NULL if no node is ready - which doesn't indicate that
// the queue is empty!!)
-// Thread compatible - can only be called from one thread at a time
gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q);
// Pop a node; sets *empty to true if the queue is empty, or false if it is not
gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty);
-// An mpscq with a spinlock: it's safe to pop from multiple threads, but doing
-// only one thread will succeed concurrently
-typedef struct gpr_locked_mpscq {
- gpr_mpscq queue;
- gpr_spinlock read_lock;
-} gpr_locked_mpscq;
-
-void gpr_locked_mpscq_init(gpr_locked_mpscq *q);
-void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q);
-// Push a node
-// Thread safe - can be called from multiple threads concurrently
-// Returns true if this was possibly the first node (may return true
-// sporadically, will not return false sporadically)
-bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n);
-// Pop a node (returns NULL if no node is ready - which doesn't indicate that
-// the queue is empty!!)
-// Thread safe - can be called from multiple threads concurrently
-gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q);
-
#endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */
diff --git a/src/core/lib/support/stack_lockfree.c b/src/core/lib/support/stack_lockfree.c
new file mode 100644
index 0000000000..0fb64ed001
--- /dev/null
+++ b/src/core/lib/support/stack_lockfree.c
@@ -0,0 +1,137 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/support/stack_lockfree.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/atm.h>
+#include <grpc/support/log.h>
+#include <grpc/support/port_platform.h>
+
+/* The lockfree node structure is a single architecture-level
+ word that allows for an atomic CAS to set it up. */
+struct lockfree_node_contents {
+ /* next thing to look at. Actual index for head, next index otherwise */
+ uint16_t index;
+#ifdef GPR_ARCH_64
+ uint16_t pad;
+ uint32_t aba_ctr;
+#else
+#ifdef GPR_ARCH_32
+ uint16_t aba_ctr;
+#else
+#error Unsupported bit width architecture
+#endif
+#endif
+};
+
+/* Use a union to make sure that these are in the same bits as an atm word */
+typedef union lockfree_node {
+ gpr_atm atm;
+ struct lockfree_node_contents contents;
+} lockfree_node;
+
+/* make sure that entries aligned to 8-bytes */
+#define ENTRY_ALIGNMENT_BITS 3
+/* reserve this entry as invalid */
+#define INVALID_ENTRY_INDEX ((1 << 16) - 1)
+
+struct gpr_stack_lockfree {
+ lockfree_node *entries;
+ lockfree_node head; /* An atomic entry describing curr head */
+};
+
+gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries) {
+ gpr_stack_lockfree *stack;
+ stack = (gpr_stack_lockfree *)gpr_malloc(sizeof(*stack));
+ /* Since we only allocate 16 bits to represent an entry number,
+ * make sure that we are within the desired range */
+ /* Reserve the highest entry number as a dummy */
+ GPR_ASSERT(entries < INVALID_ENTRY_INDEX);
+ stack->entries = (lockfree_node *)gpr_malloc_aligned(
+ entries * sizeof(stack->entries[0]), ENTRY_ALIGNMENT_BITS);
+ /* Clear out all entries */
+ memset(stack->entries, 0, entries * sizeof(stack->entries[0]));
+ memset(&stack->head, 0, sizeof(stack->head));
+
+ GPR_ASSERT(sizeof(stack->entries->atm) == sizeof(stack->entries->contents));
+
+ /* Point the head at reserved dummy entry */
+ stack->head.contents.index = INVALID_ENTRY_INDEX;
+/* Fill in the pad and aba_ctr to avoid confusing memcheck tools */
+#ifdef GPR_ARCH_64
+ stack->head.contents.pad = 0;
+#endif
+ stack->head.contents.aba_ctr = 0;
+ return stack;
+}
+
+void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack) {
+ gpr_free_aligned(stack->entries);
+ gpr_free(stack);
+}
+
+int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) {
+ lockfree_node head;
+ lockfree_node newhead;
+ lockfree_node curent;
+ lockfree_node newent;
+
+ /* First fill in the entry's index and aba ctr for new head */
+ newhead.contents.index = (uint16_t)entry;
+#ifdef GPR_ARCH_64
+ /* Fill in the pad to avoid confusing memcheck tools */
+ newhead.contents.pad = 0;
+#endif
+
+ /* Also post-increment the aba_ctr */
+ curent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm);
+ newhead.contents.aba_ctr = ++curent.contents.aba_ctr;
+ gpr_atm_no_barrier_store(&stack->entries[entry].atm, curent.atm);
+
+ do {
+ /* Atomically get the existing head value for use */
+ head.atm = gpr_atm_no_barrier_load(&(stack->head.atm));
+ /* Point to it */
+ newent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm);
+ newent.contents.index = head.contents.index;
+ gpr_atm_no_barrier_store(&stack->entries[entry].atm, newent.atm);
+ } while (!gpr_atm_rel_cas(&(stack->head.atm), head.atm, newhead.atm));
+ /* Use rel_cas above to make sure that entry index is set properly */
+ return head.contents.index == INVALID_ENTRY_INDEX;
+}
+
+int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack) {
+ lockfree_node head;
+ lockfree_node newhead;
+
+ do {
+ head.atm = gpr_atm_acq_load(&(stack->head.atm));
+ if (head.contents.index == INVALID_ENTRY_INDEX) {
+ return -1;
+ }
+ newhead.atm =
+ gpr_atm_no_barrier_load(&(stack->entries[head.contents.index].atm));
+
+ } while (!gpr_atm_no_barrier_cas(&(stack->head.atm), head.atm, newhead.atm));
+
+ return head.contents.index;
+}
diff --git a/src/core/lib/support/stack_lockfree.h b/src/core/lib/support/stack_lockfree.h
new file mode 100644
index 0000000000..6324211b72
--- /dev/null
+++ b/src/core/lib/support/stack_lockfree.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H
+#define GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H
+
+#include <stddef.h>
+
+typedef struct gpr_stack_lockfree gpr_stack_lockfree;
+
+/* This stack must specify the maximum number of entries to track.
+ The current implementation only allows up to 65534 entries */
+gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries);
+void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack);
+
+/* Pass in a valid entry number for the next stack entry */
+/* Returns 1 if this is the first element on the stack, 0 otherwise */
+int gpr_stack_lockfree_push(gpr_stack_lockfree *, int entry);
+
+/* Returns -1 on empty or the actual entry number */
+int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack);
+
+#endif /* GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H */
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 84ddf74ab9..0cd436883a 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -32,8 +32,7 @@
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h"
-#include "src/core/lib/support/mpscq.h"
-#include "src/core/lib/support/spinlock.h"
+#include "src/core/lib/support/stack_lockfree.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
@@ -62,7 +61,6 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false);
typedef struct requested_call {
- gpr_mpscq_node request_link; /* must be first */
requested_call_type type;
size_t cq_idx;
void *tag;
@@ -162,7 +160,7 @@ struct request_matcher {
grpc_server *server;
call_data *pending_head;
call_data *pending_tail;
- gpr_locked_mpscq *requests_per_cq;
+ gpr_stack_lockfree **requests_per_cq;
};
struct registered_method {
@@ -207,6 +205,11 @@ struct grpc_server {
registered_method *registered_methods;
/** one request matcher for unregistered methods */
request_matcher unregistered_request_matcher;
+ /** free list of available requested_calls_per_cq indices */
+ gpr_stack_lockfree **request_freelist_per_cq;
+ /** requested call backing data */
+ requested_call **requested_calls_per_cq;
+ int max_requested_calls_per_cq;
gpr_atm shutdown_flag;
uint8_t shutdown_published;
@@ -306,20 +309,21 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
* request_matcher
*/
-static void request_matcher_init(request_matcher *rm, grpc_server *server) {
+static void request_matcher_init(request_matcher *rm, size_t entries,
+ grpc_server *server) {
memset(rm, 0, sizeof(*rm));
rm->server = server;
rm->requests_per_cq =
gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count);
for (size_t i = 0; i < server->cq_count; i++) {
- gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
+ rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries);
}
}
static void request_matcher_destroy(request_matcher *rm) {
for (size_t i = 0; i < rm->server->cq_count; i++) {
- GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL);
- gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
+ GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1);
+ gpr_stack_lockfree_destroy(rm->requests_per_cq[i]);
}
gpr_free(rm->requests_per_cq);
}
@@ -349,17 +353,13 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
grpc_server *server,
request_matcher *rm,
grpc_error *error) {
- requested_call *rc;
+ int request_id;
for (size_t i = 0; i < server->cq_count; i++) {
- /* Here we know:
- 1. no requests are being added (since the server is shut down)
- 2. no other threads are pulling (since the shut down process is single
- threaded)
- So, we can ignore the queue lock and just pop, with the guarantee that a
- NULL returned here truly means that the queue is empty */
- while ((rc = (requested_call *)gpr_mpscq_pop(
- &rm->requests_per_cq[i].queue)) != NULL) {
- fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error));
+ while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) !=
+ -1) {
+ fail_call(exec_ctx, server, i,
+ &server->requested_calls_per_cq[i][request_id],
+ GRPC_ERROR_REF(error));
}
}
GRPC_ERROR_UNREF(error);
@@ -394,7 +394,13 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
}
for (i = 0; i < server->cq_count; i++) {
GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server");
+ if (server->started) {
+ gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]);
+ gpr_free(server->requested_calls_per_cq[i]);
+ }
}
+ gpr_free(server->request_freelist_per_cq);
+ gpr_free(server->requested_calls_per_cq);
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
@@ -452,7 +458,21 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,
static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
grpc_cq_completion *c) {
- gpr_free(req);
+ requested_call *rc = req;
+ grpc_server *server = rc->server;
+
+ if (rc >= server->requested_calls_per_cq[rc->cq_idx] &&
+ rc < server->requested_calls_per_cq[rc->cq_idx] +
+ server->max_requested_calls_per_cq) {
+ GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX);
+ gpr_stack_lockfree_push(
+ server->request_freelist_per_cq[rc->cq_idx],
+ (int)(rc - server->requested_calls_per_cq[rc->cq_idx]));
+ } else {
+ gpr_free(req);
+ }
+
+ server_unref(exec_ctx, server);
}
static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
@@ -482,6 +502,10 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
GPR_UNREACHABLE_CODE(return );
}
+ grpc_call_element *elem =
+ grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+ channel_data *chand = elem->channel_data;
+ server_ref(chand->server);
grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE,
done_request_event, rc, &rc->completion);
}
@@ -509,15 +533,15 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
for (size_t i = 0; i < server->cq_count; i++) {
size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
- requested_call *rc =
- (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
- if (rc == NULL) {
+ int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
+ if (request_id == -1) {
continue;
} else {
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- publish_call(exec_ctx, server, calld, cq_idx, rc);
+ publish_call(exec_ctx, server, calld, cq_idx,
+ &server->requested_calls_per_cq[cq_idx][request_id]);
return; /* early out */
}
}
@@ -992,6 +1016,8 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
server->root_channel_data.next = server->root_channel_data.prev =
&server->root_channel_data;
+ /* TODO(ctiller): expose a channel_arg for this */
+ server->max_requested_calls_per_cq = 32768;
server->channel_args = grpc_channel_args_copy(args);
return server;
@@ -1064,15 +1090,29 @@ void grpc_server_start(grpc_server *server) {
server->started = true;
server->pollset_count = 0;
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
+ server->request_freelist_per_cq =
+ gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count);
+ server->requested_calls_per_cq =
+ gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
if (grpc_cq_can_listen(server->cqs[i])) {
server->pollsets[server->pollset_count++] =
grpc_cq_pollset(server->cqs[i]);
}
+ server->request_freelist_per_cq[i] =
+ gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
+ for (int j = 0; j < server->max_requested_calls_per_cq; j++) {
+ gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j);
+ }
+ server->requested_calls_per_cq[i] =
+ gpr_malloc((size_t)server->max_requested_calls_per_cq *
+ sizeof(*server->requested_calls_per_cq[i]));
}
- request_matcher_init(&server->unregistered_request_matcher, server);
+ request_matcher_init(&server->unregistered_request_matcher,
+ (size_t)server->max_requested_calls_per_cq, server);
for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
- request_matcher_init(&rm->request_matcher, server);
+ request_matcher_init(&rm->request_matcher,
+ (size_t)server->max_requested_calls_per_cq, server);
}
server_ref(server);
@@ -1326,11 +1366,21 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
requested_call *rc) {
call_data *calld = NULL;
request_matcher *rm = NULL;
+ int request_id;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
fail_call(exec_ctx, server, cq_idx, rc,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
return GRPC_CALL_OK;
}
+ request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]);
+ if (request_id == -1) {
+ /* out of request ids: just fail this one */
+ fail_call(exec_ctx, server, cq_idx, rc,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"),
+ GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq));
+ return GRPC_CALL_OK;
+ }
switch (rc->type) {
case BATCH_CALL:
rm = &server->unregistered_request_matcher;
@@ -1339,13 +1389,15 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
rm = &rc->data.registered.registered_method->request_matcher;
break;
}
- if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
+ server->requested_calls_per_cq[cq_idx][request_id] = *rc;
+ gpr_free(rc);
+ if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
/* this was the first queued request: we need to lock and start
matching calls */
gpr_mu_lock(&server->mu_call);
while ((calld = rm->pending_head) != NULL) {
- rc = (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
- if (rc == NULL) break;
+ request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
+ if (request_id == -1) break;
rm->pending_head = calld->pending_next;
gpr_mu_unlock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
@@ -1361,7 +1413,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- publish_call(exec_ctx, server, calld, cq_idx, rc);
+ publish_call(exec_ctx, server, calld, cq_idx,
+ &server->requested_calls_per_cq[cq_idx][request_id]);
}
gpr_mu_lock(&server->mu_call);
}
@@ -1468,6 +1521,7 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
rc->initial_metadata->count = 0;
GPR_ASSERT(error != GRPC_ERROR_NONE);
+ server_ref(server);
grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error,
done_request_event, rc, &rc->completion);
}
diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c
index 37d4f038b7..1fd65928f9 100644
--- a/src/core/tsi/ssl_transport_security.c
+++ b/src/core/tsi/ssl_transport_security.c
@@ -411,15 +411,11 @@ static tsi_result do_ssl_read(SSL *ssl, unsigned char *unprotected_bytes,
GPR_ASSERT(*unprotected_bytes_size <= INT_MAX);
read_from_ssl =
SSL_read(ssl, unprotected_bytes, (int)*unprotected_bytes_size);
- if (read_from_ssl == 0) {
- gpr_log(GPR_ERROR, "SSL_read returned 0 unexpectedly.");
- return TSI_INTERNAL_ERROR;
- }
- if (read_from_ssl < 0) {
+ if (read_from_ssl <= 0) {
read_from_ssl = SSL_get_error(ssl, read_from_ssl);
switch (read_from_ssl) {
- case SSL_ERROR_WANT_READ:
- /* We need more data to finish the frame. */
+ case SSL_ERROR_ZERO_RETURN: /* Received a close_notify alert. */
+ case SSL_ERROR_WANT_READ: /* We need more data to finish the frame. */
*unprotected_bytes_size = 0;
return TSI_OK;
case SSL_ERROR_WANT_WRITE:
diff --git a/src/cpp/README.md b/src/cpp/README.md
index e9ef489a7c..77fbee1149 100644
--- a/src/cpp/README.md
+++ b/src/cpp/README.md
@@ -47,7 +47,7 @@ below.
# Build from Source
```sh
- $ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc
+ $ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc
$ cd grpc
$ git submodule update --init
$ make
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index f2d9bb07c9..038eb32e04 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -76,8 +76,9 @@ grpc::string Channel::GetServiceConfigJSON() const {
&channel_info.service_config_json);
}
-Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
- CompletionQueue* cq) {
+internal::Call Channel::CreateCall(const internal::RpcMethod& method,
+ ClientContext* context,
+ CompletionQueue* cq) {
const bool kRegistered = method.channel_tag() && context->authority().empty();
grpc_call* c_call = NULL;
if (kRegistered) {
@@ -109,10 +110,11 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
}
grpc_census_call_set_context(c_call, context->census_context());
context->set_call(c_call, shared_from_this());
- return Call(c_call, this, cq);
+ return internal::Call(c_call, this, cq);
}
-void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
+void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops,
+ internal::Call* call) {
static const size_t MAX_OPS = 8;
size_t nops = 0;
grpc_op cops[MAX_OPS];
@@ -131,7 +133,7 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) {
}
namespace {
-class TagSaver final : public CompletionQueueTag {
+class TagSaver final : public internal::CompletionQueueTag {
public:
explicit TagSaver(void* tag) : tag_(tag) {}
~TagSaver() override {}
diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc
index 66b1ef0e39..e65cb9903f 100644
--- a/src/cpp/client/generic_stub.cc
+++ b/src/cpp/client/generic_stub.cc
@@ -27,9 +27,11 @@ std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call(
ClientContext* context, const grpc::string& method, CompletionQueue* cq,
void* tag) {
return std::unique_ptr<GenericClientAsyncReaderWriter>(
- GenericClientAsyncReaderWriter::Create(
+ GenericClientAsyncReaderWriter::internal::Create(
channel_.get(), cq,
- RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), context, tag));
+ internal::RpcMethod(method.c_str(),
+ internal::RpcMethod::BIDI_STREAMING),
+ context, tag));
}
} // namespace grpc
diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc
index f34b0f3d58..000a03277b 100644
--- a/src/cpp/common/completion_queue_cc.cc
+++ b/src/cpp/common/completion_queue_cc.cc
@@ -60,7 +60,7 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
case GRPC_QUEUE_SHUTDOWN:
return SHUTDOWN;
case GRPC_OP_COMPLETE:
- auto cq_tag = static_cast<CompletionQueueTag*>(ev.tag);
+ auto cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag);
*ok = ev.success != 0;
*tag = cq_tag;
if (cq_tag->FinalizeResult(tag, ok)) {
diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc
index 815b607032..fc7feb79fe 100644
--- a/src/cpp/server/health/default_health_check_service.cc
+++ b/src/cpp/server/health/default_health_check_service.cc
@@ -36,11 +36,12 @@ const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
DefaultHealthCheckService* service)
: service_(service), method_(nullptr) {
- MethodHandler* handler =
- new RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, ByteBuffer>(
+ internal::MethodHandler* handler =
+ new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer,
+ ByteBuffer>(
std::mem_fn(&HealthCheckServiceImpl::Check), this);
- method_ = new RpcServiceMethod(kHealthCheckMethodName, RpcMethod::NORMAL_RPC,
- handler);
+ method_ = new internal::RpcServiceMethod(
+ kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler);
AddMethod(method_);
}
diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h
index 09d5cebe98..99d6680c50 100644
--- a/src/cpp/server/health/default_health_check_service.h
+++ b/src/cpp/server/health/default_health_check_service.h
@@ -41,7 +41,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
private:
const DefaultHealthCheckService* const service_;
- RpcServiceMethod* method_;
+ internal::RpcServiceMethod* method_;
};
DefaultHealthCheckService();
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 04abb6fd3e..3bff9999b9 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -86,7 +86,8 @@ class Server::UnimplementedAsyncRequest final
ServerCompletionQueue* const cq_;
};
-typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus>
+typedef internal::SneakyCallOpSet<internal::CallOpSendInitialMetadata,
+ internal::CallOpServerSendStatus>
UnimplementedAsyncResponseOp;
class Server::UnimplementedAsyncResponse final
: public UnimplementedAsyncResponseOp {
@@ -104,12 +105,12 @@ class Server::UnimplementedAsyncResponse final
UnimplementedAsyncRequest* const request_;
};
-class ShutdownTag : public CompletionQueueTag {
+class ShutdownTag : public internal::CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) { return false; }
};
-class DummyTag : public CompletionQueueTag {
+class DummyTag : public internal::CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) {
*status = true;
@@ -117,15 +118,15 @@ class DummyTag : public CompletionQueueTag {
}
};
-class Server::SyncRequest final : public CompletionQueueTag {
+class Server::SyncRequest final : public internal::CompletionQueueTag {
public:
- SyncRequest(RpcServiceMethod* method, void* tag)
+ SyncRequest(internal::RpcServiceMethod* method, void* tag)
: method_(method),
tag_(tag),
in_flight_(false),
- has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
- method->method_type() ==
- RpcMethod::SERVER_STREAMING),
+ has_request_payload_(
+ method->method_type() == internal::RpcMethod::NORMAL_RPC ||
+ method->method_type() == internal::RpcMethod::SERVER_STREAMING),
call_details_(nullptr),
cq_(nullptr) {
grpc_metadata_array_init(&request_metadata_);
@@ -202,14 +203,14 @@ class Server::SyncRequest final : public CompletionQueueTag {
void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) {
ctx_.BeginCompletionOp(&call_);
global_callbacks->PreSynchronousRequest(&ctx_);
- method_->handler()->RunHandler(
- MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_));
+ method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter(
+ &call_, &ctx_, request_payload_));
global_callbacks->PostSynchronousRequest(&ctx_);
request_payload_ = nullptr;
cq_.Shutdown();
- CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
+ internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
/* Ensure the cq_ is shutdown */
@@ -219,15 +220,15 @@ class Server::SyncRequest final : public CompletionQueueTag {
private:
CompletionQueue cq_;
- Call call_;
+ internal::Call call_;
ServerContext ctx_;
const bool has_request_payload_;
grpc_byte_buffer* request_payload_;
- RpcServiceMethod* const method_;
+ internal::RpcServiceMethod* const method_;
};
private:
- RpcServiceMethod* const method_;
+ internal::RpcServiceMethod* const method_;
void* const tag_;
bool in_flight_;
const bool has_request_payload_;
@@ -300,14 +301,15 @@ class Server::SyncRequestThreadManager : public ThreadManager {
// object
}
- void AddSyncMethod(RpcServiceMethod* method, void* tag) {
+ void AddSyncMethod(internal::RpcServiceMethod* method, void* tag) {
sync_requests_.emplace_back(new SyncRequest(method, tag));
}
void AddUnknownSyncMethod() {
if (!sync_requests_.empty()) {
- unknown_method_.reset(new RpcServiceMethod(
- "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
+ unknown_method_.reset(new internal::RpcServiceMethod(
+ "unknown", internal::RpcMethod::BIDI_STREAMING,
+ new internal::UnknownMethodHandler));
sync_requests_.emplace_back(
new SyncRequest(unknown_method_.get(), nullptr));
}
@@ -344,8 +346,8 @@ class Server::SyncRequestThreadManager : public ThreadManager {
CompletionQueue* server_cq_;
int cq_timeout_msec_;
std::vector<std::unique_ptr<SyncRequest>> sync_requests_;
- std::unique_ptr<RpcServiceMethod> unknown_method_;
- std::unique_ptr<RpcServiceMethod> health_check_;
+ std::unique_ptr<internal::RpcServiceMethod> unknown_method_;
+ std::unique_ptr<internal::RpcServiceMethod> health_check_;
std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
};
@@ -421,13 +423,13 @@ void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
grpc_server* Server::c_server() { return server_; }
static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
- RpcServiceMethod* method) {
+ internal::RpcServiceMethod* method) {
switch (method->method_type()) {
- case RpcMethod::NORMAL_RPC:
- case RpcMethod::SERVER_STREAMING:
+ case internal::RpcMethod::NORMAL_RPC:
+ case internal::RpcMethod::SERVER_STREAMING:
return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER;
- case RpcMethod::CLIENT_STREAMING:
- case RpcMethod::BIDI_STREAMING:
+ case internal::RpcMethod::CLIENT_STREAMING:
+ case internal::RpcMethod::BIDI_STREAMING:
return GRPC_SRM_PAYLOAD_NONE;
}
GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;);
@@ -448,7 +450,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
continue;
}
- RpcServiceMethod* method = it->get();
+ internal::RpcServiceMethod* method = it->get();
void* tag = grpc_server_register_method(
server_, method->name(), host ? host->c_str() : nullptr,
PayloadHandlingForMethod(method), 0);
@@ -588,7 +590,8 @@ void Server::Wait() {
}
}
-void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
+void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops,
+ internal::Call* call) {
static const size_t MAX_OPS = 8;
size_t nops = 0;
grpc_op cops[MAX_OPS];
@@ -599,8 +602,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
ServerInterface* server, ServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
- bool delete_on_finalize)
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ void* tag, bool delete_on_finalize)
: server_(server),
context_(context),
stream_(stream),
@@ -622,7 +625,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
}
context_->set_call(call_);
context_->cq_ = call_cq_;
- Call call(call_, server_, call_cq_, server_->max_receive_message_size());
+ internal::Call call(call_, server_, call_cq_,
+ server_->max_receive_message_size());
if (*status && call_) {
context_->BeginCompletionOp(&call);
}
@@ -637,7 +641,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
ServerInterface* server, ServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ void* tag)
: BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
void ServerInterface::RegisteredAsyncRequest::IssueRequest(
@@ -651,7 +656,7 @@ void ServerInterface::RegisteredAsyncRequest::IssueRequest(
ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
ServerInterface* server, GenericServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
: BaseAsyncRequest(server, context, stream, call_cq, tag,
delete_on_finalize) {
@@ -693,7 +698,7 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
UnimplementedAsyncRequest* request)
: request_(request) {
Status status(StatusCode::UNIMPLEMENTED, "");
- UnknownMethodHandler::FillOps(request_->context(), this);
+ internal::UnknownMethodHandler::FillOps(request_->context(), this);
request_->stream()->call_.PerformOps(this);
}
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 4913682f1d..2e55ffbac4 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -37,7 +37,7 @@ namespace grpc {
// CompletionOp
-class ServerContext::CompletionOp final : public CallOpSetInterface {
+class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
CompletionOp()
@@ -146,7 +146,7 @@ ServerContext::~ServerContext() {
}
}
-void ServerContext::BeginCompletionOp(Call* call) {
+void ServerContext::BeginCompletionOp(internal::Call* call) {
GPR_ASSERT(!completion_op_);
completion_op_ = new CompletionOp();
if (has_notify_when_done_tag_) {
@@ -155,8 +155,8 @@ void ServerContext::BeginCompletionOp(Call* call) {
call->PerformOps(completion_op_);
}
-CompletionQueueTag* ServerContext::GetCompletionOpTag() {
- return static_cast<CompletionQueueTag*>(completion_op_);
+internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() {
+ return static_cast<internal::CompletionQueueTag*>(completion_op_);
}
void ServerContext::AddInitialMetadata(const grpc::string& key,
diff --git a/src/php/README.md b/src/php/README.md
index 90c8cb386a..11f99e134c 100644
--- a/src/php/README.md
+++ b/src/php/README.md
@@ -100,7 +100,7 @@ the `composer` and `protoc` binaries. You can find out how to get these
Clone this repository
```sh
-$ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc
+$ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc
```
Build and install the gRPC C core library
@@ -129,7 +129,7 @@ $ sudo make install
You will need the source code to run tests
```sh
-$ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc
+$ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc
$ cd grpc
$ git pull --recurse-submodules && git submodule update --init --recursive
```
diff --git a/src/python/grpcio/README.rst b/src/python/grpcio/README.rst
index 28a2714568..f047243f82 100644
--- a/src/python/grpcio/README.rst
+++ b/src/python/grpcio/README.rst
@@ -46,7 +46,7 @@ package named :code:`python-dev`).
::
$ export REPO_ROOT=grpc # REPO_ROOT can be any directory of your choice
- $ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc $REPO_ROOT
+ $ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc $REPO_ROOT
$ cd $REPO_ROOT
$ git submodule update --init
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index a8c69720fd..5950bfa0e6 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -518,7 +518,6 @@ cdef extern from "grpc/compression.h":
ctypedef struct grpc_compression_options:
uint32_t enabled_algorithms_bitset
- grpc_compression_algorithm default_compression_algorithm
int grpc_compression_algorithm_parse(
grpc_slice value, grpc_compression_algorithm *algorithm) nogil
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 5819a624f7..ea5bdbae58 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -39,6 +39,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/support/log_windows.c',
'src/core/lib/support/mpscq.c',
'src/core/lib/support/murmur_hash.c',
+ 'src/core/lib/support/stack_lockfree.c',
'src/core/lib/support/string.c',
'src/core/lib/support/string_posix.c',
'src/core/lib/support/string_util_windows.c',
diff --git a/test/core/support/BUILD b/test/core/support/BUILD
index 298eebd9b8..37870d922d 100644
--- a/test/core/support/BUILD
+++ b/test/core/support/BUILD
@@ -127,6 +127,16 @@ grpc_cc_test(
)
grpc_cc_test(
+ name = "stack_lockfree_test",
+ srcs = ["stack_lockfree_test.c"],
+ language = "C",
+ deps = [
+ "//:gpr",
+ "//test/core/util:gpr_test_util",
+ ],
+)
+
+grpc_cc_test(
name = "string_test",
srcs = ["string_test.c"],
language = "C",
diff --git a/test/core/support/stack_lockfree_test.c b/test/core/support/stack_lockfree_test.c
new file mode 100644
index 0000000000..4b1f60ce01
--- /dev/null
+++ b/test/core/support/stack_lockfree_test.c
@@ -0,0 +1,140 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/support/stack_lockfree.h"
+
+#include <stdlib.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
+#include "test/core/util/test_config.h"
+
+/* max stack size supported */
+#define MAX_STACK_SIZE 65534
+
+#define MAX_THREADS 32
+
+static void test_serial_sized(size_t size) {
+ gpr_stack_lockfree *stack = gpr_stack_lockfree_create(size);
+ size_t i;
+ size_t j;
+
+ /* First try popping empty */
+ GPR_ASSERT(gpr_stack_lockfree_pop(stack) == -1);
+
+ /* Now add one item and check it */
+ gpr_stack_lockfree_push(stack, 3);
+ GPR_ASSERT(gpr_stack_lockfree_pop(stack) == 3);
+ GPR_ASSERT(gpr_stack_lockfree_pop(stack) == -1);
+
+ /* Now add repeatedly more items and check them */
+ for (i = 1; i < size; i *= 2) {
+ for (j = 0; j <= i; j++) {
+ GPR_ASSERT(gpr_stack_lockfree_push(stack, (int)j) == (j == 0));
+ }
+ for (j = 0; j <= i; j++) {
+ GPR_ASSERT(gpr_stack_lockfree_pop(stack) == (int)(i - j));
+ }
+ GPR_ASSERT(gpr_stack_lockfree_pop(stack) == -1);
+ }
+
+ gpr_stack_lockfree_destroy(stack);
+}
+
+static void test_serial() {
+ size_t i;
+ for (i = 128; i < MAX_STACK_SIZE; i *= 2) {
+ test_serial_sized(i);
+ }
+ test_serial_sized(MAX_STACK_SIZE);
+}
+
+struct test_arg {
+ gpr_stack_lockfree *stack;
+ int stack_size;
+ int nthreads;
+ int rank;
+ int sum;
+};
+
+static void test_mt_body(void *v) {
+ struct test_arg *arg = (struct test_arg *)v;
+ int lo, hi;
+ int i;
+ int res;
+ lo = arg->rank * arg->stack_size / arg->nthreads;
+ hi = (arg->rank + 1) * arg->stack_size / arg->nthreads;
+ for (i = lo; i < hi; i++) {
+ gpr_stack_lockfree_push(arg->stack, i);
+ if ((res = gpr_stack_lockfree_pop(arg->stack)) != -1) {
+ arg->sum += res;
+ }
+ }
+ while ((res = gpr_stack_lockfree_pop(arg->stack)) != -1) {
+ arg->sum += res;
+ }
+}
+
+static void test_mt_sized(size_t size, int nth) {
+ gpr_stack_lockfree *stack;
+ struct test_arg args[MAX_THREADS];
+ gpr_thd_id thds[MAX_THREADS];
+ int sum;
+ int i;
+ gpr_thd_options options = gpr_thd_options_default();
+
+ stack = gpr_stack_lockfree_create(size);
+ for (i = 0; i < nth; i++) {
+ args[i].stack = stack;
+ args[i].stack_size = (int)size;
+ args[i].nthreads = nth;
+ args[i].rank = i;
+ args[i].sum = 0;
+ }
+ gpr_thd_options_set_joinable(&options);
+ for (i = 0; i < nth; i++) {
+ GPR_ASSERT(gpr_thd_new(&thds[i], test_mt_body, &args[i], &options));
+ }
+ sum = 0;
+ for (i = 0; i < nth; i++) {
+ gpr_thd_join(thds[i]);
+ sum = sum + args[i].sum;
+ }
+ GPR_ASSERT((unsigned)sum == ((unsigned)size * (size - 1)) / 2);
+ gpr_stack_lockfree_destroy(stack);
+}
+
+static void test_mt() {
+ size_t size;
+ int nth;
+ for (nth = 1; nth < MAX_THREADS; nth++) {
+ for (size = 128; size < MAX_STACK_SIZE; size *= 2) {
+ test_mt_sized(size, nth);
+ }
+ test_mt_sized(MAX_STACK_SIZE, nth);
+ }
+}
+
+int main(int argc, char **argv) {
+ grpc_test_init(argc, argv);
+ test_serial();
+ test_mt();
+ return 0;
+}
diff --git a/test/cpp/codegen/codegen_test_full.cc b/test/cpp/codegen/codegen_test_full.cc
index 2eacc99d82..98792bde04 100644
--- a/test/cpp/codegen/codegen_test_full.cc
+++ b/test/cpp/codegen/codegen_test_full.cc
@@ -27,8 +27,8 @@ class CodegenTestFull : public ::testing::Test {};
TEST_F(CodegenTestFull, Init) {
grpc::CompletionQueue cq;
- void* tag;
- bool ok;
+ void* tag = nullptr;
+ bool ok = false;
cq.AsyncNext(&tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
ASSERT_FALSE(ok);
}
diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden
index b43c27f3f7..f8c768831e 100644
--- a/test/cpp/codegen/compiler_test_golden
+++ b/test/cpp/codegen/compiler_test_golden
@@ -39,7 +39,6 @@
namespace grpc {
class CompletionQueue;
class Channel;
-class RpcService;
class ServerCompletionQueue;
class ServerContext;
} // namespace grpc
@@ -137,10 +136,10 @@ class ServiceA final {
::grpc::ClientAsyncReader< ::grpc::testing::Response>* AsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) override;
::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4Raw(::grpc::ClientContext* context) override;
::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* AsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) override;
- const ::grpc::RpcMethod rpcmethod_MethodA1_;
- const ::grpc::RpcMethod rpcmethod_MethodA2_;
- const ::grpc::RpcMethod rpcmethod_MethodA3_;
- const ::grpc::RpcMethod rpcmethod_MethodA4_;
+ const ::grpc::internal::RpcMethod rpcmethod_MethodA1_;
+ const ::grpc::internal::RpcMethod rpcmethod_MethodA2_;
+ const ::grpc::internal::RpcMethod rpcmethod_MethodA3_;
+ const ::grpc::internal::RpcMethod rpcmethod_MethodA4_;
};
static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());
@@ -320,7 +319,7 @@ class ServiceA final {
public:
WithStreamedUnaryMethod_MethodA1() {
::grpc::Service::MarkMethodStreamed(0,
- new ::grpc::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodA1<BaseClass>::StreamedMethodA1, this, std::placeholders::_1, std::placeholders::_2)));
+ new ::grpc::internal::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodA1<BaseClass>::StreamedMethodA1, this, std::placeholders::_1, std::placeholders::_2)));
}
~WithStreamedUnaryMethod_MethodA1() override {
BaseClassMustBeDerivedFromService(this);
@@ -341,7 +340,7 @@ class ServiceA final {
public:
WithSplitStreamingMethod_MethodA3() {
::grpc::Service::MarkMethodStreamed(2,
- new ::grpc::SplitServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithSplitStreamingMethod_MethodA3<BaseClass>::StreamedMethodA3, this, std::placeholders::_1, std::placeholders::_2)));
+ new ::grpc::internal::SplitServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithSplitStreamingMethod_MethodA3<BaseClass>::StreamedMethodA3, this, std::placeholders::_1, std::placeholders::_2)));
}
~WithSplitStreamingMethod_MethodA3() override {
BaseClassMustBeDerivedFromService(this);
@@ -387,7 +386,7 @@ class ServiceB final {
private:
std::shared_ptr< ::grpc::ChannelInterface> channel_;
::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override;
- const ::grpc::RpcMethod rpcmethod_MethodB1_;
+ const ::grpc::internal::RpcMethod rpcmethod_MethodB1_;
};
static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());
@@ -444,7 +443,7 @@ class ServiceB final {
public:
WithStreamedUnaryMethod_MethodB1() {
::grpc::Service::MarkMethodStreamed(0,
- new ::grpc::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodB1<BaseClass>::StreamedMethodB1, this, std::placeholders::_1, std::placeholders::_2)));
+ new ::grpc::internal::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodB1<BaseClass>::StreamedMethodB1, this, std::placeholders::_1, std::placeholders::_2)));
}
~WithStreamedUnaryMethod_MethodB1() override {
BaseClassMustBeDerivedFromService(this);
diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc
index 18308a2e16..6bb6ad8018 100644
--- a/test/cpp/microbenchmarks/bm_cq.cc
+++ b/test/cpp/microbenchmarks/bm_cq.cc
@@ -69,7 +69,7 @@ BENCHMARK(BM_CreateDestroyCore);
static void DoneWithCompletionOnStack(grpc_exec_ctx* exec_ctx, void* arg,
grpc_cq_completion* completion) {}
-class DummyTag final : public CompletionQueueTag {
+class DummyTag final : public internal::CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) override { return true; }
};
diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc
index 07e41817b3..9954e2c0bf 100644
--- a/test/cpp/qps/server_sync.cc
+++ b/test/cpp/qps/server_sync.cc
@@ -112,8 +112,8 @@ class BenchmarkServiceImpl final : public BenchmarkService::Service {
}
private:
- static Status ClientPull(ServerContext* context,
- ReaderInterface<SimpleRequest>* stream,
+ template <class R>
+ static Status ClientPull(ServerContext* context, R* stream,
SimpleResponse* response) {
SimpleRequest request;
while (stream->Read(&request)) {
@@ -126,8 +126,8 @@ class BenchmarkServiceImpl final : public BenchmarkService::Service {
}
return Status::OK;
}
- static Status ServerPush(ServerContext* context,
- WriterInterface<SimpleResponse>* stream,
+ template <class W>
+ static Status ServerPush(ServerContext* context, W* stream,
const SimpleResponse& response,
std::function<bool()> done) {
while ((done == nullptr) || !done()) {
diff --git a/tools/distrib/python/grpcio_tools/README.rst b/tools/distrib/python/grpcio_tools/README.rst
index 55521d17bb..fb44cfaf80 100644
--- a/tools/distrib/python/grpcio_tools/README.rst
+++ b/tools/distrib/python/grpcio_tools/README.rst
@@ -53,7 +53,7 @@ GCC-like stuff, but you may end up having a bad time.
::
$ export REPO_ROOT=grpc # REPO_ROOT can be any directory of your choice
- $ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc $REPO_ROOT
+ $ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc $REPO_ROOT
$ cd $REPO_ROOT
$ git submodule update --init
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 63067b3081..766c20f59b 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -1304,6 +1304,8 @@ src/core/lib/support/mpscq.h \
src/core/lib/support/murmur_hash.c \
src/core/lib/support/murmur_hash.h \
src/core/lib/support/spinlock.h \
+src/core/lib/support/stack_lockfree.c \
+src/core/lib/support/stack_lockfree.h \
src/core/lib/support/string.c \
src/core/lib/support/string.h \
src/core/lib/support/string_posix.c \
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 65a99bf7d6..477a258daa 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -835,6 +835,21 @@
"headers": [],
"is_filegroup": false,
"language": "c",
+ "name": "gpr_stack_lockfree_test",
+ "src": [
+ "test/core/support/stack_lockfree_test.c"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
+ {
+ "deps": [
+ "gpr",
+ "gpr_test_util"
+ ],
+ "headers": [],
+ "is_filegroup": false,
+ "language": "c",
"name": "gpr_string_test",
"src": [
"test/core/support/string_test.c"
@@ -7450,6 +7465,7 @@
"src/core/lib/support/mpscq.h",
"src/core/lib/support/murmur_hash.h",
"src/core/lib/support/spinlock.h",
+ "src/core/lib/support/stack_lockfree.h",
"src/core/lib/support/string.h",
"src/core/lib/support/string_windows.h",
"src/core/lib/support/thd_internal.h",
@@ -7522,6 +7538,8 @@
"src/core/lib/support/murmur_hash.c",
"src/core/lib/support/murmur_hash.h",
"src/core/lib/support/spinlock.h",
+ "src/core/lib/support/stack_lockfree.c",
+ "src/core/lib/support/stack_lockfree.h",
"src/core/lib/support/string.c",
"src/core/lib/support/string.h",
"src/core/lib/support/string_posix.c",
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index c3ce58dc76..ffd65bc5f1 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -955,6 +955,28 @@
"posix",
"windows"
],
+ "cpu_cost": 7,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
+ "gtest": false,
+ "language": "c",
+ "name": "gpr_stack_lockfree_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ]
+ },
+ {
+ "args": [],
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
diff --git a/vsprojects/buildtests_c.sln b/vsprojects/buildtests_c.sln
index 3c6e8d8f34..b7696a9965 100644
--- a/vsprojects/buildtests_c.sln
+++ b/vsprojects/buildtests_c.sln
@@ -484,6 +484,15 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "gpr_spinlock_test", "vcxpro
{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} = {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}
EndProjectSection
EndProject
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "gpr_stack_lockfree_test", "vcxproj\test\gpr_stack_lockfree_test\gpr_stack_lockfree_test.vcxproj", "{AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}"
+ ProjectSection(myProperties) = preProject
+ lib = "False"
+ EndProjectSection
+ ProjectSection(ProjectDependencies) = postProject
+ {EAB0A629-17A9-44DB-B5FF-E91A721FE037} = {EAB0A629-17A9-44DB-B5FF-E91A721FE037}
+ {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} = {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}
+ EndProjectSection
+EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "gpr_string_test", "vcxproj\test\gpr_string_test\gpr_string_test.vcxproj", "{B453457D-8FBC-9C9F-A55E-C06FCE13B1F2}"
ProjectSection(myProperties) = preProject
lib = "False"
@@ -2488,6 +2497,22 @@ Global
{D8EDE51A-CBB2-0362-D59B-09AA92A94F45}.Release-DLL|Win32.Build.0 = Release|Win32
{D8EDE51A-CBB2-0362-D59B-09AA92A94F45}.Release-DLL|x64.ActiveCfg = Release|x64
{D8EDE51A-CBB2-0362-D59B-09AA92A94F45}.Release-DLL|x64.Build.0 = Release|x64
+ {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Debug|Win32.ActiveCfg = Debug|Win32
+ {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Debug|x64.ActiveCfg = Debug|x64
+ {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Release|Win32.ActiveCfg = Release|Win32
+ {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Release|x64.ActiveCfg = Release|x64
+ {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Debug|Win32.Build.0 = Debug|Win32
+ {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Debug|x64.Build.0 = Debug|x64
+ {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Release|Win32.Build.0 = Release|Win32
+ {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Release|x64.Build.0 = Release|x64
+ {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Debug-DLL|Win32.ActiveCfg = Debug|Win32
+ {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Debug-DLL|Win32.Build.0 = Debug|Win32
+ {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Debug-DLL|x64.ActiveCfg = Debug|x64
+ {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Debug-DLL|x64.Build.0 = Debug|x64
+ {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Release-DLL|Win32.ActiveCfg = Release|Win32
+ {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Release-DLL|Win32.Build.0 = Release|Win32
+ {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Release-DLL|x64.ActiveCfg = Release|x64
+ {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Release-DLL|x64.Build.0 = Release|x64
{B453457D-8FBC-9C9F-A55E-C06FCE13B1F2}.Debug|Win32.ActiveCfg = Debug|Win32
{B453457D-8FBC-9C9F-A55E-C06FCE13B1F2}.Debug|x64.ActiveCfg = Debug|x64
{B453457D-8FBC-9C9F-A55E-C06FCE13B1F2}.Release|Win32.ActiveCfg = Release|Win32
diff --git a/vsprojects/vcxproj/gpr/gpr.vcxproj b/vsprojects/vcxproj/gpr/gpr.vcxproj
index 3f0dedd675..7fb81a7fbc 100644
--- a/vsprojects/vcxproj/gpr/gpr.vcxproj
+++ b/vsprojects/vcxproj/gpr/gpr.vcxproj
@@ -198,6 +198,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\support\mpscq.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\support\murmur_hash.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\support\spinlock.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\support\stack_lockfree.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\support\string.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\support\string_windows.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\support\thd_internal.h" />
@@ -253,6 +254,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\support\murmur_hash.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\support\stack_lockfree.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\support\string.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\support\string_posix.c">
diff --git a/vsprojects/vcxproj/gpr/gpr.vcxproj.filters b/vsprojects/vcxproj/gpr/gpr.vcxproj.filters
index f8cccb5c08..27d9d2f38f 100644
--- a/vsprojects/vcxproj/gpr/gpr.vcxproj.filters
+++ b/vsprojects/vcxproj/gpr/gpr.vcxproj.filters
@@ -73,6 +73,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\support\murmur_hash.c">
<Filter>src\core\lib\support</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\support\stack_lockfree.c">
+ <Filter>src\core\lib\support</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\support\string.c">
<Filter>src\core\lib\support</Filter>
</ClCompile>
@@ -287,6 +290,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\support\spinlock.h">
<Filter>src\core\lib\support</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\support\stack_lockfree.h">
+ <Filter>src\core\lib\support</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\support\string.h">
<Filter>src\core\lib\support</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/test/gpr_stack_lockfree_test/gpr_stack_lockfree_test.vcxproj b/vsprojects/vcxproj/test/gpr_stack_lockfree_test/gpr_stack_lockfree_test.vcxproj
new file mode 100644
index 0000000000..218cff8ba9
--- /dev/null
+++ b/vsprojects/vcxproj/test/gpr_stack_lockfree_test/gpr_stack_lockfree_test.vcxproj
@@ -0,0 +1,193 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\1.0.204.1.props')" />
+ <ItemGroup Label="ProjectConfigurations">
+ <ProjectConfiguration Include="Debug|Win32">
+ <Configuration>Debug</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Debug|x64">
+ <Configuration>Debug</Configuration>
+ <Platform>x64</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|Win32">
+ <Configuration>Release</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|x64">
+ <Configuration>Release</Configuration>
+ <Platform>x64</Platform>
+ </ProjectConfiguration>
+ </ItemGroup>
+ <PropertyGroup Label="Globals">
+ <ProjectGuid>{AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}</ProjectGuid>
+ <IgnoreWarnIntDirInTempDetected>true</IgnoreWarnIntDirInTempDetected>
+ <IntDir>$(SolutionDir)IntDir\$(MSBuildProjectName)\</IntDir>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '10.0'" Label="Configuration">
+ <PlatformToolset>v100</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '11.0'" Label="Configuration">
+ <PlatformToolset>v110</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '12.0'" Label="Configuration">
+ <PlatformToolset>v120</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '14.0'" Label="Configuration">
+ <PlatformToolset>v140</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Debug'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>true</UseDebugLibraries>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Release'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>false</UseDebugLibraries>
+ <WholeProgramOptimization>true</WholeProgramOptimization>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
+ <ImportGroup Label="ExtensionSettings">
+ </ImportGroup>
+ <ImportGroup Label="PropertySheets">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ <Import Project="$(SolutionDir)\..\vsprojects\global.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\openssl.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\winsock.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\zlib.props" />
+ </ImportGroup>
+ <PropertyGroup Label="UserMacros" />
+ <PropertyGroup Condition="'$(Configuration)'=='Debug'">
+ <TargetName>gpr_stack_lockfree_test</TargetName>
+ <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
+ <Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib>
+ <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
+ <Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Release'">
+ <TargetName>gpr_stack_lockfree_test</TargetName>
+ <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
+ <Configuration-grpc_dependencies_zlib>Release</Configuration-grpc_dependencies_zlib>
+ <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
+ <Configuration-grpc_dependencies_openssl>Release</Configuration-grpc_dependencies_openssl>
+ </PropertyGroup>
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>Disabled</Optimization>
+ <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>Disabled</Optimization>
+ <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>MaxSpeed</Optimization>
+ <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>MaxSpeed</Optimization>
+ <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemGroup>
+ <ClCompile Include="$(SolutionDir)\..\test\core\support\stack_lockfree_test.c">
+ </ClCompile>
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr_test_util\gpr_test_util.vcxproj">
+ <Project>{EAB0A629-17A9-44DB-B5FF-E91A721FE037}</Project>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr\gpr.vcxproj">
+ <Project>{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}</Project>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="packages.config" />
+ </ItemGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
+ <ImportGroup Label="ExtensionTargets">
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
+ </ImportGroup>
+ <Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
+ <PropertyGroup>
+ <ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
+ </PropertyGroup>
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" />
+ </Target>
+</Project>
+
diff --git a/vsprojects/vcxproj/test/gpr_stack_lockfree_test/gpr_stack_lockfree_test.vcxproj.filters b/vsprojects/vcxproj/test/gpr_stack_lockfree_test/gpr_stack_lockfree_test.vcxproj.filters
new file mode 100644
index 0000000000..b222ab4128
--- /dev/null
+++ b/vsprojects/vcxproj/test/gpr_stack_lockfree_test/gpr_stack_lockfree_test.vcxproj.filters
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup>
+ <ClCompile Include="$(SolutionDir)\..\test\core\support\stack_lockfree_test.c">
+ <Filter>test\core\support</Filter>
+ </ClCompile>
+ </ItemGroup>
+
+ <ItemGroup>
+ <Filter Include="test">
+ <UniqueIdentifier>{de41d2bf-c9ce-7f55-6da3-8d3798fd8fe2}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="test\core">
+ <UniqueIdentifier>{4867ad9b-2b88-de6a-a1df-7a733d389df9}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="test\core\support">
+ <UniqueIdentifier>{fca98aa0-f0c0-9254-ab22-a2792b4b94f0}</UniqueIdentifier>
+ </Filter>
+ </ItemGroup>
+</Project>
+