diff options
author | David Garcia Quintas <dgq@google.com> | 2016-01-19 18:14:29 -0800 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2016-01-19 18:44:47 -0800 |
commit | a701ed77592c18560326131cb37fcd5c29de83fe (patch) | |
tree | 2dcdad6844ca2bce92633d44ce03f68520e39e2b /include | |
parent | 4a0f8ea5355166706cabdd1ca984ae10341ab64b (diff) | |
parent | 1f4e72c19c184eb9395d27e1c560c9c0cb1bfd4f (diff) |
Merge branch 'proto_interfaces' into yang-g-sync_async_mix
Diffstat (limited to 'include')
-rw-r--r-- | include/grpc++/channel.h | 72 | ||||
-rw-r--r-- | include/grpc++/client_context.h | 3 | ||||
-rw-r--r-- | include/grpc++/completion_queue.h | 3 | ||||
-rw-r--r-- | include/grpc++/generic/async_generic_service.h | 1 | ||||
-rw-r--r-- | include/grpc++/generic/generic_stub.h | 4 | ||||
-rw-r--r-- | include/grpc++/impl/client_unary_call.h | 3 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/channel_interface.h | 122 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/server_interface.h | 253 | ||||
-rw-r--r-- | include/grpc++/impl/rpc_method.h | 4 | ||||
-rw-r--r-- | include/grpc++/impl/service_type.h | 9 | ||||
-rw-r--r-- | include/grpc++/server.h | 166 | ||||
-rw-r--r-- | include/grpc++/server_context.h | 2 | ||||
-rw-r--r-- | include/grpc++/support/async_stream.h | 8 | ||||
-rw-r--r-- | include/grpc++/support/async_unary_call.h | 4 | ||||
-rw-r--r-- | include/grpc++/support/sync_stream.h | 7 |
15 files changed, 430 insertions, 231 deletions
diff --git a/include/grpc++/channel.h b/include/grpc++/channel.h index 541be1345f..f4035eee63 100644 --- a/include/grpc++/channel.h +++ b/include/grpc++/channel.h @@ -38,35 +38,16 @@ #include <grpc/grpc.h> #include <grpc++/impl/call.h> +#include <grpc++/impl/codegen/channel_interface.h> #include <grpc++/impl/grpc_library.h> #include <grpc++/support/config.h> struct grpc_channel; namespace grpc { -class CallOpSetInterface; -class ChannelArguments; -class CompletionQueue; -class ChannelCredentials; -class SecureChannelCredentials; - -template <class R> -class ClientReader; -template <class W> -class ClientWriter; -template <class W, class R> -class ClientReaderWriter; -template <class R> -class ClientAsyncReader; -template <class W> -class ClientAsyncWriter; -template <class W, class R> -class ClientAsyncReaderWriter; -template <class R> -class ClientAsyncResponseReader; - /// Channels represent a connection to an endpoint. Created by \a CreateChannel. -class Channel GRPC_FINAL : public GrpcLibrary, +class Channel GRPC_FINAL : public ChannelInterface, + public GrpcLibrary, public CallHook, public std::enable_shared_from_this<Channel> { public: @@ -74,61 +55,28 @@ class Channel GRPC_FINAL : public GrpcLibrary, /// Get the current channel state. If the channel is in IDLE and /// \a try_to_connect is set to true, try to connect. - grpc_connectivity_state GetState(bool try_to_connect); - - /// Return the \a tag on \a cq when the channel state is changed or \a - /// deadline expires. \a GetState needs to called to get the current state. - template <typename T> - void NotifyOnStateChange(grpc_connectivity_state last_observed, T deadline, - CompletionQueue* cq, void* tag) { - TimePoint<T> deadline_tp(deadline); - NotifyOnStateChangeImpl(last_observed, deadline_tp.raw_time(), cq, tag); - } - - /// Blocking wait for channel state change or \a deadline expiration. - /// \a GetState needs to called to get the current state. - template <typename T> - bool WaitForStateChange(grpc_connectivity_state last_observed, T deadline) { - TimePoint<T> deadline_tp(deadline); - return WaitForStateChangeImpl(last_observed, deadline_tp.raw_time()); - } + grpc_connectivity_state GetState(bool try_to_connect) GRPC_OVERRIDE; private: - template <class R> - friend class ::grpc::ClientReader; - template <class W> - friend class ::grpc::ClientWriter; - template <class W, class R> - friend class ::grpc::ClientReaderWriter; - template <class R> - friend class ::grpc::ClientAsyncReader; - template <class W> - friend class ::grpc::ClientAsyncWriter; - template <class W, class R> - friend class ::grpc::ClientAsyncReaderWriter; - template <class R> - friend class ::grpc::ClientAsyncResponseReader; template <class InputMessage, class OutputMessage> - friend Status BlockingUnaryCall(Channel* channel, const RpcMethod& method, + friend Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const InputMessage& request, OutputMessage* result); - friend class ::grpc::RpcMethod; friend std::shared_ptr<Channel> CreateChannelInternal( const grpc::string& host, grpc_channel* c_channel); - Channel(const grpc::string& host, grpc_channel* c_channel); Call CreateCall(const RpcMethod& method, ClientContext* context, - CompletionQueue* cq); - void PerformOpsOnCall(CallOpSetInterface* ops, Call* call); - void* RegisterMethod(const char* method); + CompletionQueue* cq) GRPC_OVERRIDE; + void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE; + void* RegisterMethod(const char* method) GRPC_OVERRIDE; void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline, CompletionQueue* cq, - void* tag); + void* tag) GRPC_OVERRIDE; bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, - gpr_timespec deadline); + gpr_timespec deadline) GRPC_OVERRIDE; const grpc::string host_; grpc_channel* const c_channel_; // owned diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h index 25eeb3876f..6ce1c85539 100644 --- a/include/grpc++/client_context.h +++ b/include/grpc++/client_context.h @@ -69,6 +69,7 @@ struct census_context; namespace grpc { class Channel; +class ChannelInterface; class CompletionQueue; class CallCredentials; class RpcMethod; @@ -315,7 +316,7 @@ class ClientContext { template <class R> friend class ::grpc::ClientAsyncResponseReader; template <class InputMessage, class OutputMessage> - friend Status BlockingUnaryCall(Channel* channel, const RpcMethod& method, + friend Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const InputMessage& request, OutputMessage* result); diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index 5c2bc202c3..adae9265a6 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -68,6 +68,7 @@ class BidiStreamingHandler; class UnknownMethodHandler; class Channel; +class ChannelInterface; class ClientContext; class CompletionQueueTag; class CompletionQueue; @@ -171,7 +172,7 @@ class CompletionQueue : public GrpcLibrary { friend class ::grpc::Server; friend class ::grpc::ServerContext; template <class InputMessage, class OutputMessage> - friend Status BlockingUnaryCall(Channel* channel, const RpcMethod& method, + friend Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const InputMessage& request, OutputMessage* result); diff --git a/include/grpc++/generic/async_generic_service.h b/include/grpc++/generic/async_generic_service.h index 748311dbe7..9ae8391dc4 100644 --- a/include/grpc++/generic/async_generic_service.h +++ b/include/grpc++/generic/async_generic_service.h @@ -51,6 +51,7 @@ class GenericServerContext GRPC_FINAL : public ServerContext { private: friend class Server; + friend class ServerInterface; grpc::string method_; grpc::string host_; diff --git a/include/grpc++/generic/generic_stub.h b/include/grpc++/generic/generic_stub.h index 1bb7900b06..cd8fb54aed 100644 --- a/include/grpc++/generic/generic_stub.h +++ b/include/grpc++/generic/generic_stub.h @@ -47,7 +47,7 @@ typedef ClientAsyncReaderWriter<ByteBuffer, ByteBuffer> // by name. class GenericStub GRPC_FINAL { public: - explicit GenericStub(std::shared_ptr<Channel> channel) : channel_(channel) {} + explicit GenericStub(std::shared_ptr<ChannelInterface> channel) : channel_(channel) {} // begin a call to a named method std::unique_ptr<GenericClientAsyncReaderWriter> Call( @@ -55,7 +55,7 @@ class GenericStub GRPC_FINAL { void* tag); private: - std::shared_ptr<Channel> channel_; + std::shared_ptr<ChannelInterface> channel_; }; } // namespace grpc diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h index 4cdc800267..554683aa66 100644 --- a/include/grpc++/impl/client_unary_call.h +++ b/include/grpc++/impl/client_unary_call.h @@ -35,6 +35,7 @@ #define GRPCXX_IMPL_CLIENT_UNARY_CALL_H #include <grpc++/impl/call.h> +#include <grpc++/impl/codegen/channel_interface.h> #include <grpc++/support/config.h> #include <grpc++/support/status.h> @@ -47,7 +48,7 @@ class RpcMethod; // Wrapper that performs a blocking unary call template <class InputMessage, class OutputMessage> -Status BlockingUnaryCall(Channel* channel, const RpcMethod& method, +Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const InputMessage& request, OutputMessage* result) { CompletionQueue cq; diff --git a/include/grpc++/impl/codegen/channel_interface.h b/include/grpc++/impl/codegen/channel_interface.h new file mode 100644 index 0000000000..353f52cef9 --- /dev/null +++ b/include/grpc++/impl/codegen/channel_interface.h @@ -0,0 +1,122 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_CHANNEL_INTERFACE_H +#define GRPCXX_CHANNEL_INTERFACE_H + +#include <grpc++/impl/grpc_library.h> +#include <grpc++/support/status.h> +#include <grpc++/support/time.h> + +namespace grpc { +class Call; +class ClientContext; +class RpcMethod; +class CallOpSetInterface; +class CompletionQueue; + +template <class R> +class ClientReader; +template <class W> +class ClientWriter; +template <class W, class R> +class ClientReaderWriter; +template <class R> +class ClientAsyncReader; +template <class W> +class ClientAsyncWriter; +template <class W, class R> +class ClientAsyncReaderWriter; +template <class R> +class ClientAsyncResponseReader; + +/// Codegen interface for \a grpc::Channel. +class ChannelInterface { + public: + virtual ~ChannelInterface() {} + /// Get the current channel state. If the channel is in IDLE and + /// \a try_to_connect is set to true, try to connect. + virtual grpc_connectivity_state GetState(bool try_to_connect) = 0; + + /// Return the \a tag on \a cq when the channel state is changed or \a + /// deadline expires. \a GetState needs to called to get the current state. + template <typename T> + void NotifyOnStateChange(grpc_connectivity_state last_observed, T deadline, + CompletionQueue* cq, void* tag) { + TimePoint<T> deadline_tp(deadline); + NotifyOnStateChangeImpl(last_observed, deadline_tp.raw_time(), cq, tag); + } + + /// Blocking wait for channel state change or \a deadline expiration. + /// \a GetState needs to called to get the current state. + template <typename T> + bool WaitForStateChange(grpc_connectivity_state last_observed, T deadline) { + TimePoint<T> deadline_tp(deadline); + return WaitForStateChangeImpl(last_observed, deadline_tp.raw_time()); + } + + private: + template <class R> + friend class ::grpc::ClientReader; + template <class W> + friend class ::grpc::ClientWriter; + template <class W, class R> + friend class ::grpc::ClientReaderWriter; + template <class R> + friend class ::grpc::ClientAsyncReader; + template <class W> + friend class ::grpc::ClientAsyncWriter; + template <class W, class R> + friend class ::grpc::ClientAsyncReaderWriter; + template <class R> + friend class ::grpc::ClientAsyncResponseReader; + template <class InputMessage, class OutputMessage> + friend Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, + const InputMessage& request, + OutputMessage* result); + friend class ::grpc::RpcMethod; + virtual Call CreateCall(const RpcMethod& method, ClientContext* context, + CompletionQueue* cq) = 0; + virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; + virtual void* RegisterMethod(const char* method) = 0; + virtual void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, + gpr_timespec deadline, CompletionQueue* cq, + void* tag) = 0; + virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, + gpr_timespec deadline) = 0; +}; + +} // namespace grpc + +#endif // GRPCXX_CHANNEL_INTERFACE_H diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h new file mode 100644 index 0000000000..fc2abb1157 --- /dev/null +++ b/include/grpc++/impl/codegen/server_interface.h @@ -0,0 +1,253 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_SERVER_INTERFACE_H +#define GRPCXX_SERVER_INTERFACE_H + +#include <grpc++/completion_queue.h> +#include <grpc++/impl/call.h> +#include <grpc++/impl/rpc_service_method.h> + +namespace grpc { + +class AsyncGenericService; +class AsynchronousService; +class GenericServerContext; +class RpcService; +class RpcServiceMethod; +class ServerAsyncStreamingInterface; +class ServerContext; +class ServerCredentials; +class Service; +class ThreadPoolInterface; + +/// Models a gRPC server. +/// +/// Servers are configured and started via \a grpc::ServerBuilder. +class ServerInterface : public CallHook { + public: + virtual ~ServerInterface() {} + + /// Shutdown the server, blocking until all rpc processing finishes. + /// Forcefully terminate pending calls after \a deadline expires. + /// + /// \param deadline How long to wait until pending rpcs are forcefully + /// terminated. + template <class T> + void Shutdown(const T& deadline) { + ShutdownInternal(TimePoint<T>(deadline).raw_time()); + } + + /// Shutdown the server, waiting for all rpc processing to finish. + void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); } + + /// Block waiting for all work to complete. + /// + /// \warning The server must be either shutting down or some other thread must + /// call \a Shutdown for this function to ever return. + virtual void Wait() = 0; + + protected: + friend class AsynchronousService; + friend class Service; + + /// Register a service. This call does not take ownership of the service. + /// The service must exist for the lifetime of the Server instance. + virtual bool RegisterService(const grpc::string* host, Service* service) = 0; + + /// Register a generic service. This call does not take ownership of the + /// service. The service must exist for the lifetime of the Server instance. + virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0; + + /// Tries to bind \a server to the given \a addr. + /// + /// It can be invoked multiple times. + /// + /// \param addr The address to try to bind to the server (eg, localhost:1234, + /// 192.168.1.1:31416, [::1]:27182, etc.). + /// \params creds The credentials associated with the server. + /// + /// \return bound port number on sucess, 0 on failure. + /// + /// \warning It's an error to call this method on an already started server. + virtual int AddListeningPort(const grpc::string& addr, ServerCredentials* creds) = 0; + + /// Start the server. + /// + /// \param cqs Completion queues for handling asynchronous services. The + /// caller is required to keep all completion queues live until the server is + /// destroyed. + /// \param num_cqs How many completion queues does \a cqs hold. + /// + /// \return true on a successful shutdown. + virtual bool Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0; + + /// Process one or more incoming calls. + virtual void RunRpc() = 0; + + /// Schedule \a RunRpc to run in the threadpool. + virtual void ScheduleCallback() = 0; + + virtual void ShutdownInternal(gpr_timespec deadline) = 0; + + virtual int max_message_size() const = 0; + + virtual grpc_server* server() = 0; + + virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; + + class BaseAsyncRequest : public CompletionQueueTag { + public: + BaseAsyncRequest(ServerInterface* server, ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, void* tag, + bool delete_on_finalize); + virtual ~BaseAsyncRequest() {} + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; + + protected: + ServerInterface* const server_; + ServerContext* const context_; + ServerAsyncStreamingInterface* const stream_; + CompletionQueue* const call_cq_; + void* const tag_; + const bool delete_on_finalize_; + grpc_call* call_; + grpc_metadata_array initial_metadata_array_; + }; + + class RegisteredAsyncRequest : public BaseAsyncRequest { + public: + RegisteredAsyncRequest(ServerInterface* server, ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, void* tag); + + // uses BaseAsyncRequest::FinalizeResult + + protected: + void IssueRequest(void* registered_method, grpc_byte_buffer** payload, + ServerCompletionQueue* notification_cq); + }; + + class NoPayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest { + public: + NoPayloadAsyncRequest(void* registered_method, ServerInterface* server, + ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) + : RegisteredAsyncRequest(server, context, stream, call_cq, tag) { + IssueRequest(registered_method, nullptr, notification_cq); + } + + // uses RegisteredAsyncRequest::FinalizeResult + }; + + template <class Message> + class PayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest { + public: + PayloadAsyncRequest(void* registered_method, ServerInterface* server, + ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, + Message* request) + : RegisteredAsyncRequest(server, context, stream, call_cq, tag), + request_(request) { + IssueRequest(registered_method, &payload_, notification_cq); + } + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { + bool serialization_status = + *status && payload_ && + SerializationTraits<Message>::Deserialize( + payload_, request_, server_->max_message_size()).ok(); + bool ret = RegisteredAsyncRequest::FinalizeResult(tag, status); + *status = serialization_status&&* status; + return ret; + } + + private: + grpc_byte_buffer* payload_; + Message* const request_; + }; + + class GenericAsyncRequest : public BaseAsyncRequest { + public: + GenericAsyncRequest(ServerInterface* server, GenericServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, + bool delete_on_finalize); + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; + + private: + grpc_call_details call_details_; + }; + + template <class Message> + void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, + Message* message) { + GPR_ASSERT(method); + new PayloadAsyncRequest<Message>(method->server_tag(), this, context, + stream, call_cq, notification_cq, tag, + message); + } + + void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { + GPR_ASSERT(method); + new NoPayloadAsyncRequest(method->server_tag(), this, context, stream, + call_cq, notification_cq, tag); + } + + void RequestAsyncGenericCall(GenericServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { + new GenericAsyncRequest(this, context, stream, call_cq, notification_cq, + tag, true); + } +}; + +} // namespace grpc + +#endif // GRPCXX_SERVER_INTERFACE_H diff --git a/include/grpc++/impl/rpc_method.h b/include/grpc++/impl/rpc_method.h index 9800268062..cc29f6db49 100644 --- a/include/grpc++/impl/rpc_method.h +++ b/include/grpc++/impl/rpc_method.h @@ -36,7 +36,7 @@ #include <memory> -#include <grpc++/channel.h> +#include <grpc++/impl/codegen/channel_interface.h> namespace grpc { @@ -53,7 +53,7 @@ class RpcMethod { : name_(name), method_type_(type), channel_tag_(NULL) {} RpcMethod(const char* name, RpcType type, - const std::shared_ptr<Channel>& channel) + const std::shared_ptr<ChannelInterface>& channel) : name_(name), method_type_(type), channel_tag_(channel->RegisterMethod(name)) {} diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h index 483ecb32b0..d1a2542858 100644 --- a/include/grpc++/impl/service_type.h +++ b/include/grpc++/impl/service_type.h @@ -36,7 +36,7 @@ #include <grpc++/impl/rpc_service_method.h> #include <grpc++/impl/serialization_traits.h> -#include <grpc++/server.h> +#include <grpc++/impl/codegen/server_interface.h> #include <grpc++/support/config.h> #include <grpc++/support/status.h> @@ -45,6 +45,7 @@ namespace grpc { class Call; class CompletionQueue; class Server; +class ServerInterface; class ServerCompletionQueue; class ServerContext; @@ -55,7 +56,7 @@ class ServerAsyncStreamingInterface { virtual void SendInitialMetadata(void* tag) = 0; private: - friend class Server; + friend class ServerInterface; virtual void BindCall(Call* call) = 0; }; @@ -146,8 +147,8 @@ class Service { private: friend class Server; - - Server* server_; + friend class ServerInterface; + ServerInterface* server_; std::vector<std::unique_ptr<RpcServiceMethod>> methods_; }; diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 18eb4b91b0..7ae94eba07 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -42,6 +42,7 @@ #include <grpc++/impl/grpc_library.h> #include <grpc++/impl/rpc_service_method.h> #include <grpc++/impl/sync.h> +#include <grpc++/impl/codegen/server_interface.h> #include <grpc++/security/server_credentials.h> #include <grpc++/support/channel_arguments.h> #include <grpc++/support/config.h> @@ -56,34 +57,21 @@ class GenericServerContext; class AsyncGenericService; class ServerAsyncStreamingInterface; class ServerContext; -class Service; class ThreadPoolInterface; /// Models a gRPC server. /// /// Servers are configured and started via \a grpc::ServerBuilder. -class Server GRPC_FINAL : public GrpcLibrary, private CallHook { +class Server GRPC_FINAL : public ServerInterface, + public GrpcLibrary { public: ~Server(); - /// Shutdown the server, blocking until all rpc processing finishes. - /// Forcefully terminate pending calls after \a deadline expires. - /// - /// \param deadline How long to wait until pending rpcs are forcefully - /// terminated. - template <class T> - void Shutdown(const T& deadline) { - ShutdownInternal(TimePoint<T>(deadline).raw_time()); - } - - /// Shutdown the server, waiting for all rpc processing to finish. - void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); } - /// Block waiting for all work to complete. /// /// \warning The server must be either shutting down or some other thread must /// call \a Shutdown for this function to ever return. - void Wait(); + void Wait() GRPC_OVERRIDE; /// Global Callbacks /// @@ -104,13 +92,16 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { private: friend class AsyncGenericService; - friend class Service; friend class ServerBuilder; class SyncRequest; class AsyncRequest; class ShutdownRequest; + class UnimplementedAsyncRequestContext; + class UnimplementedAsyncRequest; + class UnimplementedAsyncResponse; + /// Server constructors. To be used by \a ServerBuilder only. /// /// \param thread_pool The threadpool instance to use for call processing. @@ -122,11 +113,11 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the Server instance. - bool RegisterService(const grpc::string* host, Service* service); + bool RegisterService(const grpc::string* host, Service* service) GRPC_OVERRIDE; /// Register a generic service. This call does not take ownership of the /// service. The service must exist for the lifetime of the Server instance. - void RegisterAsyncGenericService(AsyncGenericService* service); + void RegisterAsyncGenericService(AsyncGenericService* service) GRPC_OVERRIDE; /// Tries to bind \a server to the given \a addr. /// @@ -139,7 +130,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { /// \return bound port number on sucess, 0 on failure. /// /// \warning It's an error to call this method on an already started server. - int AddListeningPort(const grpc::string& addr, ServerCredentials* creds); + int AddListeningPort(const grpc::string& addr, ServerCredentials* creds) GRPC_OVERRIDE; /// Start the server. /// @@ -149,144 +140,21 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { /// \param num_cqs How many completion queues does \a cqs hold. /// /// \return true on a successful shutdown. - bool Start(ServerCompletionQueue** cqs, size_t num_cqs); - - void HandleQueueClosed(); + bool Start(ServerCompletionQueue** cqs, size_t num_cqs) GRPC_OVERRIDE; /// Process one or more incoming calls. - void RunRpc(); + void RunRpc() GRPC_OVERRIDE; /// Schedule \a RunRpc to run in the threadpool. - void ScheduleCallback(); + void ScheduleCallback() GRPC_OVERRIDE; void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE; - void ShutdownInternal(gpr_timespec deadline); - - class BaseAsyncRequest : public CompletionQueueTag { - public: - BaseAsyncRequest(Server* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, void* tag, - bool delete_on_finalize); - virtual ~BaseAsyncRequest(); - - bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; - - protected: - Server* const server_; - ServerContext* const context_; - ServerAsyncStreamingInterface* const stream_; - CompletionQueue* const call_cq_; - void* const tag_; - const bool delete_on_finalize_; - grpc_call* call_; - grpc_metadata_array initial_metadata_array_; - }; - - class RegisteredAsyncRequest : public BaseAsyncRequest { - public: - RegisteredAsyncRequest(Server* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, void* tag); - - // uses BaseAsyncRequest::FinalizeResult - - protected: - void IssueRequest(void* registered_method, grpc_byte_buffer** payload, - ServerCompletionQueue* notification_cq); - }; - - class NoPayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest { - public: - NoPayloadAsyncRequest(void* registered_method, Server* server, - ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag) - : RegisteredAsyncRequest(server, context, stream, call_cq, tag) { - IssueRequest(registered_method, nullptr, notification_cq); - } - - // uses RegisteredAsyncRequest::FinalizeResult - }; - - template <class Message> - class PayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest { - public: - PayloadAsyncRequest(void* registered_method, Server* server, - ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, - Message* request) - : RegisteredAsyncRequest(server, context, stream, call_cq, tag), - request_(request) { - IssueRequest(registered_method, &payload_, notification_cq); - } - - bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { - bool serialization_status = - *status && payload_ && - SerializationTraits<Message>::Deserialize( - payload_, request_, server_->max_message_size_).ok(); - bool ret = RegisteredAsyncRequest::FinalizeResult(tag, status); - *status = serialization_status&&* status; - return ret; - } - - private: - grpc_byte_buffer* payload_; - Message* const request_; - }; - - class GenericAsyncRequest : public BaseAsyncRequest { - public: - GenericAsyncRequest(Server* server, GenericServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, - bool delete_on_finalize); - - bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; - - private: - grpc_call_details call_details_; - }; - - class UnimplementedAsyncRequestContext; - class UnimplementedAsyncRequest; - class UnimplementedAsyncResponse; - - template <class Message> - void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, - Message* message) { - GPR_ASSERT(method); - new PayloadAsyncRequest<Message>(method->server_tag(), this, context, - stream, call_cq, notification_cq, tag, - message); - } + void ShutdownInternal(gpr_timespec deadline) GRPC_OVERRIDE; - void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag) { - GPR_ASSERT(method); - new NoPayloadAsyncRequest(method->server_tag(), this, context, stream, - call_cq, notification_cq, tag); - } + int max_message_size() const GRPC_OVERRIDE { return max_message_size_; }; - void RequestAsyncGenericCall(GenericServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) { - new GenericAsyncRequest(this, context, stream, call_cq, notification_cq, - tag, true); - } + grpc_server* server() GRPC_OVERRIDE { return server_; }; const int max_message_size_; diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h index 8ba73486dc..1c3f39e238 100644 --- a/include/grpc++/server_context.h +++ b/include/grpc++/server_context.h @@ -80,6 +80,7 @@ class Call; class CallOpBuffer; class CompletionQueue; class Server; +class ServerInterface; namespace testing { class InteropServerContextInspector; @@ -138,6 +139,7 @@ class ServerContext { private: friend class ::grpc::testing::InteropServerContextInspector; + friend class ::grpc::ServerInterface; friend class ::grpc::Server; template <class W, class R> friend class ::grpc::ServerAsyncReader; diff --git a/include/grpc++/support/async_stream.h b/include/grpc++/support/async_stream.h index 0c96352ccd..87c38a7ec8 100644 --- a/include/grpc++/support/async_stream.h +++ b/include/grpc++/support/async_stream.h @@ -35,7 +35,7 @@ #define GRPCXX_SUPPORT_ASYNC_STREAM_H #include <grpc/support/log.h> -#include <grpc++/channel.h> +#include <grpc++/impl/codegen/channel_interface.h> #include <grpc++/client_context.h> #include <grpc++/completion_queue.h> #include <grpc++/impl/call.h> @@ -103,7 +103,7 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> { public: /// Create a stream and write the first request out. template <class W> - ClientAsyncReader(Channel* channel, CompletionQueue* cq, + ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, const W& request, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { @@ -166,7 +166,7 @@ template <class W> class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> { public: template <class R> - ClientAsyncWriter(Channel* channel, CompletionQueue* cq, + ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, R* response, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { @@ -234,7 +234,7 @@ template <class W, class R> class ClientAsyncReaderWriter GRPC_FINAL : public ClientAsyncReaderWriterInterface<W, R> { public: - ClientAsyncReaderWriter(Channel* channel, CompletionQueue* cq, + ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { diff --git a/include/grpc++/support/async_unary_call.h b/include/grpc++/support/async_unary_call.h index 0f4ad2656f..396bd15b93 100644 --- a/include/grpc++/support/async_unary_call.h +++ b/include/grpc++/support/async_unary_call.h @@ -35,7 +35,7 @@ #define GRPCXX_SUPPORT_ASYNC_UNARY_CALL_H #include <grpc/support/log.h> -#include <grpc++/channel.h> +#include <grpc++/impl/codegen/channel_interface.h> #include <grpc++/client_context.h> #include <grpc++/completion_queue.h> #include <grpc++/server_context.h> @@ -58,7 +58,7 @@ class ClientAsyncResponseReader GRPC_FINAL : public ClientAsyncResponseReaderInterface<R> { public: template <class W> - ClientAsyncResponseReader(Channel* channel, CompletionQueue* cq, + ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, const W& request) : context_(context), call_(channel->CreateCall(method, context, cq)) { diff --git a/include/grpc++/support/sync_stream.h b/include/grpc++/support/sync_stream.h index daf4e367ae..aaf1a5b7cc 100644 --- a/include/grpc++/support/sync_stream.h +++ b/include/grpc++/support/sync_stream.h @@ -36,6 +36,7 @@ #include <grpc/support/log.h> #include <grpc++/channel.h> +#include <grpc++/impl/codegen/channel_interface.h> #include <grpc++/client_context.h> #include <grpc++/completion_queue.h> #include <grpc++/impl/call.h> @@ -118,7 +119,7 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> { public: /// Blocking create a stream and write the first request out. template <class W> - ClientReader(Channel* channel, const RpcMethod& method, + ClientReader(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const W& request) : context_(context), call_(channel->CreateCall(method, context, &cq_)) { CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, @@ -182,7 +183,7 @@ class ClientWriter : public ClientWriterInterface<W> { public: /// Blocking create a stream. template <class R> - ClientWriter(Channel* channel, const RpcMethod& method, + ClientWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, R* response) : context_(context), call_(channel->CreateCall(method, context, &cq_)) { finish_ops_.RecvMessage(response); @@ -248,7 +249,7 @@ template <class W, class R> class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { public: /// Blocking create a stream. - ClientReaderWriter(Channel* channel, const RpcMethod& method, + ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context) : context_(context), call_(channel->CreateCall(method, context, &cq_)) { CallOpSet<CallOpSendInitialMetadata> ops; |