diff options
76 files changed, 1697 insertions, 782 deletions
@@ -335,6 +335,7 @@ grpc_cc_library( "src/core/lib/support/log_windows.c", "src/core/lib/support/mpscq.c", "src/core/lib/support/murmur_hash.c", + "src/core/lib/support/stack_lockfree.c", "src/core/lib/support/string.c", "src/core/lib/support/string_posix.c", "src/core/lib/support/string_util_windows.c", @@ -370,6 +371,7 @@ grpc_cc_library( "src/core/lib/support/mpscq.h", "src/core/lib/support/murmur_hash.h", "src/core/lib/support/spinlock.h", + "src/core/lib/support/stack_lockfree.h", "src/core/lib/support/string.h", "src/core/lib/support/string_windows.h", "src/core/lib/support/thd_internal.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 6f4869d022..b535d7f546 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -444,6 +444,7 @@ add_dependencies(buildtests_c gpr_host_port_test) add_dependencies(buildtests_c gpr_log_test) add_dependencies(buildtests_c gpr_mpscq_test) add_dependencies(buildtests_c gpr_spinlock_test) +add_dependencies(buildtests_c gpr_stack_lockfree_test) add_dependencies(buildtests_c gpr_string_test) add_dependencies(buildtests_c gpr_sync_test) add_dependencies(buildtests_c gpr_thd_test) @@ -789,6 +790,7 @@ add_library(gpr src/core/lib/support/log_windows.c src/core/lib/support/mpscq.c src/core/lib/support/murmur_hash.c + src/core/lib/support/stack_lockfree.c src/core/lib/support/string.c src/core/lib/support/string_posix.c src/core/lib/support/string_util_windows.c @@ -6016,6 +6018,35 @@ target_link_libraries(gpr_spinlock_test endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) +add_executable(gpr_stack_lockfree_test + test/core/support/stack_lockfree_test.c +) + + +target_include_directories(gpr_stack_lockfree_test + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${BORINGSSL_ROOT_DIR}/include + PRIVATE ${PROTOBUF_ROOT_DIR}/src + PRIVATE ${BENCHMARK_ROOT_DIR}/include + PRIVATE ${ZLIB_ROOT_DIR} + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib + PRIVATE ${CARES_BUILD_INCLUDE_DIR} + PRIVATE ${CARES_INCLUDE_DIR} + PRIVATE ${CARES_PLATFORM_INCLUDE_DIR} + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include +) + +target_link_libraries(gpr_stack_lockfree_test + ${_gRPC_ALLTARGETS_LIBRARIES} + gpr_test_util + gpr +) + +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) + add_executable(gpr_string_test test/core/support/string_test.c ) diff --git a/INSTALL.md b/INSTALL.md index 9526a8637b..5ae02f22e7 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -77,7 +77,7 @@ For developers who are interested to contribute, here is how to compile the gRPC C Core library. ```sh - $ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc + $ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc $ cd grpc $ git submodule update --init $ make @@ -95,7 +95,7 @@ on experience with the tools involved. Builds gRPC C and C++ with boringssl. - Install [CMake](https://cmake.org/download/). -- Install [Active State Perl](http://www.activestate.com/activeperl/) (`choco install activeperl`) +- Install [Active State Perl](https://www.activestate.com/activeperl/) (`choco install activeperl`) - Install [Ninja](https://ninja-build.org/) (`choco install ninja`) - Install [Go](https://golang.org/dl/) (`choco install golang`) - Install [yasm](http://yasm.tortall.net/) and add it to `PATH` (`choco install yasm`) @@ -995,6 +995,7 @@ gpr_host_port_test: $(BINDIR)/$(CONFIG)/gpr_host_port_test gpr_log_test: $(BINDIR)/$(CONFIG)/gpr_log_test gpr_mpscq_test: $(BINDIR)/$(CONFIG)/gpr_mpscq_test gpr_spinlock_test: $(BINDIR)/$(CONFIG)/gpr_spinlock_test +gpr_stack_lockfree_test: $(BINDIR)/$(CONFIG)/gpr_stack_lockfree_test gpr_string_test: $(BINDIR)/$(CONFIG)/gpr_string_test gpr_sync_test: $(BINDIR)/$(CONFIG)/gpr_sync_test gpr_thd_test: $(BINDIR)/$(CONFIG)/gpr_thd_test @@ -1376,6 +1377,7 @@ buildtests_c: privatelibs_c \ $(BINDIR)/$(CONFIG)/gpr_log_test \ $(BINDIR)/$(CONFIG)/gpr_mpscq_test \ $(BINDIR)/$(CONFIG)/gpr_spinlock_test \ + $(BINDIR)/$(CONFIG)/gpr_stack_lockfree_test \ $(BINDIR)/$(CONFIG)/gpr_string_test \ $(BINDIR)/$(CONFIG)/gpr_sync_test \ $(BINDIR)/$(CONFIG)/gpr_thd_test \ @@ -1803,6 +1805,8 @@ test_c: buildtests_c $(Q) $(BINDIR)/$(CONFIG)/gpr_mpscq_test || ( echo test gpr_mpscq_test failed ; exit 1 ) $(E) "[RUN] Testing gpr_spinlock_test" $(Q) $(BINDIR)/$(CONFIG)/gpr_spinlock_test || ( echo test gpr_spinlock_test failed ; exit 1 ) + $(E) "[RUN] Testing gpr_stack_lockfree_test" + $(Q) $(BINDIR)/$(CONFIG)/gpr_stack_lockfree_test || ( echo test gpr_stack_lockfree_test failed ; exit 1 ) $(E) "[RUN] Testing gpr_string_test" $(Q) $(BINDIR)/$(CONFIG)/gpr_string_test || ( echo test gpr_string_test failed ; exit 1 ) $(E) "[RUN] Testing gpr_sync_test" @@ -2748,6 +2752,7 @@ LIBGPR_SRC = \ src/core/lib/support/log_windows.c \ src/core/lib/support/mpscq.c \ src/core/lib/support/murmur_hash.c \ + src/core/lib/support/stack_lockfree.c \ src/core/lib/support/string.c \ src/core/lib/support/string_posix.c \ src/core/lib/support/string_util_windows.c \ @@ -9692,6 +9697,38 @@ endif endif +GPR_STACK_LOCKFREE_TEST_SRC = \ + test/core/support/stack_lockfree_test.c \ + +GPR_STACK_LOCKFREE_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GPR_STACK_LOCKFREE_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/gpr_stack_lockfree_test: openssl_dep_error + +else + + + +$(BINDIR)/$(CONFIG)/gpr_stack_lockfree_test: $(GPR_STACK_LOCKFREE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LD) $(LDFLAGS) $(GPR_STACK_LOCKFREE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/gpr_stack_lockfree_test + +endif + +$(OBJDIR)/$(CONFIG)/test/core/support/stack_lockfree_test.o: $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_gpr_stack_lockfree_test: $(GPR_STACK_LOCKFREE_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(GPR_STACK_LOCKFREE_TEST_OBJS:.o=.dep) +endif +endif + + GPR_STRING_TEST_SRC = \ test/core/support/string_test.c \ diff --git a/binding.gyp b/binding.gyp index d85cf0be14..70dfd102b1 100644 --- a/binding.gyp +++ b/binding.gyp @@ -604,6 +604,7 @@ 'src/core/lib/support/log_windows.c', 'src/core/lib/support/mpscq.c', 'src/core/lib/support/murmur_hash.c', + 'src/core/lib/support/stack_lockfree.c', 'src/core/lib/support/string.c', 'src/core/lib/support/string_posix.c', 'src/core/lib/support/string_util_windows.c', diff --git a/build.yaml b/build.yaml index 940ca8902d..e55c4ca301 100644 --- a/build.yaml +++ b/build.yaml @@ -99,6 +99,7 @@ filegroups: - src/core/lib/support/mpscq.h - src/core/lib/support/murmur_hash.h - src/core/lib/support/spinlock.h + - src/core/lib/support/stack_lockfree.h - src/core/lib/support/string.h - src/core/lib/support/string_windows.h - src/core/lib/support/thd_internal.h @@ -129,6 +130,7 @@ filegroups: - src/core/lib/support/log_windows.c - src/core/lib/support/mpscq.c - src/core/lib/support/murmur_hash.c + - src/core/lib/support/stack_lockfree.c - src/core/lib/support/string.c - src/core/lib/support/string_posix.c - src/core/lib/support/string_util_windows.c @@ -2121,6 +2123,15 @@ targets: deps: - gpr_test_util - gpr +- name: gpr_stack_lockfree_test + cpu_cost: 7 + build: test + language: c + src: + - test/core/support/stack_lockfree_test.c + deps: + - gpr_test_util + - gpr - name: gpr_string_test build: test language: c @@ -63,6 +63,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/support/log_windows.c \ src/core/lib/support/mpscq.c \ src/core/lib/support/murmur_hash.c \ + src/core/lib/support/stack_lockfree.c \ src/core/lib/support/string.c \ src/core/lib/support/string_posix.c \ src/core/lib/support/string_util_windows.c \ diff --git a/config.w32 b/config.w32 index cca3604045..a1f6d03b87 100644 --- a/config.w32 +++ b/config.w32 @@ -40,6 +40,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\support\\log_windows.c " + "src\\core\\lib\\support\\mpscq.c " + "src\\core\\lib\\support\\murmur_hash.c " + + "src\\core\\lib\\support\\stack_lockfree.c " + "src\\core\\lib\\support\\string.c " + "src\\core\\lib\\support\\string_posix.c " + "src\\core\\lib\\support\\string_util_windows.c " + diff --git a/doc/interop-test-descriptions.md b/doc/interop-test-descriptions.md index b040621f88..0ee2cae2bd 100644 --- a/doc/interop-test-descriptions.md +++ b/doc/interop-test-descriptions.md @@ -183,7 +183,8 @@ the `response_compressed` boolean. Whether compression was actually performed is determined by the compression bit in the response's message flags. *Note that some languages may not have access -to the message flags*. +to the message flags, in which case the client will be unable to verify that +the `response_compressed` boolean is obeyed by the server*. Server features: @@ -218,10 +219,10 @@ Procedure: ``` Client asserts: * call was successful - * when `response_compressed` is true, the response MUST have the - compressed message flag set. - * when `response_compressed` is false, the response MUST NOT have - the compressed message flag set. + * if supported by the implementation, when `response_compressed` is true, + the response MUST have the compressed message flag set. + * if supported by the implementation, when `response_compressed` is false, + the response MUST NOT have the compressed message flag set. * response payload body is 314159 bytes in size in both cases. * clients are free to assert that the response payload body contents are zero and comparing the entire response message against a golden response @@ -304,8 +305,8 @@ Procedure: } } ``` - If the call fails with `INVALID_ARGUMENT`, the test fails. Otherwise, we - continue. + If the call does not fail with `INVALID_ARGUMENT`, the test fails. + Otherwise, we continue. 1. Client calls `StreamingInputCall` again, sending the *compressed* message @@ -377,7 +378,13 @@ Client asserts: ### server_compressed_streaming This test verifies that the server can compress streaming messages and disable -compression on individual messages. +compression on individual messages, expecting the server's response to be +compressed or not according to the `response_compressed` boolean. + +Whether compression was actually performed is determined by the compression bit +in the response's message flags. *Note that some languages may not have access +to the message flags, in which case the client will be unable to verify that the +`response_compressed` boolean is obeyed by the server*. Server features: * [StreamingOutputCall][] @@ -407,15 +414,14 @@ Procedure: Client asserts: * call was successful * exactly two responses - * when `response_compressed` is false, the response's messages MUST - NOT have the compressed message flag set. - * when `response_compressed` is true, the response's messages MUST - have the compressed message flag set. + * if supported by the implementation, when `response_compressed` is false, + the response's messages MUST NOT have the compressed message flag set. + * if supported by the implementation, when `response_compressed` is true, + the response's messages MUST have the compressed message flag set. * response payload bodies are sized (in order): 31415, 92653 * clients are free to assert that the response payload body contents are zero and comparing the entire response messages against golden responses - ### ping_pong This test verifies that full duplex bidi is supported. @@ -1095,4 +1101,3 @@ Discussion: Ideally, this would be communicated via metadata and not in the request/response, but we want to use this test in code paths that don't yet fully communicate metadata. - diff --git a/examples/cpp/helloworld/README.md b/examples/cpp/helloworld/README.md index db953f5362..18d3d79dcc 100644 --- a/examples/cpp/helloworld/README.md +++ b/examples/cpp/helloworld/README.md @@ -12,7 +12,7 @@ following command: ```sh -$ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc +$ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc ``` Change your current directory to examples/cpp/helloworld diff --git a/examples/node/README.md b/examples/node/README.md index f29236c62a..4730766d59 100644 --- a/examples/node/README.md +++ b/examples/node/README.md @@ -12,7 +12,7 @@ INSTALL ```sh $ # Get the gRPC repository $ export REPO_ROOT=grpc # REPO root can be any directory of your choice - $ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc $REPO_ROOT + $ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc $REPO_ROOT $ cd $REPO_ROOT $ cd examples/node diff --git a/examples/objective-c/helloworld/README.md b/examples/objective-c/helloworld/README.md index 365bea1422..27c9f14040 100644 --- a/examples/objective-c/helloworld/README.md +++ b/examples/objective-c/helloworld/README.md @@ -16,7 +16,7 @@ this repository to your local machine by running the following commands: ```sh -$ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc +$ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc $ cd grpc $ git submodule update --init ``` diff --git a/examples/php/README.md b/examples/php/README.md index 54cc97d8c2..e497fb07ff 100644 --- a/examples/php/README.md +++ b/examples/php/README.md @@ -17,7 +17,7 @@ INSTALL - Clone this repository ```sh - $ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc + $ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc ``` - Install composer diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 2095c0f529..c7afbf2d0a 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -192,6 +192,7 @@ Pod::Spec.new do |s| 'src/core/lib/support/mpscq.h', 'src/core/lib/support/murmur_hash.h', 'src/core/lib/support/spinlock.h', + 'src/core/lib/support/stack_lockfree.h', 'src/core/lib/support/string.h', 'src/core/lib/support/string_windows.h', 'src/core/lib/support/thd_internal.h', @@ -221,6 +222,7 @@ Pod::Spec.new do |s| 'src/core/lib/support/log_windows.c', 'src/core/lib/support/mpscq.c', 'src/core/lib/support/murmur_hash.c', + 'src/core/lib/support/stack_lockfree.c', 'src/core/lib/support/string.c', 'src/core/lib/support/string_posix.c', 'src/core/lib/support/string_util_windows.c', @@ -713,6 +715,7 @@ Pod::Spec.new do |s| 'src/core/lib/support/mpscq.h', 'src/core/lib/support/murmur_hash.h', 'src/core/lib/support/spinlock.h', + 'src/core/lib/support/stack_lockfree.h', 'src/core/lib/support/string.h', 'src/core/lib/support/string_windows.h', 'src/core/lib/support/thd_internal.h', diff --git a/grpc.gemspec b/grpc.gemspec index 18ef37a077..663915b75e 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -92,6 +92,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/support/mpscq.h ) s.files += %w( src/core/lib/support/murmur_hash.h ) s.files += %w( src/core/lib/support/spinlock.h ) + s.files += %w( src/core/lib/support/stack_lockfree.h ) s.files += %w( src/core/lib/support/string.h ) s.files += %w( src/core/lib/support/string_windows.h ) s.files += %w( src/core/lib/support/thd_internal.h ) @@ -121,6 +122,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/support/log_windows.c ) s.files += %w( src/core/lib/support/mpscq.c ) s.files += %w( src/core/lib/support/murmur_hash.c ) + s.files += %w( src/core/lib/support/stack_lockfree.c ) s.files += %w( src/core/lib/support/string.c ) s.files += %w( src/core/lib/support/string_posix.c ) s.files += %w( src/core/lib/support/string_util_windows.c ) diff --git a/include/grpc++/alarm.h b/include/grpc++/alarm.h index ed8dacbc94..00a9306d9d 100644 --- a/include/grpc++/alarm.h +++ b/include/grpc++/alarm.h @@ -77,7 +77,7 @@ class Alarm : private GrpcLibraryCodegen { void Cancel() { grpc_alarm_cancel(alarm_); } private: - class AlarmEntry : public CompletionQueueTag { + class AlarmEntry : public internal::CompletionQueueTag { public: AlarmEntry(void* tag) : tag_(tag) {} bool FinalizeResult(void** tag, bool* status) override { diff --git a/include/grpc++/channel.h b/include/grpc++/channel.h index c50091d6ac..5ba3c591f0 100644 --- a/include/grpc++/channel.h +++ b/include/grpc++/channel.h @@ -32,7 +32,7 @@ struct grpc_channel; namespace grpc { /// Channels represent a connection to an endpoint. Created by \a CreateChannel. class Channel final : public ChannelInterface, - public CallHook, + public internal::CallHook, public std::enable_shared_from_this<Channel>, private GrpcLibraryCodegen { public: @@ -52,7 +52,7 @@ class Channel final : public ChannelInterface, private: template <class InputMessage, class OutputMessage> friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, + const internal::RpcMethod& method, ClientContext* context, const InputMessage& request, OutputMessage* result); @@ -60,9 +60,11 @@ class Channel final : public ChannelInterface, const grpc::string& host, grpc_channel* c_channel); Channel(const grpc::string& host, grpc_channel* c_channel); - Call CreateCall(const RpcMethod& method, ClientContext* context, - CompletionQueue* cq) override; - void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) override; + internal::Call CreateCall(const internal::RpcMethod& method, + ClientContext* context, + CompletionQueue* cq) override; + void PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) override; void* RegisterMethod(const char* method) override; void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index 9cf7ac30dd..ddbf3e655e 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -30,6 +30,7 @@ namespace grpc { class CompletionQueue; +namespace internal { /// Common interface for all client side asynchronous streaming. class ClientAsyncStreamingInterface { public: @@ -146,9 +147,41 @@ class AsyncWriterInterface { } }; +} // namespace internal + template <class R> -class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface, - public AsyncReaderInterface<R> {}; +class ClientAsyncReaderInterface + : public internal::ClientAsyncStreamingInterface, + public internal::AsyncReaderInterface<R> {}; + +/// Common interface for client side asynchronous writing. +template <class W> +class ClientAsyncWriterInterface + : public internal::ClientAsyncStreamingInterface, + public internal::AsyncWriterInterface<W> { + public: + /// Signal the client is done with the writes (half-close the client stream). + /// Thread-safe with respect to \a AsyncReaderInterface::Read + /// + /// \param[in] tag The tag identifying the operation. + virtual void WritesDone(void* tag) = 0; +}; + +/// Async client-side interface for bi-directional streaming, +/// where the client-to-server message stream has messages of type \a W, +/// and the server-to-client message stream has messages of type \a R. +template <class W, class R> +class ClientAsyncReaderWriterInterface + : public internal::ClientAsyncStreamingInterface, + public internal::AsyncWriterInterface<W>, + public internal::AsyncReaderInterface<R> { + public: + /// Signal the client is done with the writes (half-close the client stream). + /// Thread-safe with respect to \a AsyncReaderInterface::Read + /// + /// \param[in] tag The tag identifying the operation. + virtual void WritesDone(void* tag) = 0; +}; /// Async client-side API for doing server-streaming RPCs, /// where the incoming message stream coming from the server has @@ -156,21 +189,24 @@ class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface, template <class R> class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { public: - /// Create a stream and write the first request out. - /// \a tag will be notified on \a cq when the call has been started and - /// \a request has been written out. - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - template <class W> - static ClientAsyncReader* Create(ChannelInterface* channel, - CompletionQueue* cq, const RpcMethod& method, - ClientContext* context, const W& request, - void* tag) { - Call call = channel->CreateCall(method, context, cq); - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncReader))) - ClientAsyncReader(call, context, request, tag); - } + struct internal { + /// Create a stream and write the first request out. + /// \a tag will be notified on \a cq when the call has been started and + /// \a request has been written out. + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + template <class W> + static ClientAsyncReader* Create(::grpc::ChannelInterface* channel, + CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request, + void* tag) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncReader))) + ClientAsyncReader(call, context, request, tag); + } + }; // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -218,8 +254,8 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { private: template <class W> - ClientAsyncReader(Call call, ClientContext* context, const W& request, - void* tag) + ClientAsyncReader(::grpc::internal::Call call, ClientContext* context, + const W& request, void* tag) : context_(context), call_(call) { init_ops_.set_output_tag(tag); init_ops_.SendInitialMetadata(context->send_initial_metadata_, @@ -231,24 +267,19 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { } ClientContext* context_; - Call call_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> + ::grpc::internal::Call call_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; -}; - -/// Common interface for client side asynchronous writing. -template <class W> -class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, - public AsyncWriterInterface<W> { - public: - /// Signal the client is done with the writes (half-close the client stream). - /// Thread-safe with respect to \a AsyncReaderInterface::Read - /// - /// \param[in] tag The tag identifying the operation. - virtual void WritesDone(void* tag) = 0; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage<R>> + read_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpClientRecvStatus> + finish_ops_; }; /// Async API on the client side for doing client-streaming RPCs, @@ -257,24 +288,27 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, template <class W> class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { public: - /// Create a stream and write the first request out. - /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent) and \a request has been written out. - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - /// \a response will be filled in with the single expected response - /// message from the server upon a successful call to the \a Finish - /// method of this instance. - template <class R> - static ClientAsyncWriter* Create(ChannelInterface* channel, - CompletionQueue* cq, const RpcMethod& method, - ClientContext* context, R* response, - void* tag) { - Call call = channel->CreateCall(method, context, cq); - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncWriter))) - ClientAsyncWriter(call, context, response, tag); - } + struct internal { + /// Create a stream and write the first request out. + /// \a tag will be notified on \a cq when the call has been started (i.e. + /// intitial metadata sent) and \a request has been written out. + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + /// \a response will be filled in with the single expected response + /// message from the server upon a successful call to the \a Finish + /// method of this instance. + template <class R> + static ClientAsyncWriter* Create(::grpc::ChannelInterface* channel, + CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, R* response, + void* tag) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncWriter))) + ClientAsyncWriter(call, context, response, tag); + } + }; // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -338,7 +372,8 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { private: template <class R> - ClientAsyncWriter(Call call, ClientContext* context, R* response, void* tag) + ClientAsyncWriter(::grpc::internal::Call call, ClientContext* context, + R* response, void* tag) : context_(context), call_(call) { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); @@ -356,31 +391,20 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { } ClientContext* context_; - Call call_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> + ::grpc::internal::Call call_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> write_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, - CallOpClientRecvStatus> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpGenericRecvMessage, + ::grpc::internal::CallOpClientRecvStatus> finish_ops_; }; /// Async client-side interface for bi-directional streaming, -/// where the client-to-server message stream has messages of type \a W, -/// and the server-to-client message stream has messages of type \a R. -template <class W, class R> -class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface, - public AsyncWriterInterface<W>, - public AsyncReaderInterface<R> { - public: - /// Signal the client is done with the writes (half-close the client stream). - /// Thread-safe with respect to \a AsyncReaderInterface::Read - /// - /// \param[in] tag The tag identifying the operation. - virtual void WritesDone(void* tag) = 0; -}; - -/// Async client-side interface for bi-directional streaming, /// where the outgoing message stream going to the server /// has messages of type \a W, and the incoming message stream coming /// from the server has messages of type \a R. @@ -388,21 +412,23 @@ template <class W, class R> class ClientAsyncReaderWriter final : public ClientAsyncReaderWriterInterface<W, R> { public: - /// Create a stream and write the first request out. - /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent). - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - static ClientAsyncReaderWriter* Create(ChannelInterface* channel, - CompletionQueue* cq, - const RpcMethod& method, - ClientContext* context, void* tag) { - Call call = channel->CreateCall(method, context, cq); - - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncReaderWriter))) - ClientAsyncReaderWriter(call, context, tag); - } + struct internal { + /// Create a stream and write the first request out. + /// \a tag will be notified on \a cq when the call has been started (i.e. + /// intitial metadata sent). + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + static ClientAsyncReaderWriter* Create( + ::grpc::ChannelInterface* channel, CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, ClientContext* context, + void* tag) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncReaderWriter))) + ClientAsyncReaderWriter(call, context, tag); + } + }; // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -471,7 +497,8 @@ class ClientAsyncReaderWriter final } private: - ClientAsyncReaderWriter(Call call, ClientContext* context, void* tag) + ClientAsyncReaderWriter(::grpc::internal::Call call, ClientContext* context, + void* tag) : context_(context), call_(call) { if (context_->initial_metadata_corked_) { // if corked bit is set in context, we buffer up the initial metadata to @@ -487,17 +514,25 @@ class ClientAsyncReaderWriter final } ClientContext* context_; - Call call_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> + ::grpc::internal::Call call_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage<R>> + read_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> write_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpClientRecvStatus> + finish_ops_; }; template <class W, class R> -class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface, - public AsyncReaderInterface<R> { +class ServerAsyncReaderInterface + : public internal::ServerAsyncStreamingInterface, + public internal::AsyncReaderInterface<R> { public: /// Indicate that the stream is to be finished with a certain status code /// and also send out \a msg response to the client. @@ -541,6 +576,89 @@ class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface, virtual void FinishWithError(const Status& status, void* tag) = 0; }; +template <class W> +class ServerAsyncWriterInterface + : public internal::ServerAsyncStreamingInterface, + public internal::AsyncWriterInterface<W> { + public: + /// Indicate that the stream is to be finished with a certain status code. + /// Request notification for when the server has sent the appropriate + /// signals to the client to end the call. + /// Should not be used concurrently with other operations. + /// + /// It is appropriate to call this method when either: + /// * all messages from the client have been received (either known + /// implictly, or explicitly because a previous \a + /// AsyncReaderInterface::Read operation with a non-ok + /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'. + /// * it is desired to end the call early with some non-OK status code. + /// + /// This operation will end when the server has finished sending out initial + /// metadata (if not sent already), response message, and status, or if + /// some failure occurred when trying to do so. + /// + /// \param[in] tag Tag identifying this request. + /// \param[in] status To be sent to the client as the result of this call. + virtual void Finish(const Status& status, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with trailing metadata which + /// contains \a status, using WriteOptions options with + /// identifying tag \a tag. + /// + /// WriteAndFinish is equivalent of performing WriteLast and Finish + /// in a single step. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] status The Status that server returns to client. + /// \param[in] tag The tag identifying the operation. + virtual void WriteAndFinish(const W& msg, WriteOptions options, + const Status& status, void* tag) = 0; +}; + +/// Server-side interface for asynchronous bi-directional streaming. +template <class W, class R> +class ServerAsyncReaderWriterInterface + : public internal::ServerAsyncStreamingInterface, + public internal::AsyncWriterInterface<W>, + public internal::AsyncReaderInterface<R> { + public: + /// Indicate that the stream is to be finished with a certain status code. + /// Request notification for when the server has sent the appropriate + /// signals to the client to end the call. + /// Should not be used concurrently with other operations. + /// + /// It is appropriate to call this method when either: + /// * all messages from the client have been received (either known + /// implictly, or explicitly because a previous \a + /// AsyncReaderInterface::Read operation + /// with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' + /// with 'false'. + /// * it is desired to end the call early with some non-OK status code. + /// + /// This operation will end when the server has finished sending out initial + /// metadata (if not sent already), response message, and status, or if some + /// failure occurred when trying to do so. + /// + /// \param[in] tag Tag identifying this request. + /// \param[in] status To be sent to the client as the result of this call. + virtual void Finish(const Status& status, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with trailing metadata which + /// contains \a status, using WriteOptions options with + /// identifying tag \a tag. + /// + /// WriteAndFinish is equivalent of performing WriteLast and Finish in a + /// single step. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] status The Status that server returns to client. + /// \param[in] tag The tag identifying the operation. + virtual void WriteAndFinish(const W& msg, WriteOptions options, + const Status& status, void* tag) = 0; +}; + /// Async server-side API for doing client-streaming RPCs, /// where the incoming message stream from the client has messages of type \a R, /// and the single response message sent from the server is type \a W. @@ -624,56 +742,19 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> { } private: - void BindCall(Call* call) override { call_ = *call; } + void BindCall(::grpc::internal::Call* call) override { call_ = *call; } - Call call_; + ::grpc::internal::Call call_; ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> finish_ops_; }; -template <class W> -class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface, - public AsyncWriterInterface<W> { - public: - /// Indicate that the stream is to be finished with a certain status code. - /// Request notification for when the server has sent the appropriate - /// signals to the client to end the call. - /// Should not be used concurrently with other operations. - /// - /// It is appropriate to call this method when either: - /// * all messages from the client have been received (either known - /// implictly, or explicitly because a previous \a - /// AsyncReaderInterface::Read operation with a non-ok - /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'. - /// * it is desired to end the call early with some non-OK status code. - /// - /// This operation will end when the server has finished sending out initial - /// metadata (if not sent already), response message, and status, or if - /// some failure occurred when trying to do so. - /// - /// \param[in] tag Tag identifying this request. - /// \param[in] status To be sent to the client as the result of this call. - virtual void Finish(const Status& status, void* tag) = 0; - - /// Request the writing of \a msg and coalesce it with trailing metadata which - /// contains \a status, using WriteOptions options with - /// identifying tag \a tag. - /// - /// WriteAndFinish is equivalent of performing WriteLast and Finish - /// in a single step. - /// - /// \param[in] msg The message to be written. - /// \param[in] options The WriteOptions to be used to write this message. - /// \param[in] status The Status that server returns to client. - /// \param[in] tag The tag identifying the operation. - virtual void WriteAndFinish(const W& msg, WriteOptions options, - const Status& status, void* tag) = 0; -}; - /// Async server-side API for doing server streaming RPCs, /// where the outgoing message stream from the server has messages of type \a W. template <class W> @@ -755,7 +836,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { } private: - void BindCall(Call* call) override { call_ = *call; } + void BindCall(::grpc::internal::Call* call) override { call_ = *call; } template <class T> void EnsureInitialMetadataSent(T* ops) { @@ -769,55 +850,17 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { } } - Call call_; + ::grpc::internal::Call call_; ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> write_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; -}; - -/// Server-side interface for asynchronous bi-directional streaming. -template <class W, class R> -class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface, - public AsyncWriterInterface<W>, - public AsyncReaderInterface<R> { - public: - /// Indicate that the stream is to be finished with a certain status code. - /// Request notification for when the server has sent the appropriate - /// signals to the client to end the call. - /// Should not be used concurrently with other operations. - /// - /// It is appropriate to call this method when either: - /// * all messages from the client have been received (either known - /// implictly, or explicitly because a previous \a - /// AsyncReaderInterface::Read operation - /// with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' - /// with 'false'. - /// * it is desired to end the call early with some non-OK status code. - /// - /// This operation will end when the server has finished sending out initial - /// metadata (if not sent already), response message, and status, or if some - /// failure occurred when trying to do so. - /// - /// \param[in] tag Tag identifying this request. - /// \param[in] status To be sent to the client as the result of this call. - virtual void Finish(const Status& status, void* tag) = 0; - - /// Request the writing of \a msg and coalesce it with trailing metadata which - /// contains \a status, using WriteOptions options with - /// identifying tag \a tag. - /// - /// WriteAndFinish is equivalent of performing WriteLast and Finish in a - /// single step. - /// - /// \param[in] msg The message to be written. - /// \param[in] options The WriteOptions to be used to write this message. - /// \param[in] status The Status that server returns to client. - /// \param[in] tag The tag identifying the operation. - virtual void WriteAndFinish(const W& msg, WriteOptions options, - const Status& status, void* tag) = 0; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpServerSendStatus> + finish_ops_; }; /// Async server-side API for doing bidirectional streaming RPCs, @@ -912,7 +955,7 @@ class ServerAsyncReaderWriter final private: friend class ::grpc::Server; - void BindCall(Call* call) override { call_ = *call; } + void BindCall(::grpc::internal::Call* call) override { call_ = *call; } template <class T> void EnsureInitialMetadataSent(T* ops) { @@ -926,14 +969,18 @@ class ServerAsyncReaderWriter final } } - Call call_; + ::grpc::internal::Call call_; ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> write_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpServerSendStatus> + finish_ops_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index 41b3ae3f28..45a8e8ee6a 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -75,17 +75,18 @@ class ClientAsyncResponseReader final /// intitial metadata sent) and \a request has been written out. /// Note that \a context will be used to fill in custom initial metadata /// used to send to the server when starting the call. - template <class W> - static ClientAsyncResponseReader* Create(ChannelInterface* channel, - CompletionQueue* cq, - const RpcMethod& method, - ClientContext* context, - const W& request) { - Call call = channel->CreateCall(method, context, cq); - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncResponseReader))) - ClientAsyncResponseReader(call, context, request); - } + struct internal { + template <class W> + static ClientAsyncResponseReader* Create( + ::grpc::ChannelInterface* channel, CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, ClientContext* context, + const W& request) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncResponseReader))) + ClientAsyncResponseReader(call, context, request); + } + }; /// TODO(vjpai): Delete the below constructor /// PLEASE DO NOT USE THIS CONSTRUCTOR IN NEW CODE @@ -94,9 +95,10 @@ class ClientAsyncResponseReader final /// created this struct rather than properly using a stub. /// This code will not remain a valid public constructor for long. template <class W> - ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - const W& request) + ClientAsyncResponseReader(::grpc::ChannelInterface* channel, + CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request) : context_(context), call_(channel->CreateCall(method, context, cq)), collection_(std::make_shared<Ops>()) { @@ -164,10 +166,11 @@ class ClientAsyncResponseReader final private: ClientContext* const context_; - Call call_; + ::grpc::internal::Call call_; template <class W> - ClientAsyncResponseReader(Call call, ClientContext* context, const W& request) + ClientAsyncResponseReader(::grpc::internal::Call call, ClientContext* context, + const W& request) : context_(context), call_(call) { ops_.init_buf.SendInitialMetadata(context->send_initial_metadata_, context->initial_metadata_flags()); @@ -183,13 +186,17 @@ class ClientAsyncResponseReader final // TODO(vjpai): Remove the reference to CallOpSetCollectionInterface // as soon as the related workaround (public constructor) is deleted - struct Ops : public CallOpSetCollectionInterface { - SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpClientSendClose> + struct Ops : public ::grpc::internal::CallOpSetCollectionInterface { + ::grpc::internal::SneakyCallOpSet< + ::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> init_buf; - CallOpSet<CallOpRecvInitialMetadata> meta_buf; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>, - CallOpClientRecvStatus> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + meta_buf; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage<R>, + ::grpc::internal::CallOpClientRecvStatus> finish_buf; } ops_; @@ -201,7 +208,8 @@ class ClientAsyncResponseReader final /// Async server-side API for handling unary calls, where the single /// response message sent to the client is of type \a W. template <class W> -class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { +class ServerAsyncResponseWriter final + : public internal::ServerAsyncStreamingInterface { public: explicit ServerAsyncResponseWriter(ServerContext* ctx) : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} @@ -289,13 +297,15 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { } private: - void BindCall(Call* call) override { call_ = *call; } + void BindCall(::grpc::internal::Call* call) override { call_ = *call; } - Call call_; + ::grpc::internal::Call call_; ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_buf_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_buf_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> finish_buf_; }; diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 342ea46203..32bd2ad5c7 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -44,11 +44,13 @@ struct grpc_byte_buffer; namespace grpc { class ByteBuffer; -class Call; -class CallHook; class CompletionQueue; extern CoreCodegenInterface* g_core_codegen_interface; +namespace internal { +class Call; +class CallHook; + const char kBinaryErrorDetailsKey[] = "grpc-status-details-bin"; // TODO(yangg) if the map is changed before we send, the pointers will be a @@ -76,6 +78,7 @@ inline grpc_metadata* FillMetadataArray( } return metadata_array; } +} // namespace internal /// Per-message write options. class WriteOptions { @@ -191,6 +194,7 @@ class WriteOptions { bool last_message_; }; +namespace internal { /// Default argument for CallOpSet. I is unused by the class, but can be /// used for generating multiple names for the same thing. template <int I> @@ -673,7 +677,7 @@ class Call final { grpc_call* call_; int max_receive_message_size_; }; - +} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_CALL_H diff --git a/include/grpc++/impl/codegen/call_hook.h b/include/grpc++/impl/codegen/call_hook.h index d026cc8b58..44e9de220e 100644 --- a/include/grpc++/impl/codegen/call_hook.h +++ b/include/grpc++/impl/codegen/call_hook.h @@ -21,6 +21,7 @@ namespace grpc { +namespace internal { class CallOpSetInterface; class Call; @@ -31,6 +32,7 @@ class CallHook { virtual ~CallHook() {} virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; }; +} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/channel_interface.h b/include/grpc++/impl/codegen/channel_interface.h index 1b7590bf0c..cf1d77e905 100644 --- a/include/grpc++/impl/codegen/channel_interface.h +++ b/include/grpc++/impl/codegen/channel_interface.h @@ -24,10 +24,8 @@ #include <grpc/impl/codegen/connectivity_state.h> namespace grpc { -class Call; +class ChannelInterface; class ClientContext; -class RpcMethod; -class CallOpSetInterface; class CompletionQueue; template <class R> @@ -45,6 +43,16 @@ class ClientAsyncReaderWriter; template <class R> class ClientAsyncResponseReader; +namespace internal { +class Call; +class CallOpSetInterface; +class RpcMethod; +template <class InputMessage, class OutputMessage> +Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result); +} // namespace internal + /// Codegen interface for \a grpc::Channel. class ChannelInterface { public: @@ -96,15 +104,16 @@ class ChannelInterface { template <class R> friend class ::grpc::ClientAsyncResponseReader; template <class InputMessage, class OutputMessage> - friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, - ClientContext* context, - const InputMessage& request, - OutputMessage* result); - friend class ::grpc::RpcMethod; - virtual Call CreateCall(const RpcMethod& method, ClientContext* context, - CompletionQueue* cq) = 0; - virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; + friend Status(::grpc::internal::BlockingUnaryCall)( + ChannelInterface* channel, const internal::RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result); + friend class ::grpc::internal::RpcMethod; + virtual internal::Call CreateCall(const internal::RpcMethod& method, + ClientContext* context, + CompletionQueue* cq) = 0; + virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) = 0; virtual void* RegisterMethod(const char* method) = 0; virtual void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline, @@ -112,7 +121,6 @@ class ChannelInterface { virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline) = 0; }; - } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_CHANNEL_INTERFACE_H diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h index c2a44e41ce..22d0069afa 100644 --- a/include/grpc++/impl/codegen/client_context.h +++ b/include/grpc++/impl/codegen/client_context.h @@ -60,7 +60,18 @@ class Channel; class ChannelInterface; class CompletionQueue; class CallCredentials; +class ClientContext; + +namespace internal { class RpcMethod; +class CallOpClientRecvStatus; +class CallOpRecvInitialMetadata; +template <class InputMessage, class OutputMessage> +Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result); +} // namespace internal + template <class R> class ClientReader; template <class W> @@ -345,8 +356,8 @@ class ClientContext { ClientContext& operator=(const ClientContext&); friend class ::grpc::testing::InteropClientContextInspector; - friend class CallOpClientRecvStatus; - friend class CallOpRecvInitialMetadata; + friend class ::grpc::internal::CallOpClientRecvStatus; + friend class ::grpc::internal::CallOpRecvInitialMetadata; friend class Channel; template <class R> friend class ::grpc::ClientReader; @@ -363,11 +374,10 @@ class ClientContext { template <class R> friend class ::grpc::ClientAsyncResponseReader; template <class InputMessage, class OutputMessage> - friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, - ClientContext* context, - const InputMessage& request, - OutputMessage* result); + friend Status(::grpc::internal::BlockingUnaryCall)( + ChannelInterface* channel, const internal::RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result); grpc_call* call() const { return call_; } void set_call(grpc_call* call, const std::shared_ptr<Channel>& channel); @@ -399,8 +409,8 @@ class ClientContext { mutable std::shared_ptr<const AuthContext> auth_context_; struct census_context* census_context_; std::multimap<grpc::string, grpc::string> send_initial_metadata_; - MetadataMap recv_initial_metadata_; - MetadataMap trailing_metadata_; + internal::MetadataMap recv_initial_metadata_; + internal::MetadataMap trailing_metadata_; grpc_call* propagate_from_call_; PropagationOptions propagation_options_; diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h index 7c540fade9..8fef3ab353 100644 --- a/include/grpc++/impl/codegen/client_unary_call.h +++ b/include/grpc++/impl/codegen/client_unary_call.h @@ -30,8 +30,9 @@ namespace grpc { class Channel; class ClientContext; class CompletionQueue; -class RpcMethod; +namespace internal { +class RpcMethod; /// Wrapper that performs a blocking unary call template <class InputMessage, class OutputMessage> Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, @@ -67,6 +68,7 @@ Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, return status; } +} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_CLIENT_UNARY_CALL_H diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index ca757e2a9c..a04778aa72 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -56,7 +56,19 @@ class ServerWriter; namespace internal { template <class W, class R> class ServerReaderWriterBody; -} +} // namespace internal + +class Channel; +class ChannelInterface; +class ClientContext; +class CompletionQueue; +class Server; +class ServerBuilder; +class ServerContext; + +namespace internal { +class CompletionQueueTag; +class RpcMethod; template <class ServiceType, class RequestType, class ResponseType> class RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> @@ -66,16 +78,13 @@ class ServerStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> class BidiStreamingHandler; class UnknownMethodHandler; - -class Channel; -class ChannelInterface; -class ClientContext; -class CompletionQueueTag; -class CompletionQueue; -class RpcMethod; -class Server; -class ServerBuilder; -class ServerContext; +template <class Streamer, bool WriteNeeded> +class TemplatedBidiStreamingHandler; +template <class InputMessage, class OutputMessage> +Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result); +} // namespace internal extern CoreCodegenInterface* g_core_codegen_interface; @@ -196,28 +205,27 @@ class CompletionQueue : private GrpcLibraryCodegen { template <class W, class R> friend class ::grpc::internal::ServerReaderWriterBody; template <class ServiceType, class RequestType, class ResponseType> - friend class RpcMethodHandler; + friend class ::grpc::internal::RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> - friend class ClientStreamingHandler; + friend class ::grpc::internal::ClientStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> - friend class ServerStreamingHandler; + friend class ::grpc::internal::ServerStreamingHandler; template <class Streamer, bool WriteNeeded> - friend class TemplatedBidiStreamingHandler; - friend class UnknownMethodHandler; + friend class ::grpc::internal::TemplatedBidiStreamingHandler; + friend class ::grpc::internal::UnknownMethodHandler; friend class ::grpc::Server; friend class ::grpc::ServerContext; template <class InputMessage, class OutputMessage> - friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, - ClientContext* context, - const InputMessage& request, - OutputMessage* result); + friend Status(::grpc::internal::BlockingUnaryCall)( + ChannelInterface* channel, const internal::RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result); NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); /// Wraps \a grpc_completion_queue_pluck. /// \warning Must not be mixed with calls to \a Next. - bool Pluck(CompletionQueueTag* tag) { + bool Pluck(internal::CompletionQueueTag* tag) { auto deadline = g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME); auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( @@ -238,7 +246,7 @@ class CompletionQueue : private GrpcLibraryCodegen { /// implementation to simple call the other TryPluck function with a zero /// timeout. i.e: /// TryPluck(tag, gpr_time_0(GPR_CLOCK_REALTIME)) - void TryPluck(CompletionQueueTag* tag) { + void TryPluck(internal::CompletionQueueTag* tag) { auto deadline = g_core_codegen_interface->gpr_time_0(GPR_CLOCK_REALTIME); auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( cq_, tag, deadline, nullptr); @@ -254,7 +262,7 @@ class CompletionQueue : private GrpcLibraryCodegen { /// /// This exects tag->FinalizeResult (if called) to return 'false' i.e expects /// that the tag is internal not something that is returned to the user. - void TryPluck(CompletionQueueTag* tag, gpr_timespec deadline) { + void TryPluck(internal::CompletionQueueTag* tag, gpr_timespec deadline) { auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( cq_, tag, deadline, nullptr); if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) { diff --git a/include/grpc++/impl/codegen/completion_queue_tag.h b/include/grpc++/impl/codegen/completion_queue_tag.h index 4d7d3a98dd..cb16bcf9ff 100644 --- a/include/grpc++/impl/codegen/completion_queue_tag.h +++ b/include/grpc++/impl/codegen/completion_queue_tag.h @@ -21,6 +21,7 @@ namespace grpc { +namespace internal { /// An interface allowing implementors to process and filter event tags. class CompletionQueueTag { public: @@ -31,6 +32,7 @@ class CompletionQueueTag { /// queue virtual bool FinalizeResult(void** tag, bool* status) = 0; }; +} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/metadata_map.h b/include/grpc++/impl/codegen/metadata_map.h index b73985967d..fd4750efdd 100644 --- a/include/grpc++/impl/codegen/metadata_map.h +++ b/include/grpc++/impl/codegen/metadata_map.h @@ -23,6 +23,7 @@ namespace grpc { +namespace internal { class MetadataMap { public: MetadataMap() { memset(&arr_, 0, sizeof(arr_)); } @@ -50,6 +51,7 @@ class MetadataMap { grpc_metadata_array arr_; std::multimap<grpc::string_ref, grpc::string_ref> map_; }; +} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 15e24bdcdc..87e9e5e952 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -25,6 +25,7 @@ namespace grpc { +namespace internal { /// A wrapper class of an application provided rpc method handler. template <class ServiceType, class RequestType, class ResponseType> class RpcMethodHandler : public MethodHandler { @@ -265,6 +266,7 @@ class UnknownMethodHandler : public MethodHandler { } }; +} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H diff --git a/include/grpc++/impl/codegen/rpc_method.h b/include/grpc++/impl/codegen/rpc_method.h index ac13ac56c7..54e52364ef 100644 --- a/include/grpc++/impl/codegen/rpc_method.h +++ b/include/grpc++/impl/codegen/rpc_method.h @@ -24,7 +24,7 @@ #include <grpc++/impl/codegen/channel_interface.h> namespace grpc { - +namespace internal { /// Descriptor of an RPC method class RpcMethod { public: @@ -55,6 +55,7 @@ class RpcMethod { void* const channel_tag_; }; +} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_RPC_METHOD_H diff --git a/include/grpc++/impl/codegen/rpc_service_method.h b/include/grpc++/impl/codegen/rpc_service_method.h index 7165774172..635e40469b 100644 --- a/include/grpc++/impl/codegen/rpc_service_method.h +++ b/include/grpc++/impl/codegen/rpc_service_method.h @@ -35,8 +35,8 @@ struct grpc_byte_buffer; namespace grpc { class ServerContext; -class StreamContextInterface; +namespace internal { /// Base class for running an RPC handler. class MethodHandler { public: @@ -71,6 +71,7 @@ class RpcServiceMethod : public RpcMethod { void* server_tag_; std::unique_ptr<MethodHandler> handler_; }; +} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index b5e37fd12b..a2d6967bf8 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -55,7 +55,6 @@ class ServerWriter; namespace internal { template <class W, class R> class ServerReaderWriterBody; -} template <class ServiceType, class RequestType, class ResponseType> class RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> @@ -65,9 +64,11 @@ class ServerStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> class BidiStreamingHandler; class UnknownMethodHandler; - +template <class Streamer, bool WriteNeeded> +class TemplatedBidiStreamingHandler; class Call; -class CallOpBuffer; +} // namespace internal + class CompletionQueue; class Server; class ServerInterface; @@ -247,14 +248,14 @@ class ServerContext { template <class W, class R> friend class ::grpc::internal::ServerReaderWriterBody; template <class ServiceType, class RequestType, class ResponseType> - friend class RpcMethodHandler; + friend class ::grpc::internal::RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> - friend class ClientStreamingHandler; + friend class ::grpc::internal::ClientStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> - friend class ServerStreamingHandler; + friend class ::grpc::internal::ServerStreamingHandler; template <class Streamer, bool WriteNeeded> - friend class TemplatedBidiStreamingHandler; - friend class UnknownMethodHandler; + friend class ::grpc::internal::TemplatedBidiStreamingHandler; + friend class ::grpc::internal::UnknownMethodHandler; friend class ::grpc::ClientContext; /// Prevent copying. @@ -263,9 +264,9 @@ class ServerContext { class CompletionOp; - void BeginCompletionOp(Call* call); + void BeginCompletionOp(internal::Call* call); /// Return the tag queued by BeginCompletionOp() - CompletionQueueTag* GetCompletionOpTag(); + internal::CompletionQueueTag* GetCompletionOpTag(); ServerContext(gpr_timespec deadline, grpc_metadata_array* arr); @@ -282,7 +283,7 @@ class ServerContext { CompletionQueue* cq_; bool sent_initial_metadata_; mutable std::shared_ptr<const AuthContext> auth_context_; - MetadataMap client_metadata_; + internal::MetadataMap client_metadata_; std::multimap<grpc::string, grpc::string> initial_metadata_; std::multimap<grpc::string, grpc::string> trailing_metadata_; @@ -290,7 +291,9 @@ class ServerContext { grpc_compression_level compression_level_; grpc_compression_algorithm compression_algorithm_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> pending_ops_; + internal::CallOpSet<internal::CallOpSendInitialMetadata, + internal::CallOpSendMessage> + pending_ops_; bool has_pending_ops_; }; diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h index 87bd085a37..9d120031ca 100644 --- a/include/grpc++/impl/codegen/server_interface.h +++ b/include/grpc++/impl/codegen/server_interface.h @@ -29,20 +29,21 @@ namespace grpc { class AsyncGenericService; class GenericServerContext; -class RpcService; -class ServerAsyncStreamingInterface; class ServerCompletionQueue; class ServerContext; class ServerCredentials; class Service; -class ThreadPoolInterface; extern CoreCodegenInterface* g_core_codegen_interface; /// Models a gRPC server. /// /// Servers are configured and started via \a grpc::ServerBuilder. -class ServerInterface : public CallHook { +namespace internal { +class ServerAsyncStreamingInterface; +} // namespace internal + +class ServerInterface : public internal::CallHook { public: virtual ~ServerInterface() {} @@ -77,7 +78,7 @@ class ServerInterface : public CallHook { virtual void Wait() = 0; protected: - friend class Service; + friend class ::grpc::Service; /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the Server instance. @@ -115,12 +116,13 @@ class ServerInterface : public CallHook { virtual grpc_server* server() = 0; - virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; + virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) = 0; - class BaseAsyncRequest : public CompletionQueueTag { + class BaseAsyncRequest : public internal::CompletionQueueTag { public: BaseAsyncRequest(ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, bool delete_on_finalize); virtual ~BaseAsyncRequest(); @@ -130,7 +132,7 @@ class ServerInterface : public CallHook { protected: ServerInterface* const server_; ServerContext* const context_; - ServerAsyncStreamingInterface* const stream_; + internal::ServerAsyncStreamingInterface* const stream_; CompletionQueue* const call_cq_; void* const tag_; const bool delete_on_finalize_; @@ -140,7 +142,7 @@ class ServerInterface : public CallHook { class RegisteredAsyncRequest : public BaseAsyncRequest { public: RegisteredAsyncRequest(ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag); // uses BaseAsyncRequest::FinalizeResult @@ -154,7 +156,7 @@ class ServerInterface : public CallHook { public: NoPayloadAsyncRequest(void* registered_method, ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) : RegisteredAsyncRequest(server, context, stream, call_cq, tag) { @@ -169,7 +171,7 @@ class ServerInterface : public CallHook { public: PayloadAsyncRequest(void* registered_method, ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, Message* request) @@ -195,7 +197,7 @@ class ServerInterface : public CallHook { class GenericAsyncRequest : public BaseAsyncRequest { public: GenericAsyncRequest(ServerInterface* server, GenericServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize); @@ -207,8 +209,9 @@ class ServerInterface : public CallHook { }; template <class Message> - void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context, - ServerAsyncStreamingInterface* stream, + void RequestAsyncCall(internal::RpcServiceMethod* method, + ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, Message* message) { @@ -218,8 +221,9 @@ class ServerInterface : public CallHook { message); } - void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context, - ServerAsyncStreamingInterface* stream, + void RequestAsyncCall(internal::RpcServiceMethod* method, + ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { GPR_CODEGEN_ASSERT(method); @@ -228,7 +232,7 @@ class ServerInterface : public CallHook { } void RequestAsyncGenericCall(GenericServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { diff --git a/include/grpc++/impl/codegen/service_type.h b/include/grpc++/impl/codegen/service_type.h index 2dc4ea0ea6..71c3d99d5c 100644 --- a/include/grpc++/impl/codegen/service_type.h +++ b/include/grpc++/impl/codegen/service_type.h @@ -28,13 +28,14 @@ namespace grpc { -class Call; class CompletionQueue; class Server; class ServerInterface; class ServerCompletionQueue; class ServerContext; +namespace internal { +class Call; class ServerAsyncStreamingInterface { public: virtual ~ServerAsyncStreamingInterface() {} @@ -48,9 +49,10 @@ class ServerAsyncStreamingInterface { virtual void SendInitialMetadata(void* tag) = 0; private: - friend class ServerInterface; + friend class ::grpc::ServerInterface; virtual void BindCall(Call* call) = 0; }; +} // namespace internal /// Desriptor of an RPC service and its various RPC methods class Service { @@ -88,40 +90,38 @@ class Service { protected: template <class Message> void RequestAsyncUnary(int index, ServerContext* context, Message* request, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag, request); } - void RequestAsyncClientStreaming(int index, ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) { + void RequestAsyncClientStreaming( + int index, ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag); } template <class Message> - void RequestAsyncServerStreaming(int index, ServerContext* context, - Message* request, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) { + void RequestAsyncServerStreaming( + int index, ServerContext* context, Message* request, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag, request); } - void RequestAsyncBidiStreaming(int index, ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) { + void RequestAsyncBidiStreaming( + int index, ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag); } - void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); } + void AddMethod(internal::RpcServiceMethod* method) { + methods_.emplace_back(method); + } void MarkMethodAsync(int index) { GPR_CODEGEN_ASSERT( @@ -139,7 +139,7 @@ class Service { methods_[index].reset(); } - void MarkMethodStreamed(int index, MethodHandler* streamed_method) { + void MarkMethodStreamed(int index, internal::MethodHandler* streamed_method) { GPR_CODEGEN_ASSERT(methods_[index] && methods_[index]->handler() && "Cannot mark an async or generic method Streamed"); methods_[index]->SetHandler(streamed_method); @@ -148,14 +148,14 @@ class Service { // case of BIDI_STREAMING that has 1 read and 1 write, in that order, // and split server-side streaming is BIDI_STREAMING with 1 read and // any number of writes, in that order. - methods_[index]->SetMethodType(::grpc::RpcMethod::BIDI_STREAMING); + methods_[index]->SetMethodType(internal::RpcMethod::BIDI_STREAMING); } private: friend class Server; friend class ServerInterface; ServerInterface* server_; - std::vector<std::unique_ptr<RpcServiceMethod>> methods_; + std::vector<std::unique_ptr<internal::RpcServiceMethod>> methods_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 3fa208963d..f9c8303000 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -30,6 +30,7 @@ namespace grpc { +namespace internal { /// Common interface for all synchronous client side streaming. class ClientStreamingInterface { public: @@ -62,20 +63,6 @@ class ClientStreamingInterface { virtual Status Finish() = 0; }; -/// Common interface for all synchronous server side streaming. -class ServerStreamingInterface { - public: - virtual ~ServerStreamingInterface() {} - - /// Block to send initial metadata to client. - /// This call is optional, but if it is used, it cannot be used concurrently - /// with or after the \a Finish method. - /// - /// The initial metadata that will be sent to the client will be - /// taken from the \a ServerContext associated with the call. - virtual void SendInitialMetadata() = 0; -}; - /// An interface that yields a sequence of messages of type \a R. template <class R> class ReaderInterface { @@ -141,16 +128,55 @@ class WriterInterface { } }; +} // namespace internal + /// Client-side interface for streaming reads of message of type \a R. template <class R> -class ClientReaderInterface : public ClientStreamingInterface, - public ReaderInterface<R> { +class ClientReaderInterface : public internal::ClientStreamingInterface, + public internal::ReaderInterface<R> { + public: + /// Block to wait for initial metadata from server. The received metadata + /// can only be accessed after this call returns. Should only be called before + /// the first read. Calling this method is optional, and if it is not called + /// the metadata will be available in ClientContext after the first read. + virtual void WaitForInitialMetadata() = 0; +}; + +/// Client-side interface for streaming writes of message type \a W. +template <class W> +class ClientWriterInterface : public internal::ClientStreamingInterface, + public internal::WriterInterface<W> { + public: + /// Half close writing from the client. (signal that the stream of messages + /// coming from the clinet is complete). + /// Blocks until currently-pending writes are completed. + /// Thread safe with respect to \a ReaderInterface::Read operations only + /// + /// \return Whether the writes were successful. + virtual bool WritesDone() = 0; +}; + +/// Client-side interface for bi-directional streaming with +/// client-to-server stream messages of type \a W and +/// server-to-client stream messages of type \a R. +template <class W, class R> +class ClientReaderWriterInterface : public internal::ClientStreamingInterface, + public internal::WriterInterface<W>, + public internal::ReaderInterface<R> { public: /// Block to wait for initial metadata from server. The received metadata /// can only be accessed after this call returns. Should only be called before /// the first read. Calling this method is optional, and if it is not called /// the metadata will be available in ClientContext after the first read. virtual void WaitForInitialMetadata() = 0; + + /// Half close writing from the client. (signal that the stream of messages + /// coming from the clinet is complete). + /// Blocks until currently-pending writes are completed. + /// Thread-safe with respect to \a ReaderInterface::Read + /// + /// \return Whether the writes were successful. + virtual bool WritesDone() = 0; }; /// Synchronous (blocking) client-side API for doing server-streaming RPCs, @@ -159,28 +185,14 @@ class ClientReaderInterface : public ClientStreamingInterface, template <class R> class ClientReader final : public ClientReaderInterface<R> { public: - /// Block to create a stream and write the initial metadata and \a request - /// out. Note that \a context will be used to fill in custom initial - /// metadata used to send to the server when starting the call. - template <class W> - ClientReader(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, const W& request) - : context_(context), - cq_(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, - GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq - call_(channel->CreateCall(method, context, &cq_)) { - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpClientSendClose> - ops; - ops.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok()); - ops.ClientSendClose(); - call_.PerformOps(&ops); - cq_.Pluck(&ops); - } + struct internal { + template <class W> + static ClientReader* Create(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request) { + return new ClientReader(channel, method, context, request); + } + }; /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for /// semantics. @@ -192,7 +204,8 @@ class ClientReader final : public ClientReaderInterface<R> { void WaitForInitialMetadata() override { GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - CallOpSet<CallOpRecvInitialMetadata> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); cq_.Pluck(&ops); /// status ignored @@ -209,7 +222,9 @@ class ClientReader final : public ClientReaderInterface<R> { /// already received (if initial metadata is received, it can be then /// accessed through the \a ClientContext associated with this call). bool Read(R* msg) override { - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage<R>> + ops; if (!context_->initial_metadata_received_) { ops.RecvInitialMetadata(context_); } @@ -224,7 +239,7 @@ class ClientReader final : public ClientReaderInterface<R> { /// The \a ClientContext associated with this call is updated with /// possible metadata received from the server. Status Finish() override { - CallOpSet<CallOpClientRecvStatus> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientRecvStatus> ops; Status status; ops.ClientRecvStatus(context_, &status); call_.PerformOps(&ops); @@ -235,53 +250,48 @@ class ClientReader final : public ClientReaderInterface<R> { private: ClientContext* context_; CompletionQueue cq_; - Call call_; -}; + ::grpc::internal::Call call_; -/// Client-side interface for streaming writes of message type \a W. -template <class W> -class ClientWriterInterface : public ClientStreamingInterface, - public WriterInterface<W> { - public: - /// Half close writing from the client. (signal that the stream of messages - /// coming from the clinet is complete). - /// Blocks until currently-pending writes are completed. - /// Thread safe with respect to \a ReaderInterface::Read operations only - /// - /// \return Whether the writes were successful. - virtual bool WritesDone() = 0; + /// Block to create a stream and write the initial metadata and \a request + /// out. Note that \a context will be used to fill in custom initial + /// metadata used to send to the server when starting the call. + template <class W> + ClientReader(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> + ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok()); + ops.ClientSendClose(); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } }; /// Synchronous (blocking) client-side API for doing client-streaming RPCs, /// where the outgoing message stream coming from the client has messages of /// type \a W. template <class W> -class ClientWriter : public ClientWriterInterface<W> { +class ClientWriter final : public ClientWriterInterface<W> { public: - /// Block to create a stream (i.e. send request headers and other initial - /// metadata to the server). Note that \a context will be used to fill - /// in custom initial metadata. \a response will be filled in with the - /// single expected response message from the server upon a successful - /// call to the \a Finish method of this instance. - template <class R> - ClientWriter(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, R* response) - : context_(context), - cq_(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, - GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq - call_(channel->CreateCall(method, context, &cq_)) { - finish_ops_.RecvMessage(response); - finish_ops_.AllowNoMessage(); - - if (!context_->initial_metadata_corked_) { - CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&ops); - cq_.Pluck(&ops); + struct internal { + template <class R> + static ClientWriter* Create(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, R* response) { + return new ClientWriter(channel, method, context, response); } - } + }; /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for /// semantics. @@ -292,7 +302,8 @@ class ClientWriter : public ClientWriterInterface<W> { void WaitForInitialMetadata() { GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - CallOpSet<CallOpRecvInitialMetadata> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); cq_.Pluck(&ops); // status ignored @@ -304,10 +315,11 @@ class ClientWriter : public ClientWriterInterface<W> { /// Side effect: /// Also sends initial metadata if not already sent (using the /// \a ClientContext associated with this call). - using WriterInterface<W>::Write; + using ::grpc::internal::WriterInterface<W>::Write; bool Write(const W& msg, WriteOptions options) override { - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpClientSendClose> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> ops; if (options.is_last_message()) { @@ -328,7 +340,7 @@ class ClientWriter : public ClientWriterInterface<W> { } bool WritesDone() override { - CallOpSet<CallOpClientSendClose> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; ops.ClientSendClose(); call_.PerformOps(&ops); return cq_.Pluck(&ops); @@ -353,61 +365,55 @@ class ClientWriter : public ClientWriterInterface<W> { private: ClientContext* context_; - CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, - CallOpClientRecvStatus> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpGenericRecvMessage, + ::grpc::internal::CallOpClientRecvStatus> finish_ops_; CompletionQueue cq_; - Call call_; -}; + ::grpc::internal::Call call_; -/// Client-side interface for bi-directional streaming with -/// client-to-server stream messages of type \a W and -/// server-to-client stream messages of type \a R. -template <class W, class R> -class ClientReaderWriterInterface : public ClientStreamingInterface, - public WriterInterface<W>, - public ReaderInterface<R> { - public: - /// Block to wait for initial metadata from server. The received metadata - /// can only be accessed after this call returns. Should only be called before - /// the first read. Calling this method is optional, and if it is not called - /// the metadata will be available in ClientContext after the first read. - virtual void WaitForInitialMetadata() = 0; - - /// Half close writing from the client. (signal that the stream of messages - /// coming from the clinet is complete). - /// Blocks until currently-pending writes are completed. - /// Thread-safe with respect to \a ReaderInterface::Read - /// - /// \return Whether the writes were successful. - virtual bool WritesDone() = 0; -}; - -/// Synchronous (blocking) client-side API for bi-directional streaming RPCs, -/// where the outgoing message stream coming from the client has messages of -/// type \a W, and the incoming messages stream coming from the server has -/// messages of type \a R. -template <class W, class R> -class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { - public: - /// Block to create a stream and write the initial metadata and \a request - /// out. Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context) + /// Block to create a stream (i.e. send request headers and other initial + /// metadata to the server). Note that \a context will be used to fill + /// in custom initial metadata. \a response will be filled in with the + /// single expected response message from the server upon a successful + /// call to the \a Finish method of this instance. + template <class R> + ClientWriter(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, R* response) : context_(context), cq_(grpc_completion_queue_attributes{ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq call_(channel->CreateCall(method, context, &cq_)) { + finish_ops_.RecvMessage(response); + finish_ops_.AllowNoMessage(); + if (!context_->initial_metadata_corked_) { - CallOpSet<CallOpSendInitialMetadata> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + ops; ops.SendInitialMetadata(context->send_initial_metadata_, context->initial_metadata_flags()); call_.PerformOps(&ops); cq_.Pluck(&ops); } } +}; + +/// Synchronous (blocking) client-side API for bi-directional streaming RPCs, +/// where the outgoing message stream coming from the client has messages of +/// type \a W, and the incoming messages stream coming from the server has +/// messages of type \a R. +template <class W, class R> +class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { + public: + struct internal { + static ClientReaderWriter* Create(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context) { + return new ClientReaderWriter(channel, method, context); + } + }; /// Block waiting to read initial metadata from the server. /// This call is optional, but if it is used, it cannot be used concurrently @@ -418,7 +424,8 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { void WaitForInitialMetadata() override { GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - CallOpSet<CallOpRecvInitialMetadata> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); cq_.Pluck(&ops); // status ignored @@ -434,7 +441,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { /// Also receives initial metadata if not already received (updates the \a /// ClientContext associated with this call in that case). bool Read(R* msg) override { - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage<R>> + ops; if (!context_->initial_metadata_received_) { ops.RecvInitialMetadata(context_); } @@ -448,10 +457,11 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { /// Side effect: /// Also sends initial metadata if not already sent (using the /// \a ClientContext associated with this call to fill in values). - using WriterInterface<W>::Write; + using ::grpc::internal::WriterInterface<W>::Write; bool Write(const W& msg, WriteOptions options) override { - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpClientSendClose> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> ops; if (options.is_last_message()) { @@ -472,7 +482,7 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { } bool WritesDone() override { - CallOpSet<CallOpClientSendClose> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; ops.ClientSendClose(); call_.PerformOps(&ops); return cq_.Pluck(&ops); @@ -484,7 +494,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { /// - the \a ClientContext associated with this call is updated with /// possible trailing metadata sent from the server. Status Finish() override { - CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpClientRecvStatus> + ops; if (!context_->initial_metadata_received_) { ops.RecvInitialMetadata(context_); } @@ -498,13 +510,61 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { private: ClientContext* context_; CompletionQueue cq_; - Call call_; + ::grpc::internal::Call call_; + + /// Block to create a stream and write the initial metadata and \a request + /// out. Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + ClientReaderWriter(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + if (!context_->initial_metadata_corked_) { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + } }; +namespace internal { +/// Common interface for all synchronous server side streaming. +class ServerStreamingInterface { + public: + virtual ~ServerStreamingInterface() {} + + /// Block to send initial metadata to client. + /// This call is optional, but if it is used, it cannot be used concurrently + /// with or after the \a Finish method. + /// + /// The initial metadata that will be sent to the client will be + /// taken from the \a ServerContext associated with the call. + virtual void SendInitialMetadata() = 0; +}; +} // namespace internal + /// Server-side interface for streaming reads of message of type \a R. template <class R> -class ServerReaderInterface : public ServerStreamingInterface, - public ReaderInterface<R> {}; +class ServerReaderInterface : public internal::ServerStreamingInterface, + public internal::ReaderInterface<R> {}; + +/// Server-side interface for streaming writes of message of type \a W. +template <class W> +class ServerWriterInterface : public internal::ServerStreamingInterface, + public internal::WriterInterface<W> {}; + +/// Server-side interface for bi-directional streaming. +template <class W, class R> +class ServerReaderWriterInterface : public internal::ServerStreamingInterface, + public internal::WriterInterface<W>, + public internal::ReaderInterface<R> {}; /// Synchronous (blocking) server-side API for doing client-streaming RPCs, /// where the incoming message stream coming from the client has messages of @@ -512,15 +572,13 @@ class ServerReaderInterface : public ServerStreamingInterface, template <class R> class ServerReader final : public ServerReaderInterface<R> { public: - ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} - /// See the \a ServerStreamingInterface.SendInitialMetadata method /// for semantics. Note that initial metadata will be affected by the /// \a ServerContext associated with this call. void SendInitialMetadata() override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - CallOpSet<CallOpSendInitialMetadata> ops; + internal::CallOpSet<internal::CallOpSendInitialMetadata> ops; ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { @@ -537,21 +595,22 @@ class ServerReader final : public ServerReaderInterface<R> { } bool Read(R* msg) override { - CallOpSet<CallOpRecvMessage<R>> ops; + internal::CallOpSet<internal::CallOpRecvMessage<R>> ops; ops.RecvMessage(msg); call_->PerformOps(&ops); return call_->cq()->Pluck(&ops) && ops.got_message; } private: - Call* const call_; + internal::Call* const call_; ServerContext* const ctx_; -}; -/// Server-side interface for streaming writes of message of type \a W. -template <class W> -class ServerWriterInterface : public ServerStreamingInterface, - public WriterInterface<W> {}; + template <class ServiceType, class RequestType, class ResponseType> + friend class internal::ClientStreamingHandler; + + ServerReader(internal::Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} +}; /// Synchronous (blocking) server-side API for doing for doing a /// server-streaming RPCs, where the outgoing message stream coming from the @@ -559,8 +618,6 @@ class ServerWriterInterface : public ServerStreamingInterface, template <class W> class ServerWriter final : public ServerWriterInterface<W> { public: - ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} - /// See the \a ServerStreamingInterface.SendInitialMetadata method /// for semantics. /// Note that initial metadata will be affected by the @@ -568,7 +625,7 @@ class ServerWriter final : public ServerWriterInterface<W> { void SendInitialMetadata() override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - CallOpSet<CallOpSendInitialMetadata> ops; + internal::CallOpSet<internal::CallOpSendInitialMetadata> ops; ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { @@ -584,11 +641,12 @@ class ServerWriter final : public ServerWriterInterface<W> { /// Side effect: /// Also sends initial metadata if not already sent (using the /// \a ClientContext associated with this call to fill in values). - using WriterInterface<W>::Write; + using internal::WriterInterface<W>::Write; bool Write(const W& msg, WriteOptions options) override { if (options.is_last_message()) { options.set_buffer_hint(); } + if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) { return false; } @@ -613,15 +671,15 @@ class ServerWriter final : public ServerWriterInterface<W> { } private: - Call* const call_; + internal::Call* const call_; ServerContext* const ctx_; -}; -/// Server-side interface for bi-directional streaming. -template <class W, class R> -class ServerReaderWriterInterface : public ServerStreamingInterface, - public WriterInterface<W>, - public ReaderInterface<R> {}; + template <class ServiceType, class RequestType, class ResponseType> + friend class internal::ServerStreamingHandler; + + ServerWriter(internal::Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} +}; /// Actual implementation of bi-directional streaming namespace internal { @@ -688,6 +746,7 @@ class ServerReaderWriterBody final { Call* const call_; ServerContext* const ctx_; }; + } // namespace internal /// Synchronous (blocking) server-side API for a bidirectional @@ -697,8 +756,6 @@ class ServerReaderWriterBody final { template <class W, class R> class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> { public: - ServerReaderWriter(Call* call, ServerContext* ctx) : body_(call, ctx) {} - /// See the \a ServerStreamingInterface.SendInitialMetadata method /// for semantics. Note that initial metadata will be affected by the /// \a ServerContext associated with this call. @@ -715,13 +772,18 @@ class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> { /// Side effect: /// Also sends initial metadata if not already sent (using the \a /// ServerContext associated with this call). - using WriterInterface<W>::Write; + using internal::WriterInterface<W>::Write; bool Write(const W& msg, WriteOptions options) override { return body_.Write(msg, options); } private: internal::ServerReaderWriterBody<W, R> body_; + + friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>, + false>; + ServerReaderWriter(internal::Call* call, ServerContext* ctx) + : body_(call, ctx) {} }; /// A class to represent a flow-controlled unary call. This is something @@ -736,9 +798,6 @@ template <class RequestType, class ResponseType> class ServerUnaryStreamer final : public ServerReaderWriterInterface<ResponseType, RequestType> { public: - ServerUnaryStreamer(Call* call, ServerContext* ctx) - : body_(call, ctx), read_done_(false), write_done_(false) {} - /// Block to send initial metadata to client. /// Implicit input parameter: /// - the \a ServerContext associated with this call will be used for @@ -775,7 +834,7 @@ class ServerUnaryStreamer final /// \param options The WriteOptions affecting the write operation. /// /// \return \a true on success, \a false when the stream has been closed. - using WriterInterface<ResponseType>::Write; + using internal::WriterInterface<ResponseType>::Write; bool Write(const ResponseType& response, WriteOptions options) override { if (write_done_ || !read_done_) { return false; @@ -788,6 +847,11 @@ class ServerUnaryStreamer final internal::ServerReaderWriterBody<ResponseType, RequestType> body_; bool read_done_; bool write_done_; + + friend class internal::TemplatedBidiStreamingHandler< + ServerUnaryStreamer<RequestType, ResponseType>, true>; + ServerUnaryStreamer(internal::Call* call, ServerContext* ctx) + : body_(call, ctx), read_done_(false), write_done_(false) {} }; /// A class to represent a flow-controlled server-side streaming call. @@ -799,9 +863,6 @@ template <class RequestType, class ResponseType> class ServerSplitStreamer final : public ServerReaderWriterInterface<ResponseType, RequestType> { public: - ServerSplitStreamer(Call* call, ServerContext* ctx) - : body_(call, ctx), read_done_(false) {} - /// Block to send initial metadata to client. /// Implicit input parameter: /// - the \a ServerContext associated with this call will be used for @@ -838,7 +899,7 @@ class ServerSplitStreamer final /// \param options The WriteOptions affecting the write operation. /// /// \return \a true on success, \a false when the stream has been closed. - using WriterInterface<ResponseType>::Write; + using internal::WriterInterface<ResponseType>::Write; bool Write(const ResponseType& response, WriteOptions options) override { return read_done_ && body_.Write(response, options); } @@ -846,6 +907,11 @@ class ServerSplitStreamer final private: internal::ServerReaderWriterBody<ResponseType, RequestType> body_; bool read_done_; + + friend class internal::TemplatedBidiStreamingHandler< + ServerSplitStreamer<RequestType, ResponseType>, false>; + ServerSplitStreamer(internal::Call* call, ServerContext* ctx) + : body_(call, ctx), read_done_(false) {} }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/time.h b/include/grpc++/impl/codegen/time.h index 589deb4f03..d464d6ea13 100644 --- a/include/grpc++/impl/codegen/time.h +++ b/include/grpc++/impl/codegen/time.h @@ -19,6 +19,8 @@ #ifndef GRPCXX_IMPL_CODEGEN_TIME_H #define GRPCXX_IMPL_CODEGEN_TIME_H +#include <chrono> + #include <grpc++/impl/codegen/config.h> #include <grpc/impl/codegen/grpc_types.h> @@ -59,10 +61,6 @@ class TimePoint<gpr_timespec> { } // namespace grpc -#include <chrono> - -#include <grpc/impl/codegen/grpc_types.h> - namespace grpc { // from and to should be absolute time. diff --git a/include/grpc++/server.h b/include/grpc++/server.h index d76a745ac9..baf0ded9ab 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -172,7 +172,8 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { /// \param num_cqs How many completion queues does \a cqs hold. void Start(ServerCompletionQueue** cqs, size_t num_cqs) override; - void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) override; + void PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) override; void ShutdownInternal(gpr_timespec deadline) override; diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 50f947d4c7..5bd53bd90f 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -40,7 +40,6 @@ namespace grpc { class AsyncGenericService; class ResourceQuota; class CompletionQueue; -class RpcService; class Server; class ServerCompletionQueue; class ServerCredentials; @@ -196,10 +195,7 @@ class ServerBuilder { struct SyncServerSettings { SyncServerSettings() - : num_cqs(GPR_MAX(1, gpr_cpu_num_cores())), - min_pollers(1), - max_pollers(2), - cq_timeout_msec(10000) {} + : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {} /// Number of server completion queues to create to listen to incoming RPCs. int num_cqs; diff --git a/package.xml b/package.xml index 10cb2e63ff..cfa45f06d7 100644 --- a/package.xml +++ b/package.xml @@ -106,6 +106,7 @@ <file baseinstalldir="/" name="src/core/lib/support/mpscq.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/murmur_hash.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/spinlock.h" role="src" /> + <file baseinstalldir="/" name="src/core/lib/support/stack_lockfree.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/string.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/string_windows.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/thd_internal.h" role="src" /> @@ -135,6 +136,7 @@ <file baseinstalldir="/" name="src/core/lib/support/log_windows.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/mpscq.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/murmur_hash.c" role="src" /> + <file baseinstalldir="/" name="src/core/lib/support/stack_lockfree.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/string.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/string_posix.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/string_util_windows.c" role="src" /> diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index b09bf99677..f8c6fda340 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -140,7 +140,6 @@ grpc::string GetHeaderIncludes(grpc_generator::File *file, printer->Print(vars, "namespace grpc {\n"); printer->Print(vars, "class CompletionQueue;\n"); printer->Print(vars, "class Channel;\n"); - printer->Print(vars, "class RpcService;\n"); printer->Print(vars, "class ServerCompletionQueue;\n"); printer->Print(vars, "class ServerContext;\n"); printer->Print(vars, "} // namespace grpc\n\n"); @@ -187,19 +186,21 @@ void PrintHeaderClientMethodInterfaces( } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, - "std::unique_ptr< ::grpc::ClientWriterInterface< $Request$>>" + "std::unique_ptr< ::grpc::ClientWriterInterface< " + "$Request$>>" " $Method$(" "::grpc::ClientContext* context, $Response$* response) {\n"); printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< ::grpc::ClientWriterInterface< $Request$>>" - "($Method$Raw(context, response));\n"); + printer->Print(*vars, + "return std::unique_ptr< " + "::grpc::ClientWriterInterface< $Request$>>" + "($Method$Raw(context, response));\n"); printer->Outdent(); printer->Print("}\n"); printer->Print( *vars, - "std::unique_ptr< ::grpc::ClientAsyncWriterInterface< $Request$>>" + "std::unique_ptr< ::grpc::ClientAsyncWriterInterface< " + "$Request$>>" " Async$Method$(::grpc::ClientContext* context, $Response$* " "response, " "::grpc::CompletionQueue* cq, void* tag) {\n"); @@ -213,19 +214,21 @@ void PrintHeaderClientMethodInterfaces( } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, - "std::unique_ptr< ::grpc::ClientReaderInterface< $Response$>>" + "std::unique_ptr< ::grpc::ClientReaderInterface< " + "$Response$>>" " $Method$(::grpc::ClientContext* context, const $Request$& request)" " {\n"); printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< ::grpc::ClientReaderInterface< $Response$>>" - "($Method$Raw(context, request));\n"); + printer->Print(*vars, + "return std::unique_ptr< " + "::grpc::ClientReaderInterface< $Response$>>" + "($Method$Raw(context, request));\n"); printer->Outdent(); printer->Print("}\n"); printer->Print( *vars, - "std::unique_ptr< ::grpc::ClientAsyncReaderInterface< $Response$>> " + "std::unique_ptr< ::grpc::ClientAsyncReaderInterface< " + "$Response$>> " "Async$Method$(" "::grpc::ClientContext* context, const $Request$& request, " "::grpc::CompletionQueue* cq, void* tag) {\n"); @@ -242,36 +245,37 @@ void PrintHeaderClientMethodInterfaces( "$Request$, $Response$>> " "$Method$(::grpc::ClientContext* context) {\n"); printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< " - "::grpc::ClientReaderWriterInterface< $Request$, $Response$>>(" - "$Method$Raw(context));\n"); + printer->Print(*vars, + "return std::unique_ptr< " + "::grpc::ClientReaderWriterInterface< " + "$Request$, $Response$>>(" + "$Method$Raw(context));\n"); printer->Outdent(); printer->Print("}\n"); - printer->Print( - *vars, - "std::unique_ptr< " - "::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>> " - "Async$Method$(::grpc::ClientContext* context, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); + printer->Print(*vars, + "std::unique_ptr< " + "::grpc::ClientAsyncReaderWriterInterface< " + "$Request$, $Response$>> " + "Async$Method$(::grpc::ClientContext* context, " + "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>>(" - "Async$Method$Raw(context, cq, tag));\n"); + printer->Print(*vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncReaderWriterInterface< " + "$Request$, $Response$>>(" + "Async$Method$Raw(context, cq, tag));\n"); printer->Outdent(); printer->Print("}\n"); } } else { if (method->NoStreaming()) { - printer->Print( - *vars, - "virtual ::grpc::ClientAsyncResponseReaderInterface< $Response$>* " - "Async$Method$Raw(::grpc::ClientContext* context, " - "const $Request$& request, " - "::grpc::CompletionQueue* cq) = 0;\n"); + printer->Print(*vars, + "virtual " + "::grpc::ClientAsyncResponseReaderInterface< " + "$Response$>* " + "Async$Method$Raw(::grpc::ClientContext* context, " + "const $Request$& request, " + "::grpc::CompletionQueue* cq) = 0;\n"); } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, @@ -286,7 +290,8 @@ void PrintHeaderClientMethodInterfaces( } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, - "virtual ::grpc::ClientReaderInterface< $Response$>* $Method$Raw(" + "virtual ::grpc::ClientReaderInterface< $Response$>* " + "$Method$Raw(" "::grpc::ClientContext* context, const $Request$& request) = 0;\n"); printer->Print( *vars, @@ -451,7 +456,8 @@ void PrintHeaderClientMethodData(grpc_generator::Printer *printer, const grpc_generator::Method *method, std::map<grpc::string, grpc::string> *vars) { (*vars)["Method"] = method->name(); - printer->Print(*vars, "const ::grpc::RpcMethod rpcmethod_$Method$_;\n"); + printer->Print(*vars, + "const ::grpc::internal::RpcMethod rpcmethod_$Method$_;\n"); } void PrintHeaderServerMethodSync(grpc_generator::Printer *printer, @@ -623,7 +629,7 @@ void PrintHeaderServerMethodStreamedUnary( printer->Print(*vars, "WithStreamedUnaryMethod_$Method$() {\n" " ::grpc::Service::MarkMethodStreamed($Idx$,\n" - " new ::grpc::StreamedUnaryHandler< $Request$, " + " new ::grpc::internal::StreamedUnaryHandler< $Request$, " "$Response$>(std::bind" "(&WithStreamedUnaryMethod_$Method$<BaseClass>::" "Streamed$Method$, this, std::placeholders::_1, " @@ -671,15 +677,16 @@ void PrintHeaderServerMethodSplitStreaming( "{}\n"); printer->Print(" public:\n"); printer->Indent(); - printer->Print(*vars, - "WithSplitStreamingMethod_$Method$() {\n" - " ::grpc::Service::MarkMethodStreamed($Idx$,\n" - " new ::grpc::SplitServerStreamingHandler< $Request$, " - "$Response$>(std::bind" - "(&WithSplitStreamingMethod_$Method$<BaseClass>::" - "Streamed$Method$, this, std::placeholders::_1, " - "std::placeholders::_2)));\n" - "}\n"); + printer->Print( + *vars, + "WithSplitStreamingMethod_$Method$() {\n" + " ::grpc::Service::MarkMethodStreamed($Idx$,\n" + " new ::grpc::internal::SplitServerStreamingHandler< $Request$, " + "$Response$>(std::bind" + "(&WithSplitStreamingMethod_$Method$<BaseClass>::" + "Streamed$Method$, this, std::placeholders::_1, " + "std::placeholders::_2)));\n" + "}\n"); printer->Print(*vars, "~WithSplitStreamingMethod_$Method$() override {\n" " BaseClassMustBeDerivedFromService(this);\n" @@ -819,7 +826,8 @@ void PrintHeaderService(grpc_generator::Printer *printer, " {\n public:\n"); printer->Indent(); printer->Print( - "Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);\n"); + "Stub(const std::shared_ptr< ::grpc::ChannelInterface>& " + "channel);\n"); for (int i = 0; i < service->method_count(); ++i) { PrintHeaderClientMethod(printer, service->method(i).get(), vars, true); } @@ -1082,11 +1090,12 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::Status $ns$$Service$::Stub::$Method$(" "::grpc::ClientContext* context, " "const $Request$& request, $Response$* response) {\n"); - printer->Print(*vars, - " return ::grpc::BlockingUnaryCall(channel_.get(), " - "rpcmethod_$Method$_, " - "context, request, response);\n" - "}\n\n"); + printer->Print( + *vars, + " return ::grpc::internal::BlockingUnaryCall(channel_.get(), " + "rpcmethod_$Method$_, " + "context, request, response);\n" + "}\n\n"); printer->Print( *vars, "::grpc::ClientAsyncResponseReader< $Response$>* " @@ -1095,7 +1104,8 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::CompletionQueue* cq) {\n"); printer->Print(*vars, " return " - "::grpc::ClientAsyncResponseReader< $Response$>::Create(" + "::grpc::ClientAsyncResponseReader< $Response$>::" + "internal::Create(" "channel_.get(), cq, " "rpcmethod_$Method$_, " "context, request);\n" @@ -1105,19 +1115,21 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientWriter< $Request$>* " "$ns$$Service$::Stub::$Method$Raw(" "::grpc::ClientContext* context, $Response$* response) {\n"); - printer->Print(*vars, - " return new ::grpc::ClientWriter< $Request$>(" - "channel_.get(), " - "rpcmethod_$Method$_, " - "context, response);\n" - "}\n\n"); + printer->Print( + *vars, + " return ::grpc::ClientWriter< $Request$>::internal::Create(" + "channel_.get(), " + "rpcmethod_$Method$_, " + "context, response);\n" + "}\n\n"); printer->Print(*vars, "::grpc::ClientAsyncWriter< $Request$>* " "$ns$$Service$::Stub::Async$Method$Raw(" "::grpc::ClientContext* context, $Response$* response, " "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Print(*vars, - " return ::grpc::ClientAsyncWriter< $Request$>::Create(" + " return ::grpc::ClientAsyncWriter< $Request$>::" + "internal::Create(" "channel_.get(), cq, " "rpcmethod_$Method$_, " "context, response, tag);\n" @@ -1128,19 +1140,21 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientReader< $Response$>* " "$ns$$Service$::Stub::$Method$Raw(" "::grpc::ClientContext* context, const $Request$& request) {\n"); - printer->Print(*vars, - " return new ::grpc::ClientReader< $Response$>(" - "channel_.get(), " - "rpcmethod_$Method$_, " - "context, request);\n" - "}\n\n"); + printer->Print( + *vars, + " return ::grpc::ClientReader< $Response$>::internal::Create(" + "channel_.get(), " + "rpcmethod_$Method$_, " + "context, request);\n" + "}\n\n"); printer->Print(*vars, "::grpc::ClientAsyncReader< $Response$>* " "$ns$$Service$::Stub::Async$Method$Raw(" "::grpc::ClientContext* context, const $Request$& request, " "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Print(*vars, - " return ::grpc::ClientAsyncReader< $Response$>::Create(" + " return ::grpc::ClientAsyncReader< $Response$>::" + "internal::Create(" "channel_.get(), cq, " "rpcmethod_$Method$_, " "context, request, tag);\n" @@ -1151,8 +1165,8 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientReaderWriter< $Request$, $Response$>* " "$ns$$Service$::Stub::$Method$Raw(::grpc::ClientContext* context) {\n"); printer->Print(*vars, - " return new ::grpc::ClientReaderWriter< " - "$Request$, $Response$>(" + " return ::grpc::ClientReaderWriter< " + "$Request$, $Response$>::internal::Create(" "channel_.get(), " "rpcmethod_$Method$_, " "context);\n" @@ -1162,14 +1176,14 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " "$ns$$Service$::Stub::Async$Method$Raw(::grpc::ClientContext* context, " "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Print( - *vars, - " return " - "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::Create(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, tag);\n" - "}\n\n"); + printer->Print(*vars, + " return " + "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::" + "internal::Create(" + "channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, tag);\n" + "}\n\n"); } } @@ -1279,7 +1293,7 @@ void PrintSourceService(grpc_generator::Printer *printer, printer->Print(*vars, ", rpcmethod_$Method$_(" "$prefix$$Service$_method_names[$Idx$], " - "::grpc::RpcMethod::$StreamingType$, " + "::grpc::internal::RpcMethod::$StreamingType$, " "channel" ")\n"); } @@ -1302,38 +1316,38 @@ void PrintSourceService(grpc_generator::Printer *printer, if (method->NoStreaming()) { printer->Print( *vars, - "AddMethod(new ::grpc::RpcServiceMethod(\n" + "AddMethod(new ::grpc::internal::RpcServiceMethod(\n" " $prefix$$Service$_method_names[$Idx$],\n" - " ::grpc::RpcMethod::NORMAL_RPC,\n" - " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, " + " ::grpc::internal::RpcMethod::NORMAL_RPC,\n" + " new ::grpc::internal::RpcMethodHandler< $ns$$Service$::Service, " "$Request$, " "$Response$>(\n" " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (ClientOnlyStreaming(method.get())) { printer->Print( *vars, - "AddMethod(new ::grpc::RpcServiceMethod(\n" + "AddMethod(new ::grpc::internal::RpcServiceMethod(\n" " $prefix$$Service$_method_names[$Idx$],\n" - " ::grpc::RpcMethod::CLIENT_STREAMING,\n" - " new ::grpc::ClientStreamingHandler< " + " ::grpc::internal::RpcMethod::CLIENT_STREAMING,\n" + " new ::grpc::internal::ClientStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (ServerOnlyStreaming(method.get())) { printer->Print( *vars, - "AddMethod(new ::grpc::RpcServiceMethod(\n" + "AddMethod(new ::grpc::internal::RpcServiceMethod(\n" " $prefix$$Service$_method_names[$Idx$],\n" - " ::grpc::RpcMethod::SERVER_STREAMING,\n" - " new ::grpc::ServerStreamingHandler< " + " ::grpc::internal::RpcMethod::SERVER_STREAMING,\n" + " new ::grpc::internal::ServerStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (method->BidiStreaming()) { printer->Print( *vars, - "AddMethod(new ::grpc::RpcServiceMethod(\n" + "AddMethod(new ::grpc::internal::RpcServiceMethod(\n" " $prefix$$Service$_method_names[$Idx$],\n" - " ::grpc::RpcMethod::BIDI_STREAMING,\n" - " new ::grpc::BidiStreamingHandler< " + " ::grpc::internal::RpcMethod::BIDI_STREAMING,\n" + " new ::grpc::internal::BidiStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } @@ -1501,7 +1515,8 @@ void PrintMockClientMethods(grpc_generator::Printer *printer, printer->Print( *vars, "MOCK_METHOD3(Async$Method$Raw, " - "::grpc::ClientAsyncReaderWriterInterface<$Request$, $Response$>*" + "::grpc::ClientAsyncReaderWriterInterface<$Request$, " + "$Response$>*" "(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, " "void* tag));\n"); } diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index 45de289e45..a98b8e62db 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -54,7 +54,7 @@ static int retry_named_port_failure(int status, request *r, int retry_status; uv_getaddrinfo_t *req = gpr_malloc(sizeof(uv_getaddrinfo_t)); req->data = r; - r->port = svc[i][1]; + r->port = gpr_strdup(svc[i][1]); retry_status = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_cb, r->host, r->port, r->hints); if (retry_status < 0 || getaddrinfo_cb == NULL) { @@ -127,6 +127,8 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status, GRPC_CLOSURE_SCHED(&exec_ctx, r->on_done, error); grpc_exec_ctx_finish(&exec_ctx); gpr_free(r->hints); + gpr_free(r->host); + gpr_free(r->port); gpr_free(r); uv_freeaddrinfo(res); } diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index ab6832932f..2f1d237c07 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -48,6 +48,7 @@ typedef struct grpc_uv_tcp_connect { static void uv_tcp_connect_cleanup(grpc_exec_ctx *exec_ctx, grpc_uv_tcp_connect *connect) { grpc_resource_quota_unref_internal(exec_ctx, connect->resource_quota); + gpr_free(connect->addr_name); gpr_free(connect); } @@ -105,6 +106,7 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) { } done = (--connect->refs == 0); if (done) { + grpc_exec_ctx_flush(&exec_ctx); uv_tcp_connect_cleanup(&exec_ctx, connect); } GRPC_CLOSURE_SCHED(&exec_ctx, closure, error); @@ -140,6 +142,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, connect->resource_quota = resource_quota; uv_tcp_init(uv_default_loop(), connect->tcp_handle); connect->connect_req.data = connect; + connect->refs = 1; if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index 2de0ea90e7..2ab836cc34 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -234,6 +234,7 @@ static void on_connect(uv_stream_t *server, int status) { sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, acceptor); grpc_exec_ctx_finish(&exec_ctx); + gpr_free(peer_name_string); } } diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 213952d5ec..ff5fd3edc8 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -67,6 +67,8 @@ typedef struct { static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { grpc_slice_unref_internal(exec_ctx, tcp->read_slice); grpc_resource_user_unref(exec_ctx, tcp->resource_user); + gpr_free(tcp->handle); + gpr_free(tcp->peer_string); gpr_free(tcp); } diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c index 58c4c435d3..e9f893988d 100644 --- a/src/core/lib/support/mpscq.c +++ b/src/core/lib/support/mpscq.c @@ -31,12 +31,11 @@ void gpr_mpscq_destroy(gpr_mpscq *q) { GPR_ASSERT(q->tail == &q->stub); } -bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) { +void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) { gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL); gpr_mpscq_node *prev = (gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n); gpr_atm_rel_store(&prev->next, (gpr_atm)n); - return prev == &q->stub; } gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) { @@ -78,25 +77,3 @@ gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty) { *empty = false; return NULL; } - -void gpr_locked_mpscq_init(gpr_locked_mpscq *q) { - gpr_mpscq_init(&q->queue); - q->read_lock = GPR_SPINLOCK_INITIALIZER; -} - -void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q) { - gpr_mpscq_destroy(&q->queue); -} - -bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n) { - return gpr_mpscq_push(&q->queue, n); -} - -gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q) { - if (gpr_spinlock_trylock(&q->read_lock)) { - gpr_mpscq_node *n = gpr_mpscq_pop(&q->queue); - gpr_spinlock_unlock(&q->read_lock); - return n; - } - return NULL; -} diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h index 2f4739d7f8..daa51768f7 100644 --- a/src/core/lib/support/mpscq.h +++ b/src/core/lib/support/mpscq.h @@ -22,7 +22,6 @@ #include <grpc/support/atm.h> #include <stdbool.h> #include <stddef.h> -#include "src/core/lib/support/spinlock.h" // Multiple-producer single-consumer lock free queue, based upon the // implementation from Dmitry Vyukov here: @@ -44,34 +43,11 @@ typedef struct gpr_mpscq { void gpr_mpscq_init(gpr_mpscq *q); void gpr_mpscq_destroy(gpr_mpscq *q); // Push a node -// Thread safe - can be called from multiple threads concurrently -// Returns true if this was possibly the first node (may return true -// sporadically, will not return false sporadically) -bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n); +void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n); // Pop a node (returns NULL if no node is ready - which doesn't indicate that // the queue is empty!!) -// Thread compatible - can only be called from one thread at a time gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q); // Pop a node; sets *empty to true if the queue is empty, or false if it is not gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty); -// An mpscq with a spinlock: it's safe to pop from multiple threads, but doing -// only one thread will succeed concurrently -typedef struct gpr_locked_mpscq { - gpr_mpscq queue; - gpr_spinlock read_lock; -} gpr_locked_mpscq; - -void gpr_locked_mpscq_init(gpr_locked_mpscq *q); -void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q); -// Push a node -// Thread safe - can be called from multiple threads concurrently -// Returns true if this was possibly the first node (may return true -// sporadically, will not return false sporadically) -bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n); -// Pop a node (returns NULL if no node is ready - which doesn't indicate that -// the queue is empty!!) -// Thread safe - can be called from multiple threads concurrently -gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q); - #endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */ diff --git a/src/core/lib/support/stack_lockfree.c b/src/core/lib/support/stack_lockfree.c new file mode 100644 index 0000000000..0fb64ed001 --- /dev/null +++ b/src/core/lib/support/stack_lockfree.c @@ -0,0 +1,137 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/support/stack_lockfree.h" + +#include <stdlib.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> + +/* The lockfree node structure is a single architecture-level + word that allows for an atomic CAS to set it up. */ +struct lockfree_node_contents { + /* next thing to look at. Actual index for head, next index otherwise */ + uint16_t index; +#ifdef GPR_ARCH_64 + uint16_t pad; + uint32_t aba_ctr; +#else +#ifdef GPR_ARCH_32 + uint16_t aba_ctr; +#else +#error Unsupported bit width architecture +#endif +#endif +}; + +/* Use a union to make sure that these are in the same bits as an atm word */ +typedef union lockfree_node { + gpr_atm atm; + struct lockfree_node_contents contents; +} lockfree_node; + +/* make sure that entries aligned to 8-bytes */ +#define ENTRY_ALIGNMENT_BITS 3 +/* reserve this entry as invalid */ +#define INVALID_ENTRY_INDEX ((1 << 16) - 1) + +struct gpr_stack_lockfree { + lockfree_node *entries; + lockfree_node head; /* An atomic entry describing curr head */ +}; + +gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries) { + gpr_stack_lockfree *stack; + stack = (gpr_stack_lockfree *)gpr_malloc(sizeof(*stack)); + /* Since we only allocate 16 bits to represent an entry number, + * make sure that we are within the desired range */ + /* Reserve the highest entry number as a dummy */ + GPR_ASSERT(entries < INVALID_ENTRY_INDEX); + stack->entries = (lockfree_node *)gpr_malloc_aligned( + entries * sizeof(stack->entries[0]), ENTRY_ALIGNMENT_BITS); + /* Clear out all entries */ + memset(stack->entries, 0, entries * sizeof(stack->entries[0])); + memset(&stack->head, 0, sizeof(stack->head)); + + GPR_ASSERT(sizeof(stack->entries->atm) == sizeof(stack->entries->contents)); + + /* Point the head at reserved dummy entry */ + stack->head.contents.index = INVALID_ENTRY_INDEX; +/* Fill in the pad and aba_ctr to avoid confusing memcheck tools */ +#ifdef GPR_ARCH_64 + stack->head.contents.pad = 0; +#endif + stack->head.contents.aba_ctr = 0; + return stack; +} + +void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack) { + gpr_free_aligned(stack->entries); + gpr_free(stack); +} + +int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) { + lockfree_node head; + lockfree_node newhead; + lockfree_node curent; + lockfree_node newent; + + /* First fill in the entry's index and aba ctr for new head */ + newhead.contents.index = (uint16_t)entry; +#ifdef GPR_ARCH_64 + /* Fill in the pad to avoid confusing memcheck tools */ + newhead.contents.pad = 0; +#endif + + /* Also post-increment the aba_ctr */ + curent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm); + newhead.contents.aba_ctr = ++curent.contents.aba_ctr; + gpr_atm_no_barrier_store(&stack->entries[entry].atm, curent.atm); + + do { + /* Atomically get the existing head value for use */ + head.atm = gpr_atm_no_barrier_load(&(stack->head.atm)); + /* Point to it */ + newent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm); + newent.contents.index = head.contents.index; + gpr_atm_no_barrier_store(&stack->entries[entry].atm, newent.atm); + } while (!gpr_atm_rel_cas(&(stack->head.atm), head.atm, newhead.atm)); + /* Use rel_cas above to make sure that entry index is set properly */ + return head.contents.index == INVALID_ENTRY_INDEX; +} + +int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack) { + lockfree_node head; + lockfree_node newhead; + + do { + head.atm = gpr_atm_acq_load(&(stack->head.atm)); + if (head.contents.index == INVALID_ENTRY_INDEX) { + return -1; + } + newhead.atm = + gpr_atm_no_barrier_load(&(stack->entries[head.contents.index].atm)); + + } while (!gpr_atm_no_barrier_cas(&(stack->head.atm), head.atm, newhead.atm)); + + return head.contents.index; +} diff --git a/src/core/lib/support/stack_lockfree.h b/src/core/lib/support/stack_lockfree.h new file mode 100644 index 0000000000..6324211b72 --- /dev/null +++ b/src/core/lib/support/stack_lockfree.h @@ -0,0 +1,38 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H +#define GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H + +#include <stddef.h> + +typedef struct gpr_stack_lockfree gpr_stack_lockfree; + +/* This stack must specify the maximum number of entries to track. + The current implementation only allows up to 65534 entries */ +gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries); +void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack); + +/* Pass in a valid entry number for the next stack entry */ +/* Returns 1 if this is the first element on the stack, 0 otherwise */ +int gpr_stack_lockfree_push(gpr_stack_lockfree *, int entry); + +/* Returns -1 on empty or the actual entry number */ +int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack); + +#endif /* GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H */ diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 84ddf74ab9..0cd436883a 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -32,8 +32,7 @@ #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/support/mpscq.h" -#include "src/core/lib/support/spinlock.h" +#include "src/core/lib/support/stack_lockfree.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -62,7 +61,6 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false); typedef struct requested_call { - gpr_mpscq_node request_link; /* must be first */ requested_call_type type; size_t cq_idx; void *tag; @@ -162,7 +160,7 @@ struct request_matcher { grpc_server *server; call_data *pending_head; call_data *pending_tail; - gpr_locked_mpscq *requests_per_cq; + gpr_stack_lockfree **requests_per_cq; }; struct registered_method { @@ -207,6 +205,11 @@ struct grpc_server { registered_method *registered_methods; /** one request matcher for unregistered methods */ request_matcher unregistered_request_matcher; + /** free list of available requested_calls_per_cq indices */ + gpr_stack_lockfree **request_freelist_per_cq; + /** requested call backing data */ + requested_call **requested_calls_per_cq; + int max_requested_calls_per_cq; gpr_atm shutdown_flag; uint8_t shutdown_published; @@ -306,20 +309,21 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, * request_matcher */ -static void request_matcher_init(request_matcher *rm, grpc_server *server) { +static void request_matcher_init(request_matcher *rm, size_t entries, + grpc_server *server) { memset(rm, 0, sizeof(*rm)); rm->server = server; rm->requests_per_cq = gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count); for (size_t i = 0; i < server->cq_count; i++) { - gpr_locked_mpscq_init(&rm->requests_per_cq[i]); + rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries); } } static void request_matcher_destroy(request_matcher *rm) { for (size_t i = 0; i < rm->server->cq_count; i++) { - GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL); - gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]); + GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1); + gpr_stack_lockfree_destroy(rm->requests_per_cq[i]); } gpr_free(rm->requests_per_cq); } @@ -349,17 +353,13 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, grpc_server *server, request_matcher *rm, grpc_error *error) { - requested_call *rc; + int request_id; for (size_t i = 0; i < server->cq_count; i++) { - /* Here we know: - 1. no requests are being added (since the server is shut down) - 2. no other threads are pulling (since the shut down process is single - threaded) - So, we can ignore the queue lock and just pop, with the guarantee that a - NULL returned here truly means that the queue is empty */ - while ((rc = (requested_call *)gpr_mpscq_pop( - &rm->requests_per_cq[i].queue)) != NULL) { - fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error)); + while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != + -1) { + fail_call(exec_ctx, server, i, + &server->requested_calls_per_cq[i][request_id], + GRPC_ERROR_REF(error)); } } GRPC_ERROR_UNREF(error); @@ -394,7 +394,13 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { } for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server"); + if (server->started) { + gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]); + gpr_free(server->requested_calls_per_cq[i]); + } } + gpr_free(server->request_freelist_per_cq); + gpr_free(server->requested_calls_per_cq); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); @@ -452,7 +458,21 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, grpc_cq_completion *c) { - gpr_free(req); + requested_call *rc = req; + grpc_server *server = rc->server; + + if (rc >= server->requested_calls_per_cq[rc->cq_idx] && + rc < server->requested_calls_per_cq[rc->cq_idx] + + server->max_requested_calls_per_cq) { + GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX); + gpr_stack_lockfree_push( + server->request_freelist_per_cq[rc->cq_idx], + (int)(rc - server->requested_calls_per_cq[rc->cq_idx])); + } else { + gpr_free(req); + } + + server_unref(exec_ctx, server); } static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, @@ -482,6 +502,10 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, GPR_UNREACHABLE_CODE(return ); } + grpc_call_element *elem = + grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + channel_data *chand = elem->channel_data; + server_ref(chand->server); grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, rc, &rc->completion); } @@ -509,15 +533,15 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, for (size_t i = 0; i < server->cq_count; i++) { size_t cq_idx = (chand->cq_idx + i) % server->cq_count; - requested_call *rc = - (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]); - if (rc == NULL) { + int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); + if (request_id == -1) { continue; } else { gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, cq_idx, rc); + publish_call(exec_ctx, server, calld, cq_idx, + &server->requested_calls_per_cq[cq_idx][request_id]); return; /* early out */ } } @@ -992,6 +1016,8 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { server->root_channel_data.next = server->root_channel_data.prev = &server->root_channel_data; + /* TODO(ctiller): expose a channel_arg for this */ + server->max_requested_calls_per_cq = 32768; server->channel_args = grpc_channel_args_copy(args); return server; @@ -1064,15 +1090,29 @@ void grpc_server_start(grpc_server *server) { server->started = true; server->pollset_count = 0; server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); + server->request_freelist_per_cq = + gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count); + server->requested_calls_per_cq = + gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { if (grpc_cq_can_listen(server->cqs[i])) { server->pollsets[server->pollset_count++] = grpc_cq_pollset(server->cqs[i]); } + server->request_freelist_per_cq[i] = + gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq); + for (int j = 0; j < server->max_requested_calls_per_cq; j++) { + gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j); + } + server->requested_calls_per_cq[i] = + gpr_malloc((size_t)server->max_requested_calls_per_cq * + sizeof(*server->requested_calls_per_cq[i])); } - request_matcher_init(&server->unregistered_request_matcher, server); + request_matcher_init(&server->unregistered_request_matcher, + (size_t)server->max_requested_calls_per_cq, server); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_init(&rm->request_matcher, server); + request_matcher_init(&rm->request_matcher, + (size_t)server->max_requested_calls_per_cq, server); } server_ref(server); @@ -1326,11 +1366,21 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, requested_call *rc) { call_data *calld = NULL; request_matcher *rm = NULL; + int request_id; if (gpr_atm_acq_load(&server->shutdown_flag)) { fail_call(exec_ctx, server, cq_idx, rc, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); return GRPC_CALL_OK; } + request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]); + if (request_id == -1) { + /* out of request ids: just fail this one */ + fail_call(exec_ctx, server, cq_idx, rc, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"), + GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq)); + return GRPC_CALL_OK; + } switch (rc->type) { case BATCH_CALL: rm = &server->unregistered_request_matcher; @@ -1339,13 +1389,15 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, rm = &rc->data.registered.registered_method->request_matcher; break; } - if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) { + server->requested_calls_per_cq[cq_idx][request_id] = *rc; + gpr_free(rc); + if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) { /* this was the first queued request: we need to lock and start matching calls */ gpr_mu_lock(&server->mu_call); while ((calld = rm->pending_head) != NULL) { - rc = (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]); - if (rc == NULL) break; + request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); + if (request_id == -1) break; rm->pending_head = calld->pending_next; gpr_mu_unlock(&server->mu_call); gpr_mu_lock(&calld->mu_state); @@ -1361,7 +1413,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, cq_idx, rc); + publish_call(exec_ctx, server, calld, cq_idx, + &server->requested_calls_per_cq[cq_idx][request_id]); } gpr_mu_lock(&server->mu_call); } @@ -1468,6 +1521,7 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, rc->initial_metadata->count = 0; GPR_ASSERT(error != GRPC_ERROR_NONE); + server_ref(server); grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error, done_request_event, rc, &rc->completion); } diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c index 37d4f038b7..1fd65928f9 100644 --- a/src/core/tsi/ssl_transport_security.c +++ b/src/core/tsi/ssl_transport_security.c @@ -411,15 +411,11 @@ static tsi_result do_ssl_read(SSL *ssl, unsigned char *unprotected_bytes, GPR_ASSERT(*unprotected_bytes_size <= INT_MAX); read_from_ssl = SSL_read(ssl, unprotected_bytes, (int)*unprotected_bytes_size); - if (read_from_ssl == 0) { - gpr_log(GPR_ERROR, "SSL_read returned 0 unexpectedly."); - return TSI_INTERNAL_ERROR; - } - if (read_from_ssl < 0) { + if (read_from_ssl <= 0) { read_from_ssl = SSL_get_error(ssl, read_from_ssl); switch (read_from_ssl) { - case SSL_ERROR_WANT_READ: - /* We need more data to finish the frame. */ + case SSL_ERROR_ZERO_RETURN: /* Received a close_notify alert. */ + case SSL_ERROR_WANT_READ: /* We need more data to finish the frame. */ *unprotected_bytes_size = 0; return TSI_OK; case SSL_ERROR_WANT_WRITE: diff --git a/src/cpp/README.md b/src/cpp/README.md index e9ef489a7c..77fbee1149 100644 --- a/src/cpp/README.md +++ b/src/cpp/README.md @@ -47,7 +47,7 @@ below. # Build from Source ```sh - $ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc + $ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc $ cd grpc $ git submodule update --init $ make diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index f2d9bb07c9..038eb32e04 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -76,8 +76,9 @@ grpc::string Channel::GetServiceConfigJSON() const { &channel_info.service_config_json); } -Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, - CompletionQueue* cq) { +internal::Call Channel::CreateCall(const internal::RpcMethod& method, + ClientContext* context, + CompletionQueue* cq) { const bool kRegistered = method.channel_tag() && context->authority().empty(); grpc_call* c_call = NULL; if (kRegistered) { @@ -109,10 +110,11 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, } grpc_census_call_set_context(c_call, context->census_context()); context->set_call(c_call, shared_from_this()); - return Call(c_call, this, cq); + return internal::Call(c_call, this, cq); } -void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { +void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; @@ -131,7 +133,7 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) { } namespace { -class TagSaver final : public CompletionQueueTag { +class TagSaver final : public internal::CompletionQueueTag { public: explicit TagSaver(void* tag) : tag_(tag) {} ~TagSaver() override {} diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index 66b1ef0e39..e65cb9903f 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -27,9 +27,11 @@ std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call( ClientContext* context, const grpc::string& method, CompletionQueue* cq, void* tag) { return std::unique_ptr<GenericClientAsyncReaderWriter>( - GenericClientAsyncReaderWriter::Create( + GenericClientAsyncReaderWriter::internal::Create( channel_.get(), cq, - RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), context, tag)); + internal::RpcMethod(method.c_str(), + internal::RpcMethod::BIDI_STREAMING), + context, tag)); } } // namespace grpc diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index f34b0f3d58..000a03277b 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -60,7 +60,7 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( case GRPC_QUEUE_SHUTDOWN: return SHUTDOWN; case GRPC_OP_COMPLETE: - auto cq_tag = static_cast<CompletionQueueTag*>(ev.tag); + auto cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag); *ok = ev.success != 0; *tag = cq_tag; if (cq_tag->FinalizeResult(tag, ok)) { diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc index 815b607032..fc7feb79fe 100644 --- a/src/cpp/server/health/default_health_check_service.cc +++ b/src/cpp/server/health/default_health_check_service.cc @@ -36,11 +36,12 @@ const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check"; DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( DefaultHealthCheckService* service) : service_(service), method_(nullptr) { - MethodHandler* handler = - new RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, ByteBuffer>( + internal::MethodHandler* handler = + new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, + ByteBuffer>( std::mem_fn(&HealthCheckServiceImpl::Check), this); - method_ = new RpcServiceMethod(kHealthCheckMethodName, RpcMethod::NORMAL_RPC, - handler); + method_ = new internal::RpcServiceMethod( + kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler); AddMethod(method_); } diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index 09d5cebe98..99d6680c50 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -41,7 +41,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { private: const DefaultHealthCheckService* const service_; - RpcServiceMethod* method_; + internal::RpcServiceMethod* method_; }; DefaultHealthCheckService(); diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 04abb6fd3e..3bff9999b9 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -86,7 +86,8 @@ class Server::UnimplementedAsyncRequest final ServerCompletionQueue* const cq_; }; -typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> +typedef internal::SneakyCallOpSet<internal::CallOpSendInitialMetadata, + internal::CallOpServerSendStatus> UnimplementedAsyncResponseOp; class Server::UnimplementedAsyncResponse final : public UnimplementedAsyncResponseOp { @@ -104,12 +105,12 @@ class Server::UnimplementedAsyncResponse final UnimplementedAsyncRequest* const request_; }; -class ShutdownTag : public CompletionQueueTag { +class ShutdownTag : public internal::CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { return false; } }; -class DummyTag : public CompletionQueueTag { +class DummyTag : public internal::CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { *status = true; @@ -117,15 +118,15 @@ class DummyTag : public CompletionQueueTag { } }; -class Server::SyncRequest final : public CompletionQueueTag { +class Server::SyncRequest final : public internal::CompletionQueueTag { public: - SyncRequest(RpcServiceMethod* method, void* tag) + SyncRequest(internal::RpcServiceMethod* method, void* tag) : method_(method), tag_(tag), in_flight_(false), - has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || - method->method_type() == - RpcMethod::SERVER_STREAMING), + has_request_payload_( + method->method_type() == internal::RpcMethod::NORMAL_RPC || + method->method_type() == internal::RpcMethod::SERVER_STREAMING), call_details_(nullptr), cq_(nullptr) { grpc_metadata_array_init(&request_metadata_); @@ -202,14 +203,14 @@ class Server::SyncRequest final : public CompletionQueueTag { void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) { ctx_.BeginCompletionOp(&call_); global_callbacks->PreSynchronousRequest(&ctx_); - method_->handler()->RunHandler( - MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_)); + method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter( + &call_, &ctx_, request_payload_)); global_callbacks->PostSynchronousRequest(&ctx_); request_payload_ = nullptr; cq_.Shutdown(); - CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); + internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); /* Ensure the cq_ is shutdown */ @@ -219,15 +220,15 @@ class Server::SyncRequest final : public CompletionQueueTag { private: CompletionQueue cq_; - Call call_; + internal::Call call_; ServerContext ctx_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; - RpcServiceMethod* const method_; + internal::RpcServiceMethod* const method_; }; private: - RpcServiceMethod* const method_; + internal::RpcServiceMethod* const method_; void* const tag_; bool in_flight_; const bool has_request_payload_; @@ -300,14 +301,15 @@ class Server::SyncRequestThreadManager : public ThreadManager { // object } - void AddSyncMethod(RpcServiceMethod* method, void* tag) { + void AddSyncMethod(internal::RpcServiceMethod* method, void* tag) { sync_requests_.emplace_back(new SyncRequest(method, tag)); } void AddUnknownSyncMethod() { if (!sync_requests_.empty()) { - unknown_method_.reset(new RpcServiceMethod( - "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); + unknown_method_.reset(new internal::RpcServiceMethod( + "unknown", internal::RpcMethod::BIDI_STREAMING, + new internal::UnknownMethodHandler)); sync_requests_.emplace_back( new SyncRequest(unknown_method_.get(), nullptr)); } @@ -344,8 +346,8 @@ class Server::SyncRequestThreadManager : public ThreadManager { CompletionQueue* server_cq_; int cq_timeout_msec_; std::vector<std::unique_ptr<SyncRequest>> sync_requests_; - std::unique_ptr<RpcServiceMethod> unknown_method_; - std::unique_ptr<RpcServiceMethod> health_check_; + std::unique_ptr<internal::RpcServiceMethod> unknown_method_; + std::unique_ptr<internal::RpcServiceMethod> health_check_; std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; }; @@ -421,13 +423,13 @@ void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { grpc_server* Server::c_server() { return server_; } static grpc_server_register_method_payload_handling PayloadHandlingForMethod( - RpcServiceMethod* method) { + internal::RpcServiceMethod* method) { switch (method->method_type()) { - case RpcMethod::NORMAL_RPC: - case RpcMethod::SERVER_STREAMING: + case internal::RpcMethod::NORMAL_RPC: + case internal::RpcMethod::SERVER_STREAMING: return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER; - case RpcMethod::CLIENT_STREAMING: - case RpcMethod::BIDI_STREAMING: + case internal::RpcMethod::CLIENT_STREAMING: + case internal::RpcMethod::BIDI_STREAMING: return GRPC_SRM_PAYLOAD_NONE; } GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;); @@ -448,7 +450,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { continue; } - RpcServiceMethod* method = it->get(); + internal::RpcServiceMethod* method = it->get(); void* tag = grpc_server_register_method( server_, method->name(), host ? host->c_str() : nullptr, PayloadHandlingForMethod(method), 0); @@ -588,7 +590,8 @@ void Server::Wait() { } } -void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { +void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; @@ -599,8 +602,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { ServerInterface::BaseAsyncRequest::BaseAsyncRequest( ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, - bool delete_on_finalize) + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + void* tag, bool delete_on_finalize) : server_(server), context_(context), stream_(stream), @@ -622,7 +625,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, } context_->set_call(call_); context_->cq_ = call_cq_; - Call call(call_, server_, call_cq_, server_->max_receive_message_size()); + internal::Call call(call_, server_, call_cq_, + server_->max_receive_message_size()); if (*status && call_) { context_->BeginCompletionOp(&call); } @@ -637,7 +641,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + void* tag) : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} void ServerInterface::RegisteredAsyncRequest::IssueRequest( @@ -651,7 +656,7 @@ void ServerInterface::RegisteredAsyncRequest::IssueRequest( ServerInterface::GenericAsyncRequest::GenericAsyncRequest( ServerInterface* server, GenericServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) : BaseAsyncRequest(server, context, stream, call_cq, tag, delete_on_finalize) { @@ -693,7 +698,7 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( UnimplementedAsyncRequest* request) : request_(request) { Status status(StatusCode::UNIMPLEMENTED, ""); - UnknownMethodHandler::FillOps(request_->context(), this); + internal::UnknownMethodHandler::FillOps(request_->context(), this); request_->stream()->call_.PerformOps(this); } diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 4913682f1d..2e55ffbac4 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -37,7 +37,7 @@ namespace grpc { // CompletionOp -class ServerContext::CompletionOp final : public CallOpSetInterface { +class ServerContext::CompletionOp final : public internal::CallOpSetInterface { public: // initial refs: one in the server context, one in the cq CompletionOp() @@ -146,7 +146,7 @@ ServerContext::~ServerContext() { } } -void ServerContext::BeginCompletionOp(Call* call) { +void ServerContext::BeginCompletionOp(internal::Call* call) { GPR_ASSERT(!completion_op_); completion_op_ = new CompletionOp(); if (has_notify_when_done_tag_) { @@ -155,8 +155,8 @@ void ServerContext::BeginCompletionOp(Call* call) { call->PerformOps(completion_op_); } -CompletionQueueTag* ServerContext::GetCompletionOpTag() { - return static_cast<CompletionQueueTag*>(completion_op_); +internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() { + return static_cast<internal::CompletionQueueTag*>(completion_op_); } void ServerContext::AddInitialMetadata(const grpc::string& key, diff --git a/src/php/README.md b/src/php/README.md index 90c8cb386a..11f99e134c 100644 --- a/src/php/README.md +++ b/src/php/README.md @@ -100,7 +100,7 @@ the `composer` and `protoc` binaries. You can find out how to get these Clone this repository ```sh -$ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc +$ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc ``` Build and install the gRPC C core library @@ -129,7 +129,7 @@ $ sudo make install You will need the source code to run tests ```sh -$ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc +$ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc $ cd grpc $ git pull --recurse-submodules && git submodule update --init --recursive ``` diff --git a/src/python/grpcio/README.rst b/src/python/grpcio/README.rst index 28a2714568..f047243f82 100644 --- a/src/python/grpcio/README.rst +++ b/src/python/grpcio/README.rst @@ -46,7 +46,7 @@ package named :code:`python-dev`). :: $ export REPO_ROOT=grpc # REPO_ROOT can be any directory of your choice - $ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc $REPO_ROOT + $ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc $REPO_ROOT $ cd $REPO_ROOT $ git submodule update --init diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index a8c69720fd..5950bfa0e6 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -518,7 +518,6 @@ cdef extern from "grpc/compression.h": ctypedef struct grpc_compression_options: uint32_t enabled_algorithms_bitset - grpc_compression_algorithm default_compression_algorithm int grpc_compression_algorithm_parse( grpc_slice value, grpc_compression_algorithm *algorithm) nogil diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 5819a624f7..ea5bdbae58 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -39,6 +39,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/support/log_windows.c', 'src/core/lib/support/mpscq.c', 'src/core/lib/support/murmur_hash.c', + 'src/core/lib/support/stack_lockfree.c', 'src/core/lib/support/string.c', 'src/core/lib/support/string_posix.c', 'src/core/lib/support/string_util_windows.c', diff --git a/test/core/support/BUILD b/test/core/support/BUILD index 298eebd9b8..37870d922d 100644 --- a/test/core/support/BUILD +++ b/test/core/support/BUILD @@ -127,6 +127,16 @@ grpc_cc_test( ) grpc_cc_test( + name = "stack_lockfree_test", + srcs = ["stack_lockfree_test.c"], + language = "C", + deps = [ + "//:gpr", + "//test/core/util:gpr_test_util", + ], +) + +grpc_cc_test( name = "string_test", srcs = ["string_test.c"], language = "C", diff --git a/test/core/support/stack_lockfree_test.c b/test/core/support/stack_lockfree_test.c new file mode 100644 index 0000000000..4b1f60ce01 --- /dev/null +++ b/test/core/support/stack_lockfree_test.c @@ -0,0 +1,140 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/support/stack_lockfree.h" + +#include <stdlib.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> +#include "test/core/util/test_config.h" + +/* max stack size supported */ +#define MAX_STACK_SIZE 65534 + +#define MAX_THREADS 32 + +static void test_serial_sized(size_t size) { + gpr_stack_lockfree *stack = gpr_stack_lockfree_create(size); + size_t i; + size_t j; + + /* First try popping empty */ + GPR_ASSERT(gpr_stack_lockfree_pop(stack) == -1); + + /* Now add one item and check it */ + gpr_stack_lockfree_push(stack, 3); + GPR_ASSERT(gpr_stack_lockfree_pop(stack) == 3); + GPR_ASSERT(gpr_stack_lockfree_pop(stack) == -1); + + /* Now add repeatedly more items and check them */ + for (i = 1; i < size; i *= 2) { + for (j = 0; j <= i; j++) { + GPR_ASSERT(gpr_stack_lockfree_push(stack, (int)j) == (j == 0)); + } + for (j = 0; j <= i; j++) { + GPR_ASSERT(gpr_stack_lockfree_pop(stack) == (int)(i - j)); + } + GPR_ASSERT(gpr_stack_lockfree_pop(stack) == -1); + } + + gpr_stack_lockfree_destroy(stack); +} + +static void test_serial() { + size_t i; + for (i = 128; i < MAX_STACK_SIZE; i *= 2) { + test_serial_sized(i); + } + test_serial_sized(MAX_STACK_SIZE); +} + +struct test_arg { + gpr_stack_lockfree *stack; + int stack_size; + int nthreads; + int rank; + int sum; +}; + +static void test_mt_body(void *v) { + struct test_arg *arg = (struct test_arg *)v; + int lo, hi; + int i; + int res; + lo = arg->rank * arg->stack_size / arg->nthreads; + hi = (arg->rank + 1) * arg->stack_size / arg->nthreads; + for (i = lo; i < hi; i++) { + gpr_stack_lockfree_push(arg->stack, i); + if ((res = gpr_stack_lockfree_pop(arg->stack)) != -1) { + arg->sum += res; + } + } + while ((res = gpr_stack_lockfree_pop(arg->stack)) != -1) { + arg->sum += res; + } +} + +static void test_mt_sized(size_t size, int nth) { + gpr_stack_lockfree *stack; + struct test_arg args[MAX_THREADS]; + gpr_thd_id thds[MAX_THREADS]; + int sum; + int i; + gpr_thd_options options = gpr_thd_options_default(); + + stack = gpr_stack_lockfree_create(size); + for (i = 0; i < nth; i++) { + args[i].stack = stack; + args[i].stack_size = (int)size; + args[i].nthreads = nth; + args[i].rank = i; + args[i].sum = 0; + } + gpr_thd_options_set_joinable(&options); + for (i = 0; i < nth; i++) { + GPR_ASSERT(gpr_thd_new(&thds[i], test_mt_body, &args[i], &options)); + } + sum = 0; + for (i = 0; i < nth; i++) { + gpr_thd_join(thds[i]); + sum = sum + args[i].sum; + } + GPR_ASSERT((unsigned)sum == ((unsigned)size * (size - 1)) / 2); + gpr_stack_lockfree_destroy(stack); +} + +static void test_mt() { + size_t size; + int nth; + for (nth = 1; nth < MAX_THREADS; nth++) { + for (size = 128; size < MAX_STACK_SIZE; size *= 2) { + test_mt_sized(size, nth); + } + test_mt_sized(MAX_STACK_SIZE, nth); + } +} + +int main(int argc, char **argv) { + grpc_test_init(argc, argv); + test_serial(); + test_mt(); + return 0; +} diff --git a/test/cpp/codegen/codegen_test_full.cc b/test/cpp/codegen/codegen_test_full.cc index 2eacc99d82..98792bde04 100644 --- a/test/cpp/codegen/codegen_test_full.cc +++ b/test/cpp/codegen/codegen_test_full.cc @@ -27,8 +27,8 @@ class CodegenTestFull : public ::testing::Test {}; TEST_F(CodegenTestFull, Init) { grpc::CompletionQueue cq; - void* tag; - bool ok; + void* tag = nullptr; + bool ok = false; cq.AsyncNext(&tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)); ASSERT_FALSE(ok); } diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index b43c27f3f7..f8c768831e 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -39,7 +39,6 @@ namespace grpc { class CompletionQueue; class Channel; -class RpcService; class ServerCompletionQueue; class ServerContext; } // namespace grpc @@ -137,10 +136,10 @@ class ServiceA final { ::grpc::ClientAsyncReader< ::grpc::testing::Response>* AsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) override; ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4Raw(::grpc::ClientContext* context) override; ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* AsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) override; - const ::grpc::RpcMethod rpcmethod_MethodA1_; - const ::grpc::RpcMethod rpcmethod_MethodA2_; - const ::grpc::RpcMethod rpcmethod_MethodA3_; - const ::grpc::RpcMethod rpcmethod_MethodA4_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA1_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA2_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA3_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA4_; }; static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions()); @@ -320,7 +319,7 @@ class ServiceA final { public: WithStreamedUnaryMethod_MethodA1() { ::grpc::Service::MarkMethodStreamed(0, - new ::grpc::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodA1<BaseClass>::StreamedMethodA1, this, std::placeholders::_1, std::placeholders::_2))); + new ::grpc::internal::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodA1<BaseClass>::StreamedMethodA1, this, std::placeholders::_1, std::placeholders::_2))); } ~WithStreamedUnaryMethod_MethodA1() override { BaseClassMustBeDerivedFromService(this); @@ -341,7 +340,7 @@ class ServiceA final { public: WithSplitStreamingMethod_MethodA3() { ::grpc::Service::MarkMethodStreamed(2, - new ::grpc::SplitServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithSplitStreamingMethod_MethodA3<BaseClass>::StreamedMethodA3, this, std::placeholders::_1, std::placeholders::_2))); + new ::grpc::internal::SplitServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithSplitStreamingMethod_MethodA3<BaseClass>::StreamedMethodA3, this, std::placeholders::_1, std::placeholders::_2))); } ~WithSplitStreamingMethod_MethodA3() override { BaseClassMustBeDerivedFromService(this); @@ -387,7 +386,7 @@ class ServiceB final { private: std::shared_ptr< ::grpc::ChannelInterface> channel_; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; - const ::grpc::RpcMethod rpcmethod_MethodB1_; + const ::grpc::internal::RpcMethod rpcmethod_MethodB1_; }; static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions()); @@ -444,7 +443,7 @@ class ServiceB final { public: WithStreamedUnaryMethod_MethodB1() { ::grpc::Service::MarkMethodStreamed(0, - new ::grpc::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodB1<BaseClass>::StreamedMethodB1, this, std::placeholders::_1, std::placeholders::_2))); + new ::grpc::internal::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodB1<BaseClass>::StreamedMethodB1, this, std::placeholders::_1, std::placeholders::_2))); } ~WithStreamedUnaryMethod_MethodB1() override { BaseClassMustBeDerivedFromService(this); diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index 18308a2e16..6bb6ad8018 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -69,7 +69,7 @@ BENCHMARK(BM_CreateDestroyCore); static void DoneWithCompletionOnStack(grpc_exec_ctx* exec_ctx, void* arg, grpc_cq_completion* completion) {} -class DummyTag final : public CompletionQueueTag { +class DummyTag final : public internal::CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) override { return true; } }; diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc index 07e41817b3..9954e2c0bf 100644 --- a/test/cpp/qps/server_sync.cc +++ b/test/cpp/qps/server_sync.cc @@ -112,8 +112,8 @@ class BenchmarkServiceImpl final : public BenchmarkService::Service { } private: - static Status ClientPull(ServerContext* context, - ReaderInterface<SimpleRequest>* stream, + template <class R> + static Status ClientPull(ServerContext* context, R* stream, SimpleResponse* response) { SimpleRequest request; while (stream->Read(&request)) { @@ -126,8 +126,8 @@ class BenchmarkServiceImpl final : public BenchmarkService::Service { } return Status::OK; } - static Status ServerPush(ServerContext* context, - WriterInterface<SimpleResponse>* stream, + template <class W> + static Status ServerPush(ServerContext* context, W* stream, const SimpleResponse& response, std::function<bool()> done) { while ((done == nullptr) || !done()) { diff --git a/tools/distrib/python/grpcio_tools/README.rst b/tools/distrib/python/grpcio_tools/README.rst index 55521d17bb..fb44cfaf80 100644 --- a/tools/distrib/python/grpcio_tools/README.rst +++ b/tools/distrib/python/grpcio_tools/README.rst @@ -53,7 +53,7 @@ GCC-like stuff, but you may end up having a bad time. :: $ export REPO_ROOT=grpc # REPO_ROOT can be any directory of your choice - $ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc $REPO_ROOT + $ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc $REPO_ROOT $ cd $REPO_ROOT $ git submodule update --init diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 63067b3081..766c20f59b 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1304,6 +1304,8 @@ src/core/lib/support/mpscq.h \ src/core/lib/support/murmur_hash.c \ src/core/lib/support/murmur_hash.h \ src/core/lib/support/spinlock.h \ +src/core/lib/support/stack_lockfree.c \ +src/core/lib/support/stack_lockfree.h \ src/core/lib/support/string.c \ src/core/lib/support/string.h \ src/core/lib/support/string_posix.c \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 65a99bf7d6..477a258daa 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -835,6 +835,21 @@ "headers": [], "is_filegroup": false, "language": "c", + "name": "gpr_stack_lockfree_test", + "src": [ + "test/core/support/stack_lockfree_test.c" + ], + "third_party": false, + "type": "target" + }, + { + "deps": [ + "gpr", + "gpr_test_util" + ], + "headers": [], + "is_filegroup": false, + "language": "c", "name": "gpr_string_test", "src": [ "test/core/support/string_test.c" @@ -7450,6 +7465,7 @@ "src/core/lib/support/mpscq.h", "src/core/lib/support/murmur_hash.h", "src/core/lib/support/spinlock.h", + "src/core/lib/support/stack_lockfree.h", "src/core/lib/support/string.h", "src/core/lib/support/string_windows.h", "src/core/lib/support/thd_internal.h", @@ -7522,6 +7538,8 @@ "src/core/lib/support/murmur_hash.c", "src/core/lib/support/murmur_hash.h", "src/core/lib/support/spinlock.h", + "src/core/lib/support/stack_lockfree.c", + "src/core/lib/support/stack_lockfree.h", "src/core/lib/support/string.c", "src/core/lib/support/string.h", "src/core/lib/support/string_posix.c", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index c3ce58dc76..ffd65bc5f1 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -955,6 +955,28 @@ "posix", "windows" ], + "cpu_cost": 7, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": false, + "language": "c", + "name": "gpr_stack_lockfree_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ] + }, + { + "args": [], + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], diff --git a/vsprojects/buildtests_c.sln b/vsprojects/buildtests_c.sln index 3c6e8d8f34..b7696a9965 100644 --- a/vsprojects/buildtests_c.sln +++ b/vsprojects/buildtests_c.sln @@ -484,6 +484,15 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "gpr_spinlock_test", "vcxpro {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} = {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} EndProjectSection EndProject +Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "gpr_stack_lockfree_test", "vcxproj\test\gpr_stack_lockfree_test\gpr_stack_lockfree_test.vcxproj", "{AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}" + ProjectSection(myProperties) = preProject + lib = "False" + EndProjectSection + ProjectSection(ProjectDependencies) = postProject + {EAB0A629-17A9-44DB-B5FF-E91A721FE037} = {EAB0A629-17A9-44DB-B5FF-E91A721FE037} + {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} = {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} + EndProjectSection +EndProject Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "gpr_string_test", "vcxproj\test\gpr_string_test\gpr_string_test.vcxproj", "{B453457D-8FBC-9C9F-A55E-C06FCE13B1F2}" ProjectSection(myProperties) = preProject lib = "False" @@ -2488,6 +2497,22 @@ Global {D8EDE51A-CBB2-0362-D59B-09AA92A94F45}.Release-DLL|Win32.Build.0 = Release|Win32 {D8EDE51A-CBB2-0362-D59B-09AA92A94F45}.Release-DLL|x64.ActiveCfg = Release|x64 {D8EDE51A-CBB2-0362-D59B-09AA92A94F45}.Release-DLL|x64.Build.0 = Release|x64 + {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Debug|Win32.ActiveCfg = Debug|Win32 + {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Debug|x64.ActiveCfg = Debug|x64 + {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Release|Win32.ActiveCfg = Release|Win32 + {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Release|x64.ActiveCfg = Release|x64 + {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Debug|Win32.Build.0 = Debug|Win32 + {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Debug|x64.Build.0 = Debug|x64 + {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Release|Win32.Build.0 = Release|Win32 + {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Release|x64.Build.0 = Release|x64 + {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Debug-DLL|Win32.ActiveCfg = Debug|Win32 + {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Debug-DLL|Win32.Build.0 = Debug|Win32 + {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Debug-DLL|x64.ActiveCfg = Debug|x64 + {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Debug-DLL|x64.Build.0 = Debug|x64 + {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Release-DLL|Win32.ActiveCfg = Release|Win32 + {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Release-DLL|Win32.Build.0 = Release|Win32 + {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Release-DLL|x64.ActiveCfg = Release|x64 + {AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}.Release-DLL|x64.Build.0 = Release|x64 {B453457D-8FBC-9C9F-A55E-C06FCE13B1F2}.Debug|Win32.ActiveCfg = Debug|Win32 {B453457D-8FBC-9C9F-A55E-C06FCE13B1F2}.Debug|x64.ActiveCfg = Debug|x64 {B453457D-8FBC-9C9F-A55E-C06FCE13B1F2}.Release|Win32.ActiveCfg = Release|Win32 diff --git a/vsprojects/vcxproj/gpr/gpr.vcxproj b/vsprojects/vcxproj/gpr/gpr.vcxproj index 3f0dedd675..7fb81a7fbc 100644 --- a/vsprojects/vcxproj/gpr/gpr.vcxproj +++ b/vsprojects/vcxproj/gpr/gpr.vcxproj @@ -198,6 +198,7 @@ <ClInclude Include="$(SolutionDir)\..\src\core\lib\support\mpscq.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\support\murmur_hash.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\support\spinlock.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\lib\support\stack_lockfree.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\support\string.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\support\string_windows.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\support\thd_internal.h" /> @@ -253,6 +254,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\support\murmur_hash.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\support\stack_lockfree.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\support\string.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\support\string_posix.c"> diff --git a/vsprojects/vcxproj/gpr/gpr.vcxproj.filters b/vsprojects/vcxproj/gpr/gpr.vcxproj.filters index f8cccb5c08..27d9d2f38f 100644 --- a/vsprojects/vcxproj/gpr/gpr.vcxproj.filters +++ b/vsprojects/vcxproj/gpr/gpr.vcxproj.filters @@ -73,6 +73,9 @@ <ClCompile Include="$(SolutionDir)\..\src\core\lib\support\murmur_hash.c"> <Filter>src\core\lib\support</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\support\stack_lockfree.c"> + <Filter>src\core\lib\support</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\support\string.c"> <Filter>src\core\lib\support</Filter> </ClCompile> @@ -287,6 +290,9 @@ <ClInclude Include="$(SolutionDir)\..\src\core\lib\support\spinlock.h"> <Filter>src\core\lib\support</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\lib\support\stack_lockfree.h"> + <Filter>src\core\lib\support</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\lib\support\string.h"> <Filter>src\core\lib\support</Filter> </ClInclude> diff --git a/vsprojects/vcxproj/test/gpr_stack_lockfree_test/gpr_stack_lockfree_test.vcxproj b/vsprojects/vcxproj/test/gpr_stack_lockfree_test/gpr_stack_lockfree_test.vcxproj new file mode 100644 index 0000000000..218cff8ba9 --- /dev/null +++ b/vsprojects/vcxproj/test/gpr_stack_lockfree_test/gpr_stack_lockfree_test.vcxproj @@ -0,0 +1,193 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\1.0.204.1.props')" /> + <ItemGroup Label="ProjectConfigurations"> + <ProjectConfiguration Include="Debug|Win32"> + <Configuration>Debug</Configuration> + <Platform>Win32</Platform> + </ProjectConfiguration> + <ProjectConfiguration Include="Debug|x64"> + <Configuration>Debug</Configuration> + <Platform>x64</Platform> + </ProjectConfiguration> + <ProjectConfiguration Include="Release|Win32"> + <Configuration>Release</Configuration> + <Platform>Win32</Platform> + </ProjectConfiguration> + <ProjectConfiguration Include="Release|x64"> + <Configuration>Release</Configuration> + <Platform>x64</Platform> + </ProjectConfiguration> + </ItemGroup> + <PropertyGroup Label="Globals"> + <ProjectGuid>{AD06B5CD-8D5C-A365-C46B-3CF32237A4F7}</ProjectGuid> + <IgnoreWarnIntDirInTempDetected>true</IgnoreWarnIntDirInTempDetected> + <IntDir>$(SolutionDir)IntDir\$(MSBuildProjectName)\</IntDir> + </PropertyGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" /> + <PropertyGroup Condition="'$(VisualStudioVersion)' == '10.0'" Label="Configuration"> + <PlatformToolset>v100</PlatformToolset> + </PropertyGroup> + <PropertyGroup Condition="'$(VisualStudioVersion)' == '11.0'" Label="Configuration"> + <PlatformToolset>v110</PlatformToolset> + </PropertyGroup> + <PropertyGroup Condition="'$(VisualStudioVersion)' == '12.0'" Label="Configuration"> + <PlatformToolset>v120</PlatformToolset> + </PropertyGroup> + <PropertyGroup Condition="'$(VisualStudioVersion)' == '14.0'" Label="Configuration"> + <PlatformToolset>v140</PlatformToolset> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)'=='Debug'" Label="Configuration"> + <ConfigurationType>Application</ConfigurationType> + <UseDebugLibraries>true</UseDebugLibraries> + <CharacterSet>Unicode</CharacterSet> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)'=='Release'" Label="Configuration"> + <ConfigurationType>Application</ConfigurationType> + <UseDebugLibraries>false</UseDebugLibraries> + <WholeProgramOptimization>true</WholeProgramOptimization> + <CharacterSet>Unicode</CharacterSet> + </PropertyGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" /> + <ImportGroup Label="ExtensionSettings"> + </ImportGroup> + <ImportGroup Label="PropertySheets"> + <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" /> + <Import Project="$(SolutionDir)\..\vsprojects\global.props" /> + <Import Project="$(SolutionDir)\..\vsprojects\openssl.props" /> + <Import Project="$(SolutionDir)\..\vsprojects\winsock.props" /> + <Import Project="$(SolutionDir)\..\vsprojects\zlib.props" /> + </ImportGroup> + <PropertyGroup Label="UserMacros" /> + <PropertyGroup Condition="'$(Configuration)'=='Debug'"> + <TargetName>gpr_stack_lockfree_test</TargetName> + <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib> + <Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib> + <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl> + <Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)'=='Release'"> + <TargetName>gpr_stack_lockfree_test</TargetName> + <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib> + <Configuration-grpc_dependencies_zlib>Release</Configuration-grpc_dependencies_zlib> + <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl> + <Configuration-grpc_dependencies_openssl>Release</Configuration-grpc_dependencies_openssl> + </PropertyGroup> + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'"> + <ClCompile> + <PrecompiledHeader>NotUsing</PrecompiledHeader> + <WarningLevel>Level3</WarningLevel> + <Optimization>Disabled</Optimization> + <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <SDLCheck>true</SDLCheck> + <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary> + <TreatWarningAsError>true</TreatWarningAsError> + <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat> + <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild> + </ClCompile> + <Link> + <SubSystem>Console</SubSystem> + <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation> + <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation> + </Link> + </ItemDefinitionGroup> + + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> + <ClCompile> + <PrecompiledHeader>NotUsing</PrecompiledHeader> + <WarningLevel>Level3</WarningLevel> + <Optimization>Disabled</Optimization> + <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <SDLCheck>true</SDLCheck> + <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary> + <TreatWarningAsError>true</TreatWarningAsError> + <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat> + <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild> + </ClCompile> + <Link> + <SubSystem>Console</SubSystem> + <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation> + <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation> + </Link> + </ItemDefinitionGroup> + + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'"> + <ClCompile> + <PrecompiledHeader>NotUsing</PrecompiledHeader> + <WarningLevel>Level3</WarningLevel> + <Optimization>MaxSpeed</Optimization> + <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <FunctionLevelLinking>true</FunctionLevelLinking> + <IntrinsicFunctions>true</IntrinsicFunctions> + <SDLCheck>true</SDLCheck> + <RuntimeLibrary>MultiThreaded</RuntimeLibrary> + <TreatWarningAsError>true</TreatWarningAsError> + <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat> + <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild> + </ClCompile> + <Link> + <SubSystem>Console</SubSystem> + <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation> + <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation> + <EnableCOMDATFolding>true</EnableCOMDATFolding> + <OptimizeReferences>true</OptimizeReferences> + </Link> + </ItemDefinitionGroup> + + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> + <ClCompile> + <PrecompiledHeader>NotUsing</PrecompiledHeader> + <WarningLevel>Level3</WarningLevel> + <Optimization>MaxSpeed</Optimization> + <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <FunctionLevelLinking>true</FunctionLevelLinking> + <IntrinsicFunctions>true</IntrinsicFunctions> + <SDLCheck>true</SDLCheck> + <RuntimeLibrary>MultiThreaded</RuntimeLibrary> + <TreatWarningAsError>true</TreatWarningAsError> + <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat> + <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild> + </ClCompile> + <Link> + <SubSystem>Console</SubSystem> + <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation> + <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation> + <EnableCOMDATFolding>true</EnableCOMDATFolding> + <OptimizeReferences>true</OptimizeReferences> + </Link> + </ItemDefinitionGroup> + + <ItemGroup> + <ClCompile Include="$(SolutionDir)\..\test\core\support\stack_lockfree_test.c"> + </ClCompile> + </ItemGroup> + <ItemGroup> + <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr_test_util\gpr_test_util.vcxproj"> + <Project>{EAB0A629-17A9-44DB-B5FF-E91A721FE037}</Project> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr\gpr.vcxproj"> + <Project>{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}</Project> + </ProjectReference> + </ItemGroup> + <ItemGroup> + <None Include="packages.config" /> + </ItemGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> + <ImportGroup Label="ExtensionTargets"> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" /> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" /> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" /> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" /> + </ImportGroup> + <Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild"> + <PropertyGroup> + <ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText> + </PropertyGroup> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" /> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" /> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" /> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" /> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" /> + </Target> +</Project> + diff --git a/vsprojects/vcxproj/test/gpr_stack_lockfree_test/gpr_stack_lockfree_test.vcxproj.filters b/vsprojects/vcxproj/test/gpr_stack_lockfree_test/gpr_stack_lockfree_test.vcxproj.filters new file mode 100644 index 0000000000..b222ab4128 --- /dev/null +++ b/vsprojects/vcxproj/test/gpr_stack_lockfree_test/gpr_stack_lockfree_test.vcxproj.filters @@ -0,0 +1,21 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <ItemGroup> + <ClCompile Include="$(SolutionDir)\..\test\core\support\stack_lockfree_test.c"> + <Filter>test\core\support</Filter> + </ClCompile> + </ItemGroup> + + <ItemGroup> + <Filter Include="test"> + <UniqueIdentifier>{de41d2bf-c9ce-7f55-6da3-8d3798fd8fe2}</UniqueIdentifier> + </Filter> + <Filter Include="test\core"> + <UniqueIdentifier>{4867ad9b-2b88-de6a-a1df-7a733d389df9}</UniqueIdentifier> + </Filter> + <Filter Include="test\core\support"> + <UniqueIdentifier>{fca98aa0-f0c0-9254-ab22-a2792b4b94f0}</UniqueIdentifier> + </Filter> + </ItemGroup> +</Project> + |