diff options
Diffstat (limited to 'include/grpc++')
42 files changed, 816 insertions, 834 deletions
diff --git a/include/grpc++/channel_interface.h b/include/grpc++/channel.h index 4176cded7b..a8af74175b 100644 --- a/include/grpc++/channel_interface.h +++ b/include/grpc++/channel.h @@ -31,36 +31,49 @@ * */ -#ifndef GRPCXX_CHANNEL_INTERFACE_H -#define GRPCXX_CHANNEL_INTERFACE_H +#ifndef GRPCXX_CHANNEL_H +#define GRPCXX_CHANNEL_H #include <memory> #include <grpc/grpc.h> -#include <grpc++/status.h> #include <grpc++/impl/call.h> +#include <grpc++/impl/grpc_library.h> +#include <grpc++/support/config.h> -struct grpc_call; +struct grpc_channel; namespace grpc { -class Call; -class CallOpBuffer; -class ClientContext; +class CallOpSetInterface; +class ChannelArguments; class CompletionQueue; -class RpcMethod; +class Credentials; +class SecureCredentials; -class ChannelInterface : public CallHook, - public std::enable_shared_from_this<ChannelInterface> { - public: - virtual ~ChannelInterface() {} +template <class R> +class ClientReader; +template <class W> +class ClientWriter; +template <class R, class W> +class ClientReaderWriter; +template <class R> +class ClientAsyncReader; +template <class W> +class ClientAsyncWriter; +template <class R, class W> +class ClientAsyncReaderWriter; +template <class R> +class ClientAsyncResponseReader; - virtual void* RegisterMethod(const char* method_name) = 0; - virtual Call CreateCall(const RpcMethod& method, ClientContext* context, - CompletionQueue* cq) = 0; +class Channel GRPC_FINAL : public GrpcLibrary, + public CallHook, + public std::enable_shared_from_this<Channel> { + public: + ~Channel(); // Get the current channel state. If the channel is in IDLE and try_to_connect // is set to true, try to connect. - virtual grpc_connectivity_state GetState(bool try_to_connect) = 0; + grpc_connectivity_state GetState(bool try_to_connect); // Return the tag on cq when the channel state is changed or deadline expires. // GetState needs to called to get the current state. @@ -80,13 +93,46 @@ class ChannelInterface : public CallHook, } private: - virtual void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, - gpr_timespec deadline, - CompletionQueue* cq, void* tag) = 0; - virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, - gpr_timespec deadline) = 0; + template <class R> + friend class ::grpc::ClientReader; + template <class W> + friend class ::grpc::ClientWriter; + template <class R, class W> + friend class ::grpc::ClientReaderWriter; + template <class R> + friend class ::grpc::ClientAsyncReader; + template <class W> + friend class ::grpc::ClientAsyncWriter; + template <class R, class W> + friend class ::grpc::ClientAsyncReaderWriter; + template <class R> + friend class ::grpc::ClientAsyncResponseReader; + template <class InputMessage, class OutputMessage> + friend Status BlockingUnaryCall(Channel* channel, const RpcMethod& method, + ClientContext* context, + const InputMessage& request, + OutputMessage* result); + friend class ::grpc::RpcMethod; + friend std::shared_ptr<Channel> CreateChannelInternal( + const grpc::string& host, grpc_channel* c_channel); + + Channel(const grpc::string& host, grpc_channel* c_channel); + + Call CreateCall(const RpcMethod& method, ClientContext* context, + CompletionQueue* cq); + void PerformOpsOnCall(CallOpSetInterface* ops, Call* call); + void* RegisterMethod(const char* method); + + void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, + gpr_timespec deadline, CompletionQueue* cq, + void* tag); + bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, + gpr_timespec deadline); + + const grpc::string host_; + grpc_channel* const c_channel_; // owned }; } // namespace grpc -#endif // GRPCXX_CHANNEL_INTERFACE_H +#endif // GRPCXX_CHANNEL_H diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h index d7fafac9b3..62e5260a18 100644 --- a/include/grpc++/client_context.h +++ b/include/grpc++/client_context.h @@ -42,16 +42,17 @@ #include <grpc/grpc.h> #include <grpc/support/log.h> #include <grpc/support/time.h> -#include <grpc++/auth_context.h> -#include <grpc++/config.h> -#include <grpc++/status.h> -#include <grpc++/time.h> +#include <grpc++/support/auth_context.h> +#include <grpc++/support/config.h> +#include <grpc++/support/status.h> +#include <grpc++/support/string_ref.h> +#include <grpc++/support/time.h> struct census_context; namespace grpc { -class ChannelInterface; +class Channel; class CompletionQueue; class Credentials; class RpcMethod; @@ -121,6 +122,10 @@ class PropagationOptions { gpr_uint32 propagate_; }; +namespace testing { +class InteropClientContextInspector; +} // namespace testing + class ClientContext { public: ClientContext(); @@ -134,12 +139,14 @@ class ClientContext { void AddMetadata(const grpc::string& meta_key, const grpc::string& meta_value); - const std::multimap<grpc::string, grpc::string>& GetServerInitialMetadata() { + const std::multimap<grpc::string_ref, grpc::string_ref>& + GetServerInitialMetadata() { GPR_ASSERT(initial_metadata_received_); return recv_initial_metadata_; } - const std::multimap<grpc::string, grpc::string>& GetServerTrailingMetadata() { + const std::multimap<grpc::string_ref, grpc::string_ref>& + GetServerTrailingMetadata() { // TODO(yangg) check finished return trailing_metadata_; } @@ -181,7 +188,9 @@ class ClientContext { // Get and set census context void set_census_context(struct census_context* ccp) { census_context_ = ccp; } - struct census_context* census_context() const { return census_context_; } + struct census_context* census_context() const { + return census_context_; + } void TryCancel(); @@ -190,6 +199,7 @@ class ClientContext { ClientContext(const ClientContext&); ClientContext& operator=(const ClientContext&); + friend class ::grpc::testing::InteropClientContextInspector; friend class CallOpClientRecvStatus; friend class CallOpRecvInitialMetadata; friend class Channel; @@ -208,20 +218,18 @@ class ClientContext { template <class R> friend class ::grpc::ClientAsyncResponseReader; template <class InputMessage, class OutputMessage> - friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, + friend Status BlockingUnaryCall(Channel* channel, const RpcMethod& method, ClientContext* context, const InputMessage& request, OutputMessage* result); grpc_call* call() { return call_; } - void set_call(grpc_call* call, - const std::shared_ptr<ChannelInterface>& channel); + void set_call(grpc_call* call, const std::shared_ptr<Channel>& channel); grpc::string authority() { return authority_; } bool initial_metadata_received_; - std::shared_ptr<ChannelInterface> channel_; + std::shared_ptr<Channel> channel_; grpc_call* call_; gpr_timespec deadline_; grpc::string authority_; @@ -229,8 +237,8 @@ class ClientContext { mutable std::shared_ptr<const AuthContext> auth_context_; struct census_context* census_context_; std::multimap<grpc::string, grpc::string> send_initial_metadata_; - std::multimap<grpc::string, grpc::string> recv_initial_metadata_; - std::multimap<grpc::string, grpc::string> trailing_metadata_; + std::multimap<grpc::string_ref, grpc::string_ref> recv_initial_metadata_; + std::multimap<grpc::string_ref, grpc::string_ref> trailing_metadata_; grpc_call* propagate_from_call_; PropagationOptions propagation_options_; diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index 2f30211145..d81d2e735d 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -36,8 +36,8 @@ #include <grpc/support/time.h> #include <grpc++/impl/grpc_library.h> -#include <grpc++/status.h> -#include <grpc++/time.h> +#include <grpc++/support/status.h> +#include <grpc++/support/time.h> struct grpc_completion_queue; @@ -65,7 +65,7 @@ template <class ServiceType, class RequestType, class ResponseType> class BidiStreamingHandler; class UnknownMethodHandler; -class ChannelInterface; +class Channel; class ClientContext; class CompletionQueue; class RpcMethod; @@ -143,8 +143,7 @@ class CompletionQueue : public GrpcLibrary { friend class ::grpc::Server; friend class ::grpc::ServerContext; template <class InputMessage, class OutputMessage> - friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, + friend Status BlockingUnaryCall(Channel* channel, const RpcMethod& method, ClientContext* context, const InputMessage& request, OutputMessage* result); diff --git a/include/grpc++/create_channel.h b/include/grpc++/create_channel.h index 424a93a64c..0e559ac53e 100644 --- a/include/grpc++/create_channel.h +++ b/include/grpc++/create_channel.h @@ -36,15 +36,14 @@ #include <memory> -#include <grpc++/config.h> #include <grpc++/credentials.h> +#include <grpc++/support/channel_arguments.h> +#include <grpc++/support/config.h> namespace grpc { -class ChannelArguments; -class ChannelInterface; // If creds does not hold an object or is invalid, a lame channel is returned. -std::shared_ptr<ChannelInterface> CreateChannel( +std::shared_ptr<Channel> CreateChannel( const grpc::string& target, const std::shared_ptr<Credentials>& creds, const ChannelArguments& args); diff --git a/include/grpc++/credentials.h b/include/grpc++/credentials.h index a4f1e73118..71e1f00f15 100644 --- a/include/grpc++/credentials.h +++ b/include/grpc++/credentials.h @@ -36,12 +36,12 @@ #include <memory> -#include <grpc++/config.h> #include <grpc++/impl/grpc_library.h> +#include <grpc++/support/config.h> namespace grpc { class ChannelArguments; -class ChannelInterface; +class Channel; class SecureCredentials; class Credentials : public GrpcLibrary { @@ -57,11 +57,11 @@ class Credentials : public GrpcLibrary { virtual SecureCredentials* AsSecureCredentials() = 0; private: - friend std::shared_ptr<ChannelInterface> CreateChannel( + friend std::shared_ptr<Channel> CreateChannel( const grpc::string& target, const std::shared_ptr<Credentials>& creds, const ChannelArguments& args); - virtual std::shared_ptr<ChannelInterface> CreateChannel( + virtual std::shared_ptr<Channel> CreateChannel( const grpc::string& target, const ChannelArguments& args) = 0; }; diff --git a/include/grpc++/dynamic_thread_pool.h b/include/grpc++/dynamic_thread_pool.h deleted file mode 100644 index f0cd35940f..0000000000 --- a/include/grpc++/dynamic_thread_pool.h +++ /dev/null @@ -1,82 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPCXX_DYNAMIC_THREAD_POOL_H -#define GRPCXX_DYNAMIC_THREAD_POOL_H - -#include <grpc++/config.h> - -#include <grpc++/impl/sync.h> -#include <grpc++/impl/thd.h> -#include <grpc++/thread_pool_interface.h> - -#include <list> -#include <memory> -#include <queue> - -namespace grpc { - -class DynamicThreadPool GRPC_FINAL : public ThreadPoolInterface { - public: - explicit DynamicThreadPool(int reserve_threads); - ~DynamicThreadPool(); - - void Add(const std::function<void()>& callback) GRPC_OVERRIDE; - - private: - class DynamicThread { - public: - DynamicThread(DynamicThreadPool *pool); - ~DynamicThread(); - private: - DynamicThreadPool *pool_; - std::unique_ptr<grpc::thread> thd_; - void ThreadFunc(); - }; - grpc::mutex mu_; - grpc::condition_variable cv_; - grpc::condition_variable shutdown_cv_; - bool shutdown_; - std::queue<std::function<void()>> callbacks_; - int reserve_threads_; - int nthreads_; - int threads_waiting_; - std::list<DynamicThread*> dead_threads_; - - void ThreadFunc(); - static void ReapThreads(std::list<DynamicThread*>* tlist); -}; - -} // namespace grpc - -#endif // GRPCXX_DYNAMIC_THREAD_POOL_H diff --git a/include/grpc++/fixed_size_thread_pool.h b/include/grpc++/fixed_size_thread_pool.h deleted file mode 100644 index 307e166142..0000000000 --- a/include/grpc++/fixed_size_thread_pool.h +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPCXX_FIXED_SIZE_THREAD_POOL_H -#define GRPCXX_FIXED_SIZE_THREAD_POOL_H - -#include <grpc++/config.h> - -#include <grpc++/impl/sync.h> -#include <grpc++/impl/thd.h> -#include <grpc++/thread_pool_interface.h> - -#include <queue> -#include <vector> - -namespace grpc { - -class FixedSizeThreadPool GRPC_FINAL : public ThreadPoolInterface { - public: - explicit FixedSizeThreadPool(int num_threads); - ~FixedSizeThreadPool(); - - void Add(const std::function<void()>& callback) GRPC_OVERRIDE; - - private: - grpc::mutex mu_; - grpc::condition_variable cv_; - bool shutdown_; - std::queue<std::function<void()>> callbacks_; - std::vector<grpc::thread*> threads_; - - void ThreadFunc(); -}; - -} // namespace grpc - -#endif // GRPCXX_FIXED_SIZE_THREAD_POOL_H diff --git a/include/grpc++/async_generic_service.h b/include/grpc++/generic/async_generic_service.h index b435c6e73d..8578d850ff 100644 --- a/include/grpc++/async_generic_service.h +++ b/include/grpc++/generic/async_generic_service.h @@ -31,11 +31,11 @@ * */ -#ifndef GRPCXX_ASYNC_GENERIC_SERVICE_H -#define GRPCXX_ASYNC_GENERIC_SERVICE_H +#ifndef GRPCXX_GENERIC_ASYNC_GENERIC_SERVICE_H +#define GRPCXX_GENERIC_ASYNC_GENERIC_SERVICE_H -#include <grpc++/byte_buffer.h> -#include <grpc++/stream.h> +#include <grpc++/support/byte_buffer.h> +#include <grpc++/support/async_stream.h> struct grpc_server; @@ -75,4 +75,4 @@ class AsyncGenericService GRPC_FINAL { } // namespace grpc -#endif // GRPCXX_ASYNC_GENERIC_SERVICE_H +#endif // GRPCXX_GENERIC_ASYNC_GENERIC_SERVICE_H diff --git a/include/grpc++/generic_stub.h b/include/grpc++/generic/generic_stub.h index c34e1fcf55..1bb7900b06 100644 --- a/include/grpc++/generic_stub.h +++ b/include/grpc++/generic/generic_stub.h @@ -31,11 +31,11 @@ * */ -#ifndef GRPCXX_GENERIC_STUB_H -#define GRPCXX_GENERIC_STUB_H +#ifndef GRPCXX_GENERIC_GENERIC_STUB_H +#define GRPCXX_GENERIC_GENERIC_STUB_H -#include <grpc++/byte_buffer.h> -#include <grpc++/stream.h> +#include <grpc++/support/async_stream.h> +#include <grpc++/support/byte_buffer.h> namespace grpc { @@ -47,18 +47,17 @@ typedef ClientAsyncReaderWriter<ByteBuffer, ByteBuffer> // by name. class GenericStub GRPC_FINAL { public: - explicit GenericStub(std::shared_ptr<ChannelInterface> channel) - : channel_(channel) {} + explicit GenericStub(std::shared_ptr<Channel> channel) : channel_(channel) {} // begin a call to a named method std::unique_ptr<GenericClientAsyncReaderWriter> Call( - ClientContext* context, const grpc::string& method, - CompletionQueue* cq, void* tag); + ClientContext* context, const grpc::string& method, CompletionQueue* cq, + void* tag); private: - std::shared_ptr<ChannelInterface> channel_; + std::shared_ptr<Channel> channel_; }; } // namespace grpc -#endif // GRPCXX_GENERIC_STUB_H +#endif // GRPCXX_GENERIC_GENERIC_STUB_H diff --git a/include/grpc++/impl/README.md b/include/grpc++/impl/README.md new file mode 100644 index 0000000000..612150caa0 --- /dev/null +++ b/include/grpc++/impl/README.md @@ -0,0 +1,4 @@ +**The APIs in this directory are not stable!** + +This directory contains header files that need to be installed but are not part +of the public API. Users should not use these headers directly. diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index d49102fa3e..fca5603047 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -34,18 +34,17 @@ #ifndef GRPCXX_IMPL_CALL_H #define GRPCXX_IMPL_CALL_H -#include <grpc/support/alloc.h> -#include <grpc++/client_context.h> -#include <grpc++/completion_queue.h> -#include <grpc++/config.h> -#include <grpc++/status.h> -#include <grpc++/impl/serialization_traits.h> - #include <functional> #include <memory> #include <map> +#include <cstring> -#include <string.h> +#include <grpc/support/alloc.h> +#include <grpc++/client_context.h> +#include <grpc++/completion_queue.h> +#include <grpc++/impl/serialization_traits.h> +#include <grpc++/support/config.h> +#include <grpc++/support/status.h> struct grpc_call; struct grpc_op; @@ -55,8 +54,9 @@ namespace grpc { class ByteBuffer; class Call; -void FillMetadataMap(grpc_metadata_array* arr, - std::multimap<grpc::string, grpc::string>* metadata); +void FillMetadataMap( + grpc_metadata_array* arr, + std::multimap<grpc::string_ref, grpc::string_ref>* metadata); grpc_metadata* FillMetadataArray( const std::multimap<grpc::string, grpc::string>& metadata); @@ -67,14 +67,10 @@ class WriteOptions { WriteOptions(const WriteOptions& other) : flags_(other.flags_) {} /// Clear all flags. - inline void Clear() { - flags_ = 0; - } + inline void Clear() { flags_ = 0; } /// Returns raw flags bitset. - inline gpr_uint32 flags() const { - return flags_; - } + inline gpr_uint32 flags() const { return flags_; } /// Sets flag for the disabling of compression for the next message write. /// @@ -122,9 +118,7 @@ class WriteOptions { /// not go out on the wire immediately. /// /// \sa GRPC_WRITE_BUFFER_HINT - inline bool get_buffer_hint() const { - return GetBit(GRPC_WRITE_BUFFER_HINT); - } + inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); } WriteOptions& operator=(const WriteOptions& rhs) { flags_ = rhs.flags_; @@ -132,17 +126,11 @@ class WriteOptions { } private: - void SetBit(const gpr_int32 mask) { - flags_ |= mask; - } + void SetBit(const gpr_int32 mask) { flags_ |= mask; } - void ClearBit(const gpr_int32 mask) { - flags_ &= ~mask; - } + void ClearBit(const gpr_int32 mask) { flags_ &= ~mask; } - bool GetBit(const gpr_int32 mask) const { - return flags_ & mask; - } + bool GetBit(const gpr_int32 mask) const { return flags_ & mask; } gpr_uint32 flags_; }; @@ -431,7 +419,7 @@ class CallOpRecvInitialMetadata { } private: - std::multimap<grpc::string, grpc::string>* recv_initial_metadata_; + std::multimap<grpc::string_ref, grpc::string_ref>* recv_initial_metadata_; grpc_metadata_array recv_initial_metadata_arr_; }; @@ -474,7 +462,7 @@ class CallOpClientRecvStatus { } private: - std::multimap<grpc::string, grpc::string>* recv_trailing_metadata_; + std::multimap<grpc::string_ref, grpc::string_ref>* recv_trailing_metadata_; Status* recv_status_; grpc_metadata_array recv_trailing_metadata_arr_; grpc_status_code status_code_; @@ -553,8 +541,7 @@ class CallOpSet : public CallOpSetInterface, template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>, class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>, class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>> -class SneakyCallOpSet GRPC_FINAL - : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> { +class SneakyCallOpSet : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> { public: bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { typedef CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> Base; diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h index b77ce7d02c..4cdc800267 100644 --- a/include/grpc++/impl/client_unary_call.h +++ b/include/grpc++/impl/client_unary_call.h @@ -34,21 +34,20 @@ #ifndef GRPCXX_IMPL_CLIENT_UNARY_CALL_H #define GRPCXX_IMPL_CLIENT_UNARY_CALL_H -#include <grpc++/config.h> -#include <grpc++/status.h> - #include <grpc++/impl/call.h> +#include <grpc++/support/config.h> +#include <grpc++/support/status.h> namespace grpc { -class ChannelInterface; +class Channel; class ClientContext; class CompletionQueue; class RpcMethod; // Wrapper that performs a blocking unary call template <class InputMessage, class OutputMessage> -Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, +Status BlockingUnaryCall(Channel* channel, const RpcMethod& method, ClientContext* context, const InputMessage& request, OutputMessage* result) { CompletionQueue cq; diff --git a/include/grpc++/impl/grpc_library.h b/include/grpc++/impl/grpc_library.h index f9fa677901..ce4211418d 100644 --- a/include/grpc++/impl/grpc_library.h +++ b/include/grpc++/impl/grpc_library.h @@ -46,5 +46,4 @@ class GrpcLibrary { } // namespace grpc - #endif // GRPCXX_IMPL_GRPC_LIBRARY_H diff --git a/include/grpc++/impl/internal_stub.h b/include/grpc++/impl/internal_stub.h deleted file mode 100644 index 370a3b8ac5..0000000000 --- a/include/grpc++/impl/internal_stub.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPCXX_IMPL_INTERNAL_STUB_H -#define GRPCXX_IMPL_INTERNAL_STUB_H - -#include <memory> - -#include <grpc++/channel_interface.h> - -namespace grpc { - -class InternalStub { - public: - InternalStub(const std::shared_ptr<ChannelInterface>& channel) - : channel_(channel) {} - virtual ~InternalStub() {} - - ChannelInterface* channel() { return channel_.get(); } - - private: - const std::shared_ptr<ChannelInterface> channel_; -}; - -} // namespace grpc - -#endif // GRPCXX_IMPL_INTERNAL_STUB_H diff --git a/include/grpc++/impl/proto_utils.h b/include/grpc++/impl/proto_utils.h index ebefa3e1be..283e33486d 100644 --- a/include/grpc++/impl/proto_utils.h +++ b/include/grpc++/impl/proto_utils.h @@ -38,8 +38,8 @@ #include <grpc/grpc.h> #include <grpc++/impl/serialization_traits.h> -#include <grpc++/config_protobuf.h> -#include <grpc++/status.h> +#include <grpc++/support/config_protobuf.h> +#include <grpc++/support/status.h> namespace grpc { diff --git a/include/grpc++/impl/rpc_method.h b/include/grpc++/impl/rpc_method.h index 50a160b08c..9800268062 100644 --- a/include/grpc++/impl/rpc_method.h +++ b/include/grpc++/impl/rpc_method.h @@ -34,6 +34,10 @@ #ifndef GRPCXX_IMPL_RPC_METHOD_H #define GRPCXX_IMPL_RPC_METHOD_H +#include <memory> + +#include <grpc++/channel.h> + namespace grpc { class RpcMethod { @@ -45,8 +49,14 @@ class RpcMethod { BIDI_STREAMING }; - RpcMethod(const char* name, RpcType type, void* channel_tag) - : name_(name), method_type_(type), channel_tag_(channel_tag) {} + RpcMethod(const char* name, RpcType type) + : name_(name), method_type_(type), channel_tag_(NULL) {} + + RpcMethod(const char* name, RpcType type, + const std::shared_ptr<Channel>& channel) + : name_(name), + method_type_(type), + channel_tag_(channel->RegisterMethod(name)) {} const char* name() const { return name_; } RpcType method_type() const { return method_type_; } diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h index 925801e1ce..fcb0b7ccce 100644 --- a/include/grpc++/impl/rpc_service_method.h +++ b/include/grpc++/impl/rpc_service_method.h @@ -39,10 +39,10 @@ #include <memory> #include <vector> -#include <grpc++/config.h> #include <grpc++/impl/rpc_method.h> -#include <grpc++/status.h> -#include <grpc++/stream.h> +#include <grpc++/support/config.h> +#include <grpc++/support/status.h> +#include <grpc++/support/sync_stream.h> namespace grpc { class ServerContext; @@ -211,13 +211,19 @@ class BidiStreamingHandler : public MethodHandler { // Handle unknown method by returning UNIMPLEMENTED error. class UnknownMethodHandler : public MethodHandler { public: - void RunHandler(const HandlerParameter& param) GRPC_FINAL { + template <class T> + static void FillOps(ServerContext* context, T* ops) { Status status(StatusCode::UNIMPLEMENTED, ""); - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; - if (!param.server_context->sent_initial_metadata_) { - ops.SendInitialMetadata(param.server_context->initial_metadata_); + if (!context->sent_initial_metadata_) { + ops->SendInitialMetadata(context->initial_metadata_); + context->sent_initial_metadata_ = true; } - ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + ops->ServerSendStatus(context->trailing_metadata_, status); + } + + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; + FillOps(param.server_context, &ops); param.call->PerformOps(&ops); param.call->cq()->Pluck(&ops); } @@ -229,7 +235,7 @@ class RpcServiceMethod : public RpcMethod { // Takes ownership of the handler RpcServiceMethod(const char* name, RpcMethod::RpcType type, MethodHandler* handler) - : RpcMethod(name, type, nullptr), handler_(handler) {} + : RpcMethod(name, type), handler_(handler) {} MethodHandler* handler() { return handler_.get(); } diff --git a/include/grpc++/impl/serialization_traits.h b/include/grpc++/impl/serialization_traits.h index 1f5c674e4c..3ea66a3405 100644 --- a/include/grpc++/impl/serialization_traits.h +++ b/include/grpc++/impl/serialization_traits.h @@ -37,12 +37,12 @@ namespace grpc { /// Defines how to serialize and deserialize some type. -/// +/// /// Used for hooking different message serialization API's into GRPC. /// Each SerializationTraits implementation must provide the following /// functions: /// static Status Serialize(const Message& msg, -/// grpc_byte_buffer** buffer, +/// grpc_byte_buffer** buffer, // bool* own_buffer); /// static Status Deserialize(grpc_byte_buffer* buffer, /// Message* msg, @@ -57,7 +57,7 @@ namespace grpc { /// msg. max_message_size is passed in as a bound on the maximum number of /// message bytes Deserialize should accept. /// -/// Both functions return a Status, allowing them to explain what went +/// Both functions return a Status, allowing them to explain what went /// wrong if required. template <class Message, class UnusedButHereForPartialTemplateSpecialization = void> diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h index c33a278f5b..3b6ac1de77 100644 --- a/include/grpc++/impl/service_type.h +++ b/include/grpc++/impl/service_type.h @@ -34,10 +34,10 @@ #ifndef GRPCXX_IMPL_SERVICE_TYPE_H #define GRPCXX_IMPL_SERVICE_TYPE_H -#include <grpc++/config.h> #include <grpc++/impl/serialization_traits.h> #include <grpc++/server.h> -#include <grpc++/status.h> +#include <grpc++/support/config.h> +#include <grpc++/support/status.h> namespace grpc { diff --git a/include/grpc++/impl/sync.h b/include/grpc++/impl/sync.h index 2f41d2bdeb..999c4303cb 100644 --- a/include/grpc++/impl/sync.h +++ b/include/grpc++/impl/sync.h @@ -34,7 +34,7 @@ #ifndef GRPCXX_IMPL_SYNC_H #define GRPCXX_IMPL_SYNC_H -#include <grpc++/config.h> +#include <grpc++/support/config.h> #ifdef GRPC_CXX0X_NO_THREAD #include <grpc++/impl/sync_no_cxx11.h> diff --git a/include/grpc++/impl/sync_no_cxx11.h b/include/grpc++/impl/sync_no_cxx11.h index 5869b04c76..120a031045 100644 --- a/include/grpc++/impl/sync_no_cxx11.h +++ b/include/grpc++/impl/sync_no_cxx11.h @@ -38,7 +38,7 @@ namespace grpc { -template<class mutex> +template <class mutex> class lock_guard; class condition_variable; @@ -46,6 +46,7 @@ class mutex { public: mutex() { gpr_mu_init(&mu_); } ~mutex() { gpr_mu_destroy(&mu_); } + private: ::gpr_mu mu_; template <class mutex> @@ -58,6 +59,7 @@ class lock_guard { public: lock_guard(mutex &mu) : mu_(mu), locked(true) { gpr_mu_lock(&mu.mu_); } ~lock_guard() { unlock_internal(); } + protected: void lock_internal() { if (!locked) gpr_mu_lock(&mu_.mu_); @@ -67,6 +69,7 @@ class lock_guard { if (locked) gpr_mu_unlock(&mu_.mu_); locked = false; } + private: mutex &mu_; bool locked; @@ -76,7 +79,7 @@ class lock_guard { template <class mutex> class unique_lock : public lock_guard<mutex> { public: - unique_lock(mutex &mu) : lock_guard<mutex>(mu) { } + unique_lock(mutex &mu) : lock_guard<mutex>(mu) {} void lock() { this->lock_internal(); } void unlock() { this->unlock_internal(); } }; @@ -92,6 +95,7 @@ class condition_variable { } void notify_one() { gpr_cv_signal(&cv_); } void notify_all() { gpr_cv_broadcast(&cv_); } + private: gpr_cv cv_; }; diff --git a/include/grpc++/impl/thd.h b/include/grpc++/impl/thd.h index 4c4578a92d..f8d4258ac6 100644 --- a/include/grpc++/impl/thd.h +++ b/include/grpc++/impl/thd.h @@ -34,7 +34,7 @@ #ifndef GRPCXX_IMPL_THD_H #define GRPCXX_IMPL_THD_H -#include <grpc++/config.h> +#include <grpc++/support/config.h> #ifdef GRPC_CXX0X_NO_THREAD #include <grpc++/impl/thd_no_cxx11.h> diff --git a/include/grpc++/impl/thd_no_cxx11.h b/include/grpc++/impl/thd_no_cxx11.h index a6bdd7dfe9..84d03ce184 100644 --- a/include/grpc++/impl/thd_no_cxx11.h +++ b/include/grpc++/impl/thd_no_cxx11.h @@ -40,7 +40,8 @@ namespace grpc { class thread { public: - template<class T> thread(void (T::*fptr)(), T *obj) { + template <class T> + thread(void (T::*fptr)(), T *obj) { func_ = new thread_function<T>(fptr, obj); joined_ = false; start(); @@ -53,28 +54,28 @@ class thread { gpr_thd_join(thd_); joined_ = true; } + private: void start() { gpr_thd_options options = gpr_thd_options_default(); gpr_thd_options_set_joinable(&options); - gpr_thd_new(&thd_, thread_func, (void *) func_, &options); + gpr_thd_new(&thd_, thread_func, (void *)func_, &options); } static void thread_func(void *arg) { - thread_function_base *func = (thread_function_base *) arg; + thread_function_base *func = (thread_function_base *)arg; func->call(); } class thread_function_base { public: - virtual ~thread_function_base() { } + virtual ~thread_function_base() {} virtual void call() = 0; }; - template<class T> + template <class T> class thread_function : public thread_function_base { public: - thread_function(void (T::*fptr)(), T *obj) - : fptr_(fptr) - , obj_(obj) { } + thread_function(void (T::*fptr)(), T *obj) : fptr_(fptr), obj_(obj) {} virtual void call() { (obj_->*fptr_)(); } + private: void (T::*fptr_)(); T *obj_; @@ -84,8 +85,8 @@ class thread { bool joined_; // Disallow copy and assign. - thread(const thread&); - void operator=(const thread&); + thread(const thread &); + void operator=(const thread &); }; } // namespace grpc diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 8755b4b445..c8979e433c 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -38,11 +38,11 @@ #include <memory> #include <grpc++/completion_queue.h> -#include <grpc++/config.h> #include <grpc++/impl/call.h> #include <grpc++/impl/grpc_library.h> #include <grpc++/impl/sync.h> -#include <grpc++/status.h> +#include <grpc++/support/config.h> +#include <grpc++/support/status.h> struct grpc_server; @@ -63,7 +63,14 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { ~Server(); // Shutdown the server, block until all rpc processing finishes. - void Shutdown(); + // Forcefully terminate pending calls after deadline expires. + template <class T> + void Shutdown(const T& deadline) { + ShutdownInternal(TimePoint<T>(deadline).raw_time()); + } + + // Shutdown the server, waiting for all rpc processing to finish. + void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); } // Block waiting for all work to complete (the server must either // be shutting down or some other thread must call Shutdown for this @@ -84,13 +91,14 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { int max_message_size); // Register a service. This call does not take ownership of the service. // The service must exist for the lifetime of the Server instance. - bool RegisterService(const grpc::string *host, RpcService* service); - bool RegisterAsyncService(const grpc::string *host, AsynchronousService* service); + bool RegisterService(const grpc::string* host, RpcService* service); + bool RegisterAsyncService(const grpc::string* host, + AsynchronousService* service); void RegisterAsyncGenericService(AsyncGenericService* service); // Add a listening port. Can be called multiple times. int AddListeningPort(const grpc::string& addr, ServerCredentials* creds); // Start the server. - bool Start(); + bool Start(ServerCompletionQueue** cqs, size_t num_cqs); void HandleQueueClosed(); void RunRpc(); @@ -98,11 +106,14 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE; + void ShutdownInternal(gpr_timespec deadline); + class BaseAsyncRequest : public CompletionQueueTag { public: BaseAsyncRequest(Server* server, ServerContext* context, ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, void* tag); + CompletionQueue* call_cq, void* tag, + bool delete_on_finalize); virtual ~BaseAsyncRequest(); bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; @@ -113,6 +124,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { ServerAsyncStreamingInterface* const stream_; CompletionQueue* const call_cq_; void* const tag_; + const bool delete_on_finalize_; grpc_call* call_; grpc_metadata_array initial_metadata_array_; }; @@ -174,12 +186,13 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { Message* const request_; }; - class GenericAsyncRequest GRPC_FINAL : public BaseAsyncRequest { + class GenericAsyncRequest : public BaseAsyncRequest { public: GenericAsyncRequest(Server* server, GenericServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag); + ServerCompletionQueue* notification_cq, void* tag, + bool delete_on_finalize); bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; @@ -187,6 +200,10 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { grpc_call_details call_details_; }; + class UnimplementedAsyncRequestContext; + class UnimplementedAsyncRequest; + class UnimplementedAsyncResponse; + template <class Message> void RequestAsyncCall(void* registered_method, ServerContext* context, ServerAsyncStreamingInterface* stream, @@ -211,7 +228,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { ServerCompletionQueue* notification_cq, void* tag) { new GenericAsyncRequest(this, context, stream, call_cq, notification_cq, - tag); + tag, true); } const int max_message_size_; diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 44ee00eec9..8cd2048592 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -37,7 +37,7 @@ #include <memory> #include <vector> -#include <grpc++/config.h> +#include <grpc++/support/config.h> namespace grpc { @@ -76,15 +76,14 @@ class ServerBuilder { // The service must exist for the lifetime of the Server instance returned by // BuildAndStart(). // Only matches requests with :authority \a host - void RegisterService(const grpc::string& host, - SynchronousService* service); + void RegisterService(const grpc::string& host, SynchronousService* service); // Register an asynchronous service. // This call does not take ownership of the service or completion queue. // The service and completion queuemust exist for the lifetime of the Server // instance returned by BuildAndStart(). // Only matches requests with :authority \a host - void RegisterAsyncService(const grpc::string& host, + void RegisterAsyncService(const grpc::string& host, AsynchronousService* service); // Set max message size in bytes. @@ -97,13 +96,9 @@ class ServerBuilder { std::shared_ptr<ServerCredentials> creds, int* selected_port = nullptr); - // Set the thread pool used for running appliation rpc handlers. - // Does not take ownership. - void SetThreadPool(ThreadPoolInterface* thread_pool); - // Add a completion queue for handling asynchronous services - // Caller is required to keep this completion queue live until calling - // BuildAndStart() + // Caller is required to keep this completion queue live until + // the server is destroyed. std::unique_ptr<ServerCompletionQueue> AddCompletionQueue(); // Return a running server which is ready for processing rpcs. @@ -117,9 +112,10 @@ class ServerBuilder { }; typedef std::unique_ptr<grpc::string> HostString; - template <class T> struct NamedService { + template <class T> + struct NamedService { explicit NamedService(T* s) : service(s) {} - NamedService(const grpc::string& h, T *s) + NamedService(const grpc::string& h, T* s) : host(new grpc::string(h)), service(s) {} HostString host; T* service; @@ -127,7 +123,8 @@ class ServerBuilder { int max_message_size_; std::vector<std::unique_ptr<NamedService<RpcService>>> services_; - std::vector<std::unique_ptr<NamedService<AsynchronousService>>> async_services_; + std::vector<std::unique_ptr<NamedService<AsynchronousService>>> + async_services_; std::vector<Port> ports_; std::vector<ServerCompletionQueue*> cqs_; std::shared_ptr<ServerCredentials> creds_; diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h index 8262dee654..4b17a28047 100644 --- a/include/grpc++/server_context.h +++ b/include/grpc++/server_context.h @@ -39,9 +39,10 @@ #include <grpc/compression.h> #include <grpc/support/time.h> -#include <grpc++/auth_context.h> -#include <grpc++/config.h> -#include <grpc++/time.h> +#include <grpc++/support/auth_context.h> +#include <grpc++/support/config.h> +#include <grpc++/support/string_ref.h> +#include <grpc++/support/time.h> struct gpr_timespec; struct grpc_metadata; @@ -81,7 +82,7 @@ class CompletionQueue; class Server; namespace testing { -class InteropContextInspector; +class InteropServerContextInspector; } // namespace testing // Interface of server side rpc context. @@ -103,7 +104,7 @@ class ServerContext { bool IsCancelled() const; - const std::multimap<grpc::string, grpc::string>& client_metadata() { + const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata() { return client_metadata_; } @@ -136,7 +137,7 @@ class ServerContext { } private: - friend class ::grpc::testing::InteropContextInspector; + friend class ::grpc::testing::InteropServerContextInspector; friend class ::grpc::Server; template <class W, class R> friend class ::grpc::ServerAsyncReader; @@ -185,7 +186,7 @@ class ServerContext { CompletionQueue* cq_; bool sent_initial_metadata_; mutable std::shared_ptr<const AuthContext> auth_context_; - std::multimap<grpc::string, grpc::string> client_metadata_; + std::multimap<grpc::string_ref, grpc::string_ref> client_metadata_; std::multimap<grpc::string, grpc::string> initial_metadata_; std::multimap<grpc::string, grpc::string> trailing_metadata_; diff --git a/include/grpc++/server_credentials.h b/include/grpc++/server_credentials.h index 11acd67e8a..16b78c08af 100644 --- a/include/grpc++/server_credentials.h +++ b/include/grpc++/server_credentials.h @@ -37,7 +37,7 @@ #include <memory> #include <vector> -#include <grpc++/config.h> +#include <grpc++/support/config.h> struct grpc_server; diff --git a/include/grpc++/stream.h b/include/grpc++/support/async_stream.h index bc0c3c0f3b..4c12fda12f 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/support/async_stream.h @@ -31,364 +31,20 @@ * */ -#ifndef GRPCXX_STREAM_H -#define GRPCXX_STREAM_H +#ifndef GRPCXX_SUPPORT_ASYNC_STREAM_H +#define GRPCXX_SUPPORT_ASYNC_STREAM_H -#include <grpc++/channel_interface.h> +#include <grpc/support/log.h> +#include <grpc++/channel.h> #include <grpc++/client_context.h> #include <grpc++/completion_queue.h> -#include <grpc++/server_context.h> #include <grpc++/impl/call.h> #include <grpc++/impl/service_type.h> -#include <grpc++/status.h> -#include <grpc/support/log.h> +#include <grpc++/server_context.h> +#include <grpc++/support/status.h> namespace grpc { -// Common interface for all client side streaming. -class ClientStreamingInterface { - public: - virtual ~ClientStreamingInterface() {} - - // Wait until the stream finishes, and return the final status. When the - // client side declares it has no more message to send, either implicitly or - // by calling WritesDone, it needs to make sure there is no more message to - // be received from the server, either implicitly or by getting a false from - // a Read(). - // This function will return either: - // - when all incoming messages have been read and the server has returned - // status - // - OR when the server has returned a non-OK status - virtual Status Finish() = 0; -}; - -// An interface that yields a sequence of R messages. -template <class R> -class ReaderInterface { - public: - virtual ~ReaderInterface() {} - - // Blocking read a message and parse to msg. Returns true on success. - // The method returns false when there will be no more incoming messages, - // either because the other side has called WritesDone or the stream has - // failed (or been cancelled). - virtual bool Read(R* msg) = 0; -}; - -// An interface that can be fed a sequence of W messages. -template <class W> -class WriterInterface { - public: - virtual ~WriterInterface() {} - - // Blocking write msg to the stream. Returns true on success. - // Returns false when the stream has been closed. - virtual bool Write(const W& msg, const WriteOptions& options) = 0; - - inline bool Write(const W& msg) { - return Write(msg, WriteOptions()); - } -}; - -template <class R> -class ClientReaderInterface : public ClientStreamingInterface, - public ReaderInterface<R> { - public: - virtual void WaitForInitialMetadata() = 0; -}; - -template <class R> -class ClientReader GRPC_FINAL : public ClientReaderInterface<R> { - public: - // Blocking create a stream and write the first request out. - template <class W> - ClientReader(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, const W& request) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpClientSendClose> ops; - ops.SendInitialMetadata(context->send_initial_metadata_); - // TODO(ctiller): don't assert - GPR_ASSERT(ops.SendMessage(request).ok()); - ops.ClientSendClose(); - call_.PerformOps(&ops); - cq_.Pluck(&ops); - } - - // Blocking wait for initial metadata from server. The received metadata - // can only be accessed after this call returns. Should only be called before - // the first read. Calling this method is optional, and if it is not called - // the metadata will be available in ClientContext after the first read. - void WaitForInitialMetadata() { - GPR_ASSERT(!context_->initial_metadata_received_); - - CallOpSet<CallOpRecvInitialMetadata> ops; - ops.RecvInitialMetadata(context_); - call_.PerformOps(&ops); - cq_.Pluck(&ops); // status ignored - } - - bool Read(R* msg) GRPC_OVERRIDE { - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; - if (!context_->initial_metadata_received_) { - ops.RecvInitialMetadata(context_); - } - ops.RecvMessage(msg); - call_.PerformOps(&ops); - return cq_.Pluck(&ops) && ops.got_message; - } - - Status Finish() GRPC_OVERRIDE { - CallOpSet<CallOpClientRecvStatus> ops; - Status status; - ops.ClientRecvStatus(context_, &status); - call_.PerformOps(&ops); - GPR_ASSERT(cq_.Pluck(&ops)); - return status; - } - - private: - ClientContext* context_; - CompletionQueue cq_; - Call call_; -}; - -template <class W> -class ClientWriterInterface : public ClientStreamingInterface, - public WriterInterface<W> { - public: - virtual bool WritesDone() = 0; -}; - -template <class W> -class ClientWriter : public ClientWriterInterface<W> { - public: - // Blocking create a stream. - template <class R> - ClientWriter(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, R* response) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { - finish_ops_.RecvMessage(response); - - CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&ops); - cq_.Pluck(&ops); - } - - using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { - CallOpSet<CallOpSendMessage> ops; - if (!ops.SendMessage(msg, options).ok()) { - return false; - } - call_.PerformOps(&ops); - return cq_.Pluck(&ops); - } - - bool WritesDone() GRPC_OVERRIDE { - CallOpSet<CallOpClientSendClose> ops; - ops.ClientSendClose(); - call_.PerformOps(&ops); - return cq_.Pluck(&ops); - } - - // Read the final response and wait for the final status. - Status Finish() GRPC_OVERRIDE { - Status status; - finish_ops_.ClientRecvStatus(context_, &status); - call_.PerformOps(&finish_ops_); - GPR_ASSERT(cq_.Pluck(&finish_ops_)); - return status; - } - - private: - ClientContext* context_; - CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_; - CompletionQueue cq_; - Call call_; -}; - -// Client-side interface for bi-directional streaming. -template <class W, class R> -class ClientReaderWriterInterface : public ClientStreamingInterface, - public WriterInterface<W>, - public ReaderInterface<R> { - public: - virtual void WaitForInitialMetadata() = 0; - virtual bool WritesDone() = 0; -}; - -template <class W, class R> -class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { - public: - // Blocking create a stream. - ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { - CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&ops); - cq_.Pluck(&ops); - } - - // Blocking wait for initial metadata from server. The received metadata - // can only be accessed after this call returns. Should only be called before - // the first read. Calling this method is optional, and if it is not called - // the metadata will be available in ClientContext after the first read. - void WaitForInitialMetadata() { - GPR_ASSERT(!context_->initial_metadata_received_); - - CallOpSet<CallOpRecvInitialMetadata> ops; - ops.RecvInitialMetadata(context_); - call_.PerformOps(&ops); - cq_.Pluck(&ops); // status ignored - } - - bool Read(R* msg) GRPC_OVERRIDE { - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; - if (!context_->initial_metadata_received_) { - ops.RecvInitialMetadata(context_); - } - ops.RecvMessage(msg); - call_.PerformOps(&ops); - return cq_.Pluck(&ops) && ops.got_message; - } - - using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { - CallOpSet<CallOpSendMessage> ops; - if (!ops.SendMessage(msg, options).ok()) return false; - call_.PerformOps(&ops); - return cq_.Pluck(&ops); - } - - bool WritesDone() GRPC_OVERRIDE { - CallOpSet<CallOpClientSendClose> ops; - ops.ClientSendClose(); - call_.PerformOps(&ops); - return cq_.Pluck(&ops); - } - - Status Finish() GRPC_OVERRIDE { - CallOpSet<CallOpClientRecvStatus> ops; - Status status; - ops.ClientRecvStatus(context_, &status); - call_.PerformOps(&ops); - GPR_ASSERT(cq_.Pluck(&ops)); - return status; - } - - private: - ClientContext* context_; - CompletionQueue cq_; - Call call_; -}; - -template <class R> -class ServerReader GRPC_FINAL : public ReaderInterface<R> { - public: - ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} - - void SendInitialMetadata() { - GPR_ASSERT(!ctx_->sent_initial_metadata_); - - CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&ops); - call_->cq()->Pluck(&ops); - } - - bool Read(R* msg) GRPC_OVERRIDE { - CallOpSet<CallOpRecvMessage<R>> ops; - ops.RecvMessage(msg); - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops) && ops.got_message; - } - - private: - Call* const call_; - ServerContext* const ctx_; -}; - -template <class W> -class ServerWriter GRPC_FINAL : public WriterInterface<W> { - public: - ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} - - void SendInitialMetadata() { - GPR_ASSERT(!ctx_->sent_initial_metadata_); - - CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&ops); - call_->cq()->Pluck(&ops); - } - - using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; - if (!ops.SendMessage(msg, options).ok()) { - return false; - } - if (!ctx_->sent_initial_metadata_) { - ops.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops); - } - - private: - Call* const call_; - ServerContext* const ctx_; -}; - -// Server-side interface for bi-directional streaming. -template <class W, class R> -class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>, - public ReaderInterface<R> { - public: - ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} - - void SendInitialMetadata() { - GPR_ASSERT(!ctx_->sent_initial_metadata_); - - CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&ops); - call_->cq()->Pluck(&ops); - } - - bool Read(R* msg) GRPC_OVERRIDE { - CallOpSet<CallOpRecvMessage<R>> ops; - ops.RecvMessage(msg); - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops) && ops.got_message; - } - - using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; - if (!ops.SendMessage(msg, options).ok()) { - return false; - } - if (!ctx_->sent_initial_metadata_) { - ops.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops); - } - - private: - Call* const call_; - ServerContext* const ctx_; -}; - // Async interfaces // Common interface for all client side streaming. class ClientAsyncStreamingInterface { @@ -427,7 +83,7 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> { public: // Create a stream and write the first request out. template <class W> - ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, + ClientAsyncReader(Channel* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, const W& request, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { @@ -486,7 +142,7 @@ template <class W> class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> { public: template <class R> - ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, + ClientAsyncWriter(Channel* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, R* response, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { @@ -551,7 +207,7 @@ template <class W, class R> class ClientAsyncReaderWriter GRPC_FINAL : public ClientAsyncReaderWriterInterface<W, R> { public: - ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq, + ClientAsyncReaderWriter(Channel* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { @@ -640,9 +296,8 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, } // The response is dropped if the status is not OK. if (status.ok()) { - finish_ops_.ServerSendStatus( - ctx_->trailing_metadata_, - finish_ops_.SendMessage(msg)); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, + finish_ops_.SendMessage(msg)); } else { finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); } @@ -764,6 +419,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, } private: + friend class ::grpc::Server; + void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } Call call_; @@ -776,4 +433,4 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, } // namespace grpc -#endif // GRPCXX_STREAM_H +#endif // GRPCXX_SUPPORT_ASYNC_STREAM_H diff --git a/include/grpc++/async_unary_call.h b/include/grpc++/support/async_unary_call.h index d631ccd134..0f4ad2656f 100644 --- a/include/grpc++/async_unary_call.h +++ b/include/grpc++/support/async_unary_call.h @@ -31,17 +31,17 @@ * */ -#ifndef GRPCXX_ASYNC_UNARY_CALL_H -#define GRPCXX_ASYNC_UNARY_CALL_H +#ifndef GRPCXX_SUPPORT_ASYNC_UNARY_CALL_H +#define GRPCXX_SUPPORT_ASYNC_UNARY_CALL_H -#include <grpc++/channel_interface.h> +#include <grpc/support/log.h> +#include <grpc++/channel.h> #include <grpc++/client_context.h> #include <grpc++/completion_queue.h> #include <grpc++/server_context.h> #include <grpc++/impl/call.h> #include <grpc++/impl/service_type.h> -#include <grpc++/status.h> -#include <grpc/support/log.h> +#include <grpc++/support/status.h> namespace grpc { @@ -58,7 +58,7 @@ class ClientAsyncResponseReader GRPC_FINAL : public ClientAsyncResponseReaderInterface<R> { public: template <class W> - ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, + ClientAsyncResponseReader(Channel* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, const W& request) : context_(context), call_(channel->CreateCall(method, context, cq)) { @@ -121,8 +121,8 @@ class ServerAsyncResponseWriter GRPC_FINAL } // The response is dropped if the status is not OK. if (status.ok()) { - finish_buf_.ServerSendStatus( - ctx_->trailing_metadata_, finish_buf_.SendMessage(msg)); + finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, + finish_buf_.SendMessage(msg)); } else { finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); } @@ -152,4 +152,4 @@ class ServerAsyncResponseWriter GRPC_FINAL } // namespace grpc -#endif // GRPCXX_ASYNC_UNARY_CALL_H +#endif // GRPCXX_SUPPORT_ASYNC_UNARY_CALL_H diff --git a/include/grpc++/auth_context.h b/include/grpc++/support/auth_context.h index f8ea8ad6f4..f4f2dcf5bb 100644 --- a/include/grpc++/auth_context.h +++ b/include/grpc++/support/auth_context.h @@ -31,13 +31,13 @@ * */ -#ifndef GRPCXX_AUTH_CONTEXT_H -#define GRPCXX_AUTH_CONTEXT_H +#ifndef GRPCXX_SUPPORT_AUTH_CONTEXT_H +#define GRPCXX_SUPPORT_AUTH_CONTEXT_H #include <iterator> #include <vector> -#include <grpc++/config.h> +#include <grpc++/support/config.h> struct grpc_auth_context; struct grpc_auth_property; @@ -62,6 +62,7 @@ class AuthPropertyIterator AuthPropertyIterator(); AuthPropertyIterator(const grpc_auth_property* property, const grpc_auth_property_iterator* iter); + private: friend class SecureAuthContext; const grpc_auth_property* property_; @@ -91,5 +92,4 @@ class AuthContext { } // namespace grpc -#endif // GRPCXX_AUTH_CONTEXT_H - +#endif // GRPCXX_SUPPORT_AUTH_CONTEXT_H diff --git a/include/grpc++/byte_buffer.h b/include/grpc++/support/byte_buffer.h index cb3c6a1159..3f8cc25f47 100644 --- a/include/grpc++/byte_buffer.h +++ b/include/grpc++/support/byte_buffer.h @@ -31,16 +31,16 @@ * */ -#ifndef GRPCXX_BYTE_BUFFER_H -#define GRPCXX_BYTE_BUFFER_H +#ifndef GRPCXX_SUPPORT_BYTE_BUFFER_H +#define GRPCXX_SUPPORT_BYTE_BUFFER_H #include <grpc/grpc.h> #include <grpc/byte_buffer.h> #include <grpc/support/log.h> -#include <grpc++/config.h> -#include <grpc++/slice.h> -#include <grpc++/status.h> #include <grpc++/impl/serialization_traits.h> +#include <grpc++/support/config.h> +#include <grpc++/support/slice.h> +#include <grpc++/support/status.h> #include <vector> @@ -91,8 +91,8 @@ class SerializationTraits<ByteBuffer, void> { dest->set_buffer(byte_buffer); return Status::OK; } - static Status Serialize(const ByteBuffer& source, grpc_byte_buffer** buffer, - bool* own_buffer) { + static Status Serialize(const ByteBuffer& source, grpc_byte_buffer** buffer, + bool* own_buffer) { *buffer = source.buffer(); *own_buffer = false; return Status::OK; @@ -101,4 +101,4 @@ class SerializationTraits<ByteBuffer, void> { } // namespace grpc -#endif // GRPCXX_BYTE_BUFFER_H +#endif // GRPCXX_SUPPORT_BYTE_BUFFER_H diff --git a/include/grpc++/channel_arguments.h b/include/grpc++/support/channel_arguments.h index 4d926377ec..cee68467c7 100644 --- a/include/grpc++/channel_arguments.h +++ b/include/grpc++/support/channel_arguments.h @@ -31,15 +31,15 @@ * */ -#ifndef GRPCXX_CHANNEL_ARGUMENTS_H -#define GRPCXX_CHANNEL_ARGUMENTS_H +#ifndef GRPCXX_SUPPORT_CHANNEL_ARGUMENTS_H +#define GRPCXX_SUPPORT_CHANNEL_ARGUMENTS_H #include <vector> #include <list> -#include <grpc++/config.h> #include <grpc/compression.h> #include <grpc/grpc.h> +#include <grpc++/support/config.h> namespace grpc { namespace testing { @@ -90,4 +90,4 @@ class ChannelArguments { } // namespace grpc -#endif // GRPCXX_CHANNEL_ARGUMENTS_H +#endif // GRPCXX_SUPPORT_CHANNEL_ARGUMENTS_H diff --git a/include/grpc++/config.h b/include/grpc++/support/config.h index 889dc39eb7..836bd47283 100644 --- a/include/grpc++/config.h +++ b/include/grpc++/support/config.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPCXX_CONFIG_H -#define GRPCXX_CONFIG_H +#ifndef GRPCXX_SUPPORT_CONFIG_H +#define GRPCXX_SUPPORT_CONFIG_H #if !defined(GRPC_NO_AUTODETECT_PLATFORM) @@ -113,4 +113,4 @@ typedef GRPC_CUSTOM_STRING string; } // namespace grpc -#endif // GRPCXX_CONFIG_H +#endif // GRPCXX_SUPPORT_CONFIG_H diff --git a/include/grpc++/config_protobuf.h b/include/grpc++/support/config_protobuf.h index 3afc7a58e2..8235590d41 100644 --- a/include/grpc++/config_protobuf.h +++ b/include/grpc++/support/config_protobuf.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPCXX_CONFIG_PROTOBUF_H -#define GRPCXX_CONFIG_PROTOBUF_H +#ifndef GRPCXX_SUPPORT_CONFIG_PROTOBUF_H +#define GRPCXX_SUPPORT_CONFIG_PROTOBUF_H #ifndef GRPC_CUSTOM_PROTOBUF_INT64 #include <google/protobuf/stubs/common.h> @@ -69,4 +69,4 @@ typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream; } // namespace protobuf } // namespace grpc -#endif // GRPCXX_CONFIG_PROTOBUF_H +#endif // GRPCXX_SUPPORT_CONFIG_PROTOBUF_H diff --git a/include/grpc++/slice.h b/include/grpc++/support/slice.h index 3e01bcf0ad..b2343a7f3d 100644 --- a/include/grpc++/slice.h +++ b/include/grpc++/support/slice.h @@ -31,11 +31,11 @@ * */ -#ifndef GRPCXX_SLICE_H -#define GRPCXX_SLICE_H +#ifndef GRPCXX_SUPPORT_SLICE_H +#define GRPCXX_SUPPORT_SLICE_H #include <grpc/support/slice.h> -#include <grpc++/config.h> +#include <grpc++/support/config.h> namespace grpc { @@ -71,4 +71,4 @@ class Slice GRPC_FINAL { } // namespace grpc -#endif // GRPCXX_SLICE_H +#endif // GRPCXX_SUPPORT_SLICE_H diff --git a/include/grpc++/status.h b/include/grpc++/support/status.h index fb8526ddce..05750ff600 100644 --- a/include/grpc++/status.h +++ b/include/grpc++/support/status.h @@ -31,11 +31,11 @@ * */ -#ifndef GRPCXX_STATUS_H -#define GRPCXX_STATUS_H +#ifndef GRPCXX_SUPPORT_STATUS_H +#define GRPCXX_SUPPORT_STATUS_H -#include <grpc++/status_code_enum.h> -#include <grpc++/config.h> +#include <grpc++/support/config.h> +#include <grpc++/support/status_code_enum.h> namespace grpc { @@ -61,4 +61,4 @@ class Status { } // namespace grpc -#endif // GRPCXX_STATUS_H +#endif // GRPCXX_SUPPORT_STATUS_H diff --git a/include/grpc++/status_code_enum.h b/include/grpc++/support/status_code_enum.h index 2211c964cd..7cb40452c8 100644 --- a/include/grpc++/status_code_enum.h +++ b/include/grpc++/support/status_code_enum.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPCXX_STATUS_CODE_ENUM_H -#define GRPCXX_STATUS_CODE_ENUM_H +#ifndef GRPCXX_SUPPORT_STATUS_CODE_ENUM_H +#define GRPCXX_SUPPORT_STATUS_CODE_ENUM_H namespace grpc { @@ -156,4 +156,4 @@ enum StatusCode { } // namespace grpc -#endif // GRPCXX_STATUS_CODE_ENUM_H +#endif // GRPCXX_SUPPORT_STATUS_CODE_ENUM_H diff --git a/include/grpc++/support/string_ref.h b/include/grpc++/support/string_ref.h new file mode 100644 index 0000000000..348c42cbba --- /dev/null +++ b/include/grpc++/support/string_ref.h @@ -0,0 +1,117 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_STRING_REF_H +#define GRPCXX_STRING_REF_H + +#include <iterator> + +#include <grpc++/support/config.h> + +namespace grpc { + +// This class is a non owning reference to a string. +// It should be a strict subset of the upcoming std::string_ref. See: +// http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2012/n3442.html +class string_ref { + public: + // types + typedef const char* const_iterator; + typedef std::reverse_iterator<const_iterator> const_reverse_iterator; + + // constants + const static size_t npos = size_t(-1); + + // construct/copy. + string_ref() : data_(nullptr), length_(0) {} + string_ref(const string_ref& other) + : data_(other.data_), length_(other.length_) {} + string_ref& operator=(const string_ref& rhs); + string_ref(const char* s); + string_ref(const char* s, size_t l) : data_(s), length_(l) {} + string_ref(const grpc::string& s) : data_(s.data()), length_(s.length()) {} + + // iterators + const_iterator begin() const { return data_; } + const_iterator end() const { return data_ + length_; } + const_iterator cbegin() const { return data_; } + const_iterator cend() const { return data_ + length_; } + const_reverse_iterator rbegin() const { + return const_reverse_iterator(end()); + } + const_reverse_iterator rend() const { + return const_reverse_iterator(begin()); + } + const_reverse_iterator crbegin() const { + return const_reverse_iterator(end()); + } + const_reverse_iterator crend() const { + return const_reverse_iterator(begin()); + } + + // capacity + size_t size() const { return length_; } + size_t length() const { return length_; } + size_t max_size() const { return length_; } + bool empty() const { return length_ == 0; } + + // element access + const char* data() const { return data_; } + + // string operations + int compare(string_ref x) const; + bool starts_with(string_ref x) const; + bool ends_with(string_ref x) const; + size_t find(string_ref s) const; + size_t find(char c) const; + + string_ref substr(size_t pos, size_t n = npos) const; + + private: + const char* data_; + size_t length_; +}; + +// Comparison operators +bool operator==(string_ref x, string_ref y); +bool operator!=(string_ref x, string_ref y); +bool operator<(string_ref x, string_ref y); +bool operator>(string_ref x, string_ref y); +bool operator<=(string_ref x, string_ref y); +bool operator>=(string_ref x, string_ref y); + +} // namespace grpc + +#endif // GRPCXX_STRING_REF_H + + diff --git a/include/grpc++/stub_options.h b/include/grpc++/support/stub_options.h index c7c16dcd55..973aa9bc83 100644 --- a/include/grpc++/stub_options.h +++ b/include/grpc++/support/stub_options.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPCXX_STUB_OPTIONS_H -#define GRPCXX_STUB_OPTIONS_H +#ifndef GRPCXX_SUPPORT_STUB_OPTIONS_H +#define GRPCXX_SUPPORT_STUB_OPTIONS_H namespace grpc { @@ -40,4 +40,4 @@ class StubOptions {}; } // namespace grpc -#endif // GRPCXX_STUB_OPTIONS_H +#endif // GRPCXX_SUPPORT_STUB_OPTIONS_H diff --git a/include/grpc++/support/sync_stream.h b/include/grpc++/support/sync_stream.h new file mode 100644 index 0000000000..b4bb637ff2 --- /dev/null +++ b/include/grpc++/support/sync_stream.h @@ -0,0 +1,392 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_SUPPORT_SYNC_STREAM_H +#define GRPCXX_SUPPORT_SYNC_STREAM_H + +#include <grpc/support/log.h> +#include <grpc++/channel.h> +#include <grpc++/client_context.h> +#include <grpc++/completion_queue.h> +#include <grpc++/impl/call.h> +#include <grpc++/impl/service_type.h> +#include <grpc++/server_context.h> +#include <grpc++/support/status.h> + +namespace grpc { + +// Common interface for all client side streaming. +class ClientStreamingInterface { + public: + virtual ~ClientStreamingInterface() {} + + // Wait until the stream finishes, and return the final status. When the + // client side declares it has no more message to send, either implicitly or + // by calling WritesDone, it needs to make sure there is no more message to + // be received from the server, either implicitly or by getting a false from + // a Read(). + // This function will return either: + // - when all incoming messages have been read and the server has returned + // status + // - OR when the server has returned a non-OK status + virtual Status Finish() = 0; +}; + +// An interface that yields a sequence of R messages. +template <class R> +class ReaderInterface { + public: + virtual ~ReaderInterface() {} + + // Blocking read a message and parse to msg. Returns true on success. + // The method returns false when there will be no more incoming messages, + // either because the other side has called WritesDone or the stream has + // failed (or been cancelled). + virtual bool Read(R* msg) = 0; +}; + +// An interface that can be fed a sequence of W messages. +template <class W> +class WriterInterface { + public: + virtual ~WriterInterface() {} + + // Blocking write msg to the stream. Returns true on success. + // Returns false when the stream has been closed. + virtual bool Write(const W& msg, const WriteOptions& options) = 0; + + inline bool Write(const W& msg) { return Write(msg, WriteOptions()); } +}; + +template <class R> +class ClientReaderInterface : public ClientStreamingInterface, + public ReaderInterface<R> { + public: + virtual void WaitForInitialMetadata() = 0; +}; + +template <class R> +class ClientReader GRPC_FINAL : public ClientReaderInterface<R> { + public: + // Blocking create a stream and write the first request out. + template <class W> + ClientReader(Channel* channel, const RpcMethod& method, + ClientContext* context, const W& request) + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> ops; + ops.SendInitialMetadata(context->send_initial_metadata_); + // TODO(ctiller): don't assert + GPR_ASSERT(ops.SendMessage(request).ok()); + ops.ClientSendClose(); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + + // Blocking wait for initial metadata from server. The received metadata + // can only be accessed after this call returns. Should only be called before + // the first read. Calling this method is optional, and if it is not called + // the metadata will be available in ClientContext after the first read. + void WaitForInitialMetadata() { + GPR_ASSERT(!context_->initial_metadata_received_); + + CallOpSet<CallOpRecvInitialMetadata> ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored + } + + bool Read(R* msg) GRPC_OVERRIDE { + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } + ops.RecvMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops) && ops.got_message; + } + + Status Finish() GRPC_OVERRIDE { + CallOpSet<CallOpClientRecvStatus> ops; + Status status; + ops.ClientRecvStatus(context_, &status); + call_.PerformOps(&ops); + GPR_ASSERT(cq_.Pluck(&ops)); + return status; + } + + private: + ClientContext* context_; + CompletionQueue cq_; + Call call_; +}; + +template <class W> +class ClientWriterInterface : public ClientStreamingInterface, + public WriterInterface<W> { + public: + virtual bool WritesDone() = 0; +}; + +template <class W> +class ClientWriter : public ClientWriterInterface<W> { + public: + // Blocking create a stream. + template <class R> + ClientWriter(Channel* channel, const RpcMethod& method, + ClientContext* context, R* response) + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + finish_ops_.RecvMessage(response); + + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + + using WriterInterface<W>::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { + CallOpSet<CallOpSendMessage> ops; + if (!ops.SendMessage(msg, options).ok()) { + return false; + } + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + bool WritesDone() GRPC_OVERRIDE { + CallOpSet<CallOpClientSendClose> ops; + ops.ClientSendClose(); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + // Read the final response and wait for the final status. + Status Finish() GRPC_OVERRIDE { + Status status; + finish_ops_.ClientRecvStatus(context_, &status); + call_.PerformOps(&finish_ops_); + GPR_ASSERT(cq_.Pluck(&finish_ops_)); + return status; + } + + private: + ClientContext* context_; + CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_; + CompletionQueue cq_; + Call call_; +}; + +// Client-side interface for bi-directional streaming. +template <class W, class R> +class ClientReaderWriterInterface : public ClientStreamingInterface, + public WriterInterface<W>, + public ReaderInterface<R> { + public: + virtual void WaitForInitialMetadata() = 0; + virtual bool WritesDone() = 0; +}; + +template <class W, class R> +class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { + public: + // Blocking create a stream. + ClientReaderWriter(Channel* channel, const RpcMethod& method, + ClientContext* context) + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + + // Blocking wait for initial metadata from server. The received metadata + // can only be accessed after this call returns. Should only be called before + // the first read. Calling this method is optional, and if it is not called + // the metadata will be available in ClientContext after the first read. + void WaitForInitialMetadata() { + GPR_ASSERT(!context_->initial_metadata_received_); + + CallOpSet<CallOpRecvInitialMetadata> ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored + } + + bool Read(R* msg) GRPC_OVERRIDE { + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } + ops.RecvMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops) && ops.got_message; + } + + using WriterInterface<W>::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { + CallOpSet<CallOpSendMessage> ops; + if (!ops.SendMessage(msg, options).ok()) return false; + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + bool WritesDone() GRPC_OVERRIDE { + CallOpSet<CallOpClientSendClose> ops; + ops.ClientSendClose(); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + Status Finish() GRPC_OVERRIDE { + CallOpSet<CallOpClientRecvStatus> ops; + Status status; + ops.ClientRecvStatus(context_, &status); + call_.PerformOps(&ops); + GPR_ASSERT(cq_.Pluck(&ops)); + return status; + } + + private: + ClientContext* context_; + CompletionQueue cq_; + Call call_; +}; + +template <class R> +class ServerReader GRPC_FINAL : public ReaderInterface<R> { + public: + ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + + void SendInitialMetadata() { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); + } + + bool Read(R* msg) GRPC_OVERRIDE { + CallOpSet<CallOpRecvMessage<R>> ops; + ops.RecvMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops) && ops.got_message; + } + + private: + Call* const call_; + ServerContext* const ctx_; +}; + +template <class W> +class ServerWriter GRPC_FINAL : public WriterInterface<W> { + public: + ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + + void SendInitialMetadata() { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); + } + + using WriterInterface<W>::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; + if (!ops.SendMessage(msg, options).ok()) { + return false; + } + if (!ctx_->sent_initial_metadata_) { + ops.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops); + } + + private: + Call* const call_; + ServerContext* const ctx_; +}; + +// Server-side interface for bi-directional streaming. +template <class W, class R> +class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>, + public ReaderInterface<R> { + public: + ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + + void SendInitialMetadata() { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); + } + + bool Read(R* msg) GRPC_OVERRIDE { + CallOpSet<CallOpRecvMessage<R>> ops; + ops.RecvMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops) && ops.got_message; + } + + using WriterInterface<W>::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; + if (!ops.SendMessage(msg, options).ok()) { + return false; + } + if (!ctx_->sent_initial_metadata_) { + ops.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops); + } + + private: + Call* const call_; + ServerContext* const ctx_; +}; + +} // namespace grpc + +#endif // GRPCXX_SUPPORT_SYNC_STREAM_H diff --git a/include/grpc++/time.h b/include/grpc++/support/time.h index 8fb2f8505c..2d4196b93b 100644 --- a/include/grpc++/time.h +++ b/include/grpc++/support/time.h @@ -31,10 +31,10 @@ * */ -#ifndef GRPCXX_TIME_H -#define GRPCXX_TIME_H +#ifndef GRPCXX_SUPPORT_TIME_H +#define GRPCXX_SUPPORT_TIME_H -#include <grpc++/config.h> +#include <grpc++/support/config.h> namespace grpc { @@ -107,4 +107,4 @@ class TimePoint<std::chrono::system_clock::time_point> { #endif // !GRPC_CXX0X_NO_CHRONO -#endif // GRPCXX_TIME_H +#endif // GRPCXX_SUPPORT_TIME_H diff --git a/include/grpc++/thread_pool_interface.h b/include/grpc++/thread_pool_interface.h deleted file mode 100644 index d080b31dcc..0000000000 --- a/include/grpc++/thread_pool_interface.h +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPCXX_THREAD_POOL_INTERFACE_H -#define GRPCXX_THREAD_POOL_INTERFACE_H - -#include <functional> - -namespace grpc { - -// A thread pool interface for running callbacks. -class ThreadPoolInterface { - public: - virtual ~ThreadPoolInterface() {} - - // Schedule the given callback for execution. - virtual void Add(const std::function<void()>& callback) = 0; -}; - -ThreadPoolInterface* CreateDefaultThreadPool(); - -} // namespace grpc - -#endif // GRPCXX_THREAD_POOL_INTERFACE_H |