diff options
Diffstat (limited to 'include/grpc++')
-rw-r--r-- | include/grpc++/impl/codegen/server_interface.h | 6 | ||||
-rw-r--r-- | include/grpc++/server.h | 64 | ||||
-rw-r--r-- | include/grpc++/server_builder.h | 35 |
3 files changed, 74 insertions, 31 deletions
diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h index 4a00d7a3a1..5c41ca51b4 100644 --- a/include/grpc++/impl/codegen/server_interface.h +++ b/include/grpc++/impl/codegen/server_interface.h @@ -126,12 +126,6 @@ class ServerInterface : public CallHook { /// \return true on a successful shutdown. virtual bool Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0; - /// Process one or more incoming calls. - virtual void RunRpc() = 0; - - /// Schedule \a RunRpc to run in the threadpool. - virtual void ScheduleCallback() = 0; - virtual void ShutdownInternal(gpr_timespec deadline) = 0; virtual int max_receive_message_size() const = 0; diff --git a/include/grpc++/server.h b/include/grpc++/server.h index f51a6c658f..a6d70c7577 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -105,18 +105,41 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen { class AsyncRequest; class ShutdownRequest; + /// SyncRequestThreadManager is an implementation of ThreadManager. This class + /// is responsible for polling for incoming RPCs and calling the RPC handlers. + /// This is only used in case of a Sync server (i.e a server exposing a sync + /// interface) + class SyncRequestThreadManager; + class UnimplementedAsyncRequestContext; class UnimplementedAsyncRequest; class UnimplementedAsyncResponse; /// Server constructors. To be used by \a ServerBuilder only. /// - /// \param thread_pool The threadpool instance to use for call processing. - /// \param thread_pool_owned Does the server own the \a thread_pool instance? - /// \param max_receive_message_size Maximum message length that the channel - /// can receive. - Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, - int max_receive_message_size, ChannelArguments* args); + /// \param max_message_size Maximum message length that the channel can + /// receive. + /// + /// \param args The channel args + /// + /// \param sync_server_cqs The completion queues to use if the server is a + /// synchronous server (or a hybrid server). The server polls for new RPCs on + /// these queues + /// + /// \param min_pollers The minimum number of polling threads per server + /// completion queue (in param sync_server_cqs) to use for listening to + /// incoming requests (used only in case of sync server) + /// + /// \param max_pollers The maximum number of polling threads per server + /// completion queue (in param sync_server_cqs) to use for listening to + /// incoming requests (used only in case of sync server) + /// + /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on + /// server completion queues passed via sync_server_cqs param. + Server(int max_message_size, ChannelArguments* args, + std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> + sync_server_cqs, + int min_pollers, int max_pollers, int sync_cq_timeout_msec); /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the Server instance. @@ -151,12 +174,6 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen { /// \return true on a successful shutdown. bool Start(ServerCompletionQueue** cqs, size_t num_cqs) GRPC_OVERRIDE; - /// Process one or more incoming calls. - void RunRpc() GRPC_OVERRIDE; - - /// Schedule \a RunRpc to run in the threadpool. - void ScheduleCallback() GRPC_OVERRIDE; - void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE; void ShutdownInternal(gpr_timespec deadline) GRPC_OVERRIDE; @@ -171,34 +188,31 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen { const int max_receive_message_size_; - // Completion queue. - CompletionQueue cq_; + /// The following completion queues are ONLY used in case of Sync API i.e if + /// the server has any services with sync methods. The server uses these + /// completion queues to poll for new RPCs + std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> + sync_server_cqs_; + + /// List of ThreadManager instances (one for each cq in the sync_server_cqs) + std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_; // Sever status grpc::mutex mu_; bool started_; bool shutdown_; - bool shutdown_notified_; - // The number of threads which are running callbacks. - int num_running_cb_; - grpc::condition_variable callback_cv_; + bool shutdown_notified_; // Was notify called on the shutdown_cv_ grpc::condition_variable shutdown_cv_; std::shared_ptr<GlobalCallbacks> global_callbacks_; - std::list<SyncRequest>* sync_methods_; std::vector<grpc::string> services_; - std::unique_ptr<RpcServiceMethod> unknown_method_; bool has_generic_service_; - // Pointer to the c grpc server. + // Pointer to the wrapped grpc_server. grpc_server* server_; - ThreadPoolInterface* thread_pool_; - // Whether the thread pool is created and owned by the server. - bool thread_pool_owned_; - std::unique_ptr<ServerInitializer> server_initializer_; }; diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 37f1f8cb80..c6bcf8b90a 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -34,6 +34,7 @@ #ifndef GRPCXX_SERVER_BUILDER_H #define GRPCXX_SERVER_BUILDER_H +#include <climits> #include <map> #include <memory> #include <vector> @@ -42,6 +43,8 @@ #include <grpc++/impl/server_builder_plugin.h> #include <grpc++/support/config.h> #include <grpc/compression.h> +#include <grpc/support/cpu.h> +#include <grpc/support/useful.h> namespace grpc { @@ -62,6 +65,8 @@ class ServerBuilder { public: ServerBuilder(); + enum SyncServerOption { NUM_CQS, MIN_POLLERS, MAX_POLLERS, CQ_TIMEOUT_MSEC }; + /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the \a Server instance returned /// by \a BuildAndStart(). @@ -115,6 +120,9 @@ class ServerBuilder { ServerBuilder& SetOption(std::unique_ptr<ServerBuilderOption> option); + /// Only useful if this is a Synchronous server. + ServerBuilder& SetSyncServerOption(SyncServerOption option, int value); + /// Tries to bind \a server to the given \a addr. /// /// It can be invoked multiple times. @@ -170,6 +178,28 @@ class ServerBuilder { int* selected_port; }; + struct SyncServerSettings { + SyncServerSettings() + : num_cqs(GPR_MAX(gpr_cpu_num_cores(), 4)), + min_pollers(1), + max_pollers(INT_MAX), + cq_timeout_msec(1000) {} + + // Number of server completion queues to create to listen to incoming RPCs. + int num_cqs; + + // Minimum number of threads per completion queue that should be listening + // to incoming RPCs. + int min_pollers; + + // Maximum number of threads per completion queue that can be listening to + // incoming RPCs. + int max_pollers; + + // The timeout for server completion queue's AsyncNext call. + int cq_timeout_msec; + }; + typedef std::unique_ptr<grpc::string> HostString; struct NamedService { explicit NamedService(Service* s) : service(s) {} @@ -184,7 +214,12 @@ class ServerBuilder { std::vector<std::unique_ptr<ServerBuilderOption>> options_; std::vector<std::unique_ptr<NamedService>> services_; std::vector<Port> ports_; + + SyncServerSettings sync_server_settings_; + + // List of completion queues added via AddCompletionQueue() method std::vector<ServerCompletionQueue*> cqs_; + std::shared_ptr<ServerCredentials> creds_; std::vector<std::unique_ptr<ServerBuilderPlugin>> plugins_; AsyncGenericService* generic_service_; |