aboutsummaryrefslogtreecommitdiffhomepage
path: root/include/grpc++/server.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/grpc++/server.h')
-rw-r--r--include/grpc++/server.h95
1 files changed, 54 insertions, 41 deletions
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 6876961e21..fba9952e6e 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -34,8 +34,10 @@
#ifndef GRPCXX_SERVER_H
#define GRPCXX_SERVER_H
+#include <condition_variable>
#include <list>
#include <memory>
+#include <mutex>
#include <vector>
#include <grpc++/completion_queue.h>
@@ -43,7 +45,6 @@
#include <grpc++/impl/codegen/grpc_library.h>
#include <grpc++/impl/codegen/server_interface.h>
#include <grpc++/impl/rpc_service_method.h>
-#include <grpc++/impl/sync.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/support/channel_arguments.h>
#include <grpc++/support/config.h>
@@ -64,7 +65,7 @@ class ThreadPoolInterface;
/// Models a gRPC server.
///
/// Servers are configured and started via \a grpc::ServerBuilder.
-class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
+class Server final : public ServerInterface, private GrpcLibraryCodegen {
public:
~Server();
@@ -72,7 +73,7 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
///
/// \warning The server must be either shutting down or some other thread must
/// call \a Shutdown for this function to ever return.
- void Wait() GRPC_OVERRIDE;
+ void Wait() override;
/// Global Callbacks
///
@@ -96,9 +97,6 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
// Returns a \em raw pointer to the underlying grpc_server instance.
grpc_server* c_server();
- // Returns a \em raw pointer to the underlying CompletionQueue.
- CompletionQueue* completion_queue();
-
private:
friend class AsyncGenericService;
friend class ServerBuilder;
@@ -108,27 +106,49 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
class AsyncRequest;
class ShutdownRequest;
+ /// SyncRequestThreadManager is an implementation of ThreadManager. This class
+ /// is responsible for polling for incoming RPCs and calling the RPC handlers.
+ /// This is only used in case of a Sync server (i.e a server exposing a sync
+ /// interface)
+ class SyncRequestThreadManager;
+
class UnimplementedAsyncRequestContext;
class UnimplementedAsyncRequest;
class UnimplementedAsyncResponse;
/// Server constructors. To be used by \a ServerBuilder only.
///
- /// \param thread_pool The threadpool instance to use for call processing.
- /// \param thread_pool_owned Does the server own the \a thread_pool instance?
/// \param max_message_size Maximum message length that the channel can
/// receive.
- Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
- int max_message_size, ChannelArguments* args);
+ ///
+ /// \param args The channel args
+ ///
+ /// \param sync_server_cqs The completion queues to use if the server is a
+ /// synchronous server (or a hybrid server). The server polls for new RPCs on
+ /// these queues
+ ///
+ /// \param min_pollers The minimum number of polling threads per server
+ /// completion queue (in param sync_server_cqs) to use for listening to
+ /// incoming requests (used only in case of sync server)
+ ///
+ /// \param max_pollers The maximum number of polling threads per server
+ /// completion queue (in param sync_server_cqs) to use for listening to
+ /// incoming requests (used only in case of sync server)
+ ///
+ /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on
+ /// server completion queues passed via sync_server_cqs param.
+ Server(int max_message_size, ChannelArguments* args,
+ std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs,
+ int min_pollers, int max_pollers, int sync_cq_timeout_msec);
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance.
- bool RegisterService(const grpc::string* host,
- Service* service) GRPC_OVERRIDE;
+ bool RegisterService(const grpc::string* host, Service* service) override;
/// Register a generic service. This call does not take ownership of the
/// service. The service must exist for the lifetime of the Server instance.
- void RegisterAsyncGenericService(AsyncGenericService* service) GRPC_OVERRIDE;
+ void RegisterAsyncGenericService(AsyncGenericService* service) override;
/// Tries to bind \a server to the given \a addr.
///
@@ -142,7 +162,7 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
///
/// \warning It's an error to call this method on an already started server.
int AddListeningPort(const grpc::string& addr,
- ServerCredentials* creds) GRPC_OVERRIDE;
+ ServerCredentials* creds) override;
/// Start the server.
///
@@ -152,54 +172,47 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
/// \param num_cqs How many completion queues does \a cqs hold.
///
/// \return true on a successful shutdown.
- bool Start(ServerCompletionQueue** cqs, size_t num_cqs) GRPC_OVERRIDE;
-
- /// Process one or more incoming calls.
- void RunRpc() GRPC_OVERRIDE;
-
- /// Schedule \a RunRpc to run in the threadpool.
- void ScheduleCallback() GRPC_OVERRIDE;
+ bool Start(ServerCompletionQueue** cqs, size_t num_cqs) override;
- void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
+ void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) override;
- void ShutdownInternal(gpr_timespec deadline) GRPC_OVERRIDE;
+ void ShutdownInternal(gpr_timespec deadline) override;
- int max_message_size() const GRPC_OVERRIDE { return max_message_size_; };
+ int max_receive_message_size() const override {
+ return max_receive_message_size_;
+ };
- grpc_server* server() GRPC_OVERRIDE { return server_; };
+ grpc_server* server() override { return server_; };
ServerInitializer* initializer();
- const int max_message_size_;
+ const int max_receive_message_size_;
+
+ /// The following completion queues are ONLY used in case of Sync API i.e if
+ /// the server has any services with sync methods. The server uses these
+ /// completion queues to poll for new RPCs
+ std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs_;
- // Completion queue.
- CompletionQueue cq_;
+ /// List of ThreadManager instances (one for each cq in the sync_server_cqs)
+ std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_;
// Sever status
- grpc::mutex mu_;
+ std::mutex mu_;
bool started_;
bool shutdown_;
- bool shutdown_notified_;
- // The number of threads which are running callbacks.
- int num_running_cb_;
- grpc::condition_variable callback_cv_;
+ bool shutdown_notified_; // Was notify called on the shutdown_cv_
- grpc::condition_variable shutdown_cv_;
+ std::condition_variable shutdown_cv_;
std::shared_ptr<GlobalCallbacks> global_callbacks_;
- std::list<SyncRequest>* sync_methods_;
std::vector<grpc::string> services_;
- std::unique_ptr<RpcServiceMethod> unknown_method_;
bool has_generic_service_;
- // Pointer to the c grpc server.
+ // Pointer to the wrapped grpc_server.
grpc_server* server_;
- ThreadPoolInterface* thread_pool_;
- // Whether the thread pool is created and owned by the server.
- bool thread_pool_owned_;
-
std::unique_ptr<ServerInitializer> server_initializer_;
};