diff options
Diffstat (limited to 'include/grpc++/server.h')
-rw-r--r-- | include/grpc++/server.h | 174 |
1 files changed, 20 insertions, 154 deletions
diff --git a/include/grpc++/server.h b/include/grpc++/server.h index f24ed333bb..2a71073a7e 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -39,7 +39,9 @@ #include <grpc++/completion_queue.h> #include <grpc++/impl/call.h> -#include <grpc++/impl/grpc_library.h> +#include <grpc++/impl/codegen/grpc_library.h> +#include <grpc++/impl/codegen/server_interface.h> +#include <grpc++/impl/rpc_service_method.h> #include <grpc++/impl/sync.h> #include <grpc++/security/server_credentials.h> #include <grpc++/support/channel_arguments.h> @@ -51,11 +53,8 @@ struct grpc_server; namespace grpc { -class AsynchronousService; class GenericServerContext; class AsyncGenericService; -class RpcService; -class RpcServiceMethod; class ServerAsyncStreamingInterface; class ServerContext; class ThreadPoolInterface; @@ -63,28 +62,15 @@ class ThreadPoolInterface; /// Models a gRPC server. /// /// Servers are configured and started via \a grpc::ServerBuilder. -class Server GRPC_FINAL : public GrpcLibrary, private CallHook { +class Server GRPC_FINAL : public ServerInterface, private GrpcLibrary { public: ~Server(); - /// Shutdown the server, blocking until all rpc processing finishes. - /// Forcefully terminate pending calls after \a deadline expires. - /// - /// \param deadline How long to wait until pending rpcs are forcefully - /// terminated. - template <class T> - void Shutdown(const T& deadline) { - ShutdownInternal(TimePoint<T>(deadline).raw_time()); - } - - /// Shutdown the server, waiting for all rpc processing to finish. - void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); } - /// Block waiting for all work to complete. /// /// \warning The server must be either shutting down or some other thread must /// call \a Shutdown for this function to ever return. - void Wait(); + void Wait() GRPC_OVERRIDE; /// Global Callbacks /// @@ -105,13 +91,16 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { private: friend class AsyncGenericService; - friend class AsynchronousService; friend class ServerBuilder; class SyncRequest; class AsyncRequest; class ShutdownRequest; + class UnimplementedAsyncRequestContext; + class UnimplementedAsyncRequest; + class UnimplementedAsyncResponse; + /// Server constructors. To be used by \a ServerBuilder only. /// /// \param thread_pool The threadpool instance to use for call processing. @@ -123,16 +112,12 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the Server instance. - bool RegisterService(const grpc::string* host, RpcService* service); - - /// Register an asynchronous service. This call does not take ownership of the - /// service. The service must exist for the lifetime of the Server instance. - bool RegisterAsyncService(const grpc::string* host, - AsynchronousService* service); + bool RegisterService(const grpc::string* host, + Service* service) GRPC_OVERRIDE; /// Register a generic service. This call does not take ownership of the /// service. The service must exist for the lifetime of the Server instance. - void RegisterAsyncGenericService(AsyncGenericService* service); + void RegisterAsyncGenericService(AsyncGenericService* service) GRPC_OVERRIDE; /// Tries to bind \a server to the given \a addr. /// @@ -145,7 +130,8 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { /// \return bound port number on sucess, 0 on failure. /// /// \warning It's an error to call this method on an already started server. - int AddListeningPort(const grpc::string& addr, ServerCredentials* creds); + int AddListeningPort(const grpc::string& addr, + ServerCredentials* creds) GRPC_OVERRIDE; /// Start the server. /// @@ -155,141 +141,21 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { /// \param num_cqs How many completion queues does \a cqs hold. /// /// \return true on a successful shutdown. - bool Start(ServerCompletionQueue** cqs, size_t num_cqs); - - void HandleQueueClosed(); + bool Start(ServerCompletionQueue** cqs, size_t num_cqs) GRPC_OVERRIDE; /// Process one or more incoming calls. - void RunRpc(); + void RunRpc() GRPC_OVERRIDE; /// Schedule \a RunRpc to run in the threadpool. - void ScheduleCallback(); + void ScheduleCallback() GRPC_OVERRIDE; void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE; - void ShutdownInternal(gpr_timespec deadline); - - class BaseAsyncRequest : public CompletionQueueTag { - public: - BaseAsyncRequest(Server* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, void* tag, - bool delete_on_finalize); - virtual ~BaseAsyncRequest(); - - bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; - - protected: - Server* const server_; - ServerContext* const context_; - ServerAsyncStreamingInterface* const stream_; - CompletionQueue* const call_cq_; - void* const tag_; - const bool delete_on_finalize_; - grpc_call* call_; - grpc_metadata_array initial_metadata_array_; - }; - - class RegisteredAsyncRequest : public BaseAsyncRequest { - public: - RegisteredAsyncRequest(Server* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, void* tag); - - // uses BaseAsyncRequest::FinalizeResult + void ShutdownInternal(gpr_timespec deadline) GRPC_OVERRIDE; - protected: - void IssueRequest(void* registered_method, grpc_byte_buffer** payload, - ServerCompletionQueue* notification_cq); - }; - - class NoPayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest { - public: - NoPayloadAsyncRequest(void* registered_method, Server* server, - ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag) - : RegisteredAsyncRequest(server, context, stream, call_cq, tag) { - IssueRequest(registered_method, nullptr, notification_cq); - } - - // uses RegisteredAsyncRequest::FinalizeResult - }; - - template <class Message> - class PayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest { - public: - PayloadAsyncRequest(void* registered_method, Server* server, - ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, - Message* request) - : RegisteredAsyncRequest(server, context, stream, call_cq, tag), - request_(request) { - IssueRequest(registered_method, &payload_, notification_cq); - } - - bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { - bool serialization_status = - *status && payload_ && - SerializationTraits<Message>::Deserialize( - payload_, request_, server_->max_message_size_).ok(); - bool ret = RegisteredAsyncRequest::FinalizeResult(tag, status); - *status = serialization_status&&* status; - return ret; - } - - private: - grpc_byte_buffer* payload_; - Message* const request_; - }; - - class GenericAsyncRequest : public BaseAsyncRequest { - public: - GenericAsyncRequest(Server* server, GenericServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, - bool delete_on_finalize); - - bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; - - private: - grpc_call_details call_details_; - }; - - class UnimplementedAsyncRequestContext; - class UnimplementedAsyncRequest; - class UnimplementedAsyncResponse; + int max_message_size() const GRPC_OVERRIDE { return max_message_size_; }; - template <class Message> - void RequestAsyncCall(void* registered_method, ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, - Message* message) { - new PayloadAsyncRequest<Message>(registered_method, this, context, stream, - call_cq, notification_cq, tag, message); - } - - void RequestAsyncCall(void* registered_method, ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag) { - new NoPayloadAsyncRequest(registered_method, this, context, stream, call_cq, - notification_cq, tag); - } - - void RequestAsyncGenericCall(GenericServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) { - new GenericAsyncRequest(this, context, stream, call_cq, notification_cq, - tag, true); - } + grpc_server* server() GRPC_OVERRIDE { return server_; }; const int max_message_size_; |