aboutsummaryrefslogtreecommitdiffhomepage
path: root/include/grpc++
diff options
context:
space:
mode:
Diffstat (limited to 'include/grpc++')
-rw-r--r--include/grpc++/impl/codegen/server_interface.h6
-rw-r--r--include/grpc++/server.h64
-rw-r--r--include/grpc++/server_builder.h35
3 files changed, 74 insertions, 31 deletions
diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h
index 4a00d7a3a1..5c41ca51b4 100644
--- a/include/grpc++/impl/codegen/server_interface.h
+++ b/include/grpc++/impl/codegen/server_interface.h
@@ -126,12 +126,6 @@ class ServerInterface : public CallHook {
/// \return true on a successful shutdown.
virtual bool Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0;
- /// Process one or more incoming calls.
- virtual void RunRpc() = 0;
-
- /// Schedule \a RunRpc to run in the threadpool.
- virtual void ScheduleCallback() = 0;
-
virtual void ShutdownInternal(gpr_timespec deadline) = 0;
virtual int max_receive_message_size() const = 0;
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index f51a6c658f..a6d70c7577 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -105,18 +105,41 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
class AsyncRequest;
class ShutdownRequest;
+ /// SyncRequestThreadManager is an implementation of ThreadManager. This class
+ /// is responsible for polling for incoming RPCs and calling the RPC handlers.
+ /// This is only used in case of a Sync server (i.e a server exposing a sync
+ /// interface)
+ class SyncRequestThreadManager;
+
class UnimplementedAsyncRequestContext;
class UnimplementedAsyncRequest;
class UnimplementedAsyncResponse;
/// Server constructors. To be used by \a ServerBuilder only.
///
- /// \param thread_pool The threadpool instance to use for call processing.
- /// \param thread_pool_owned Does the server own the \a thread_pool instance?
- /// \param max_receive_message_size Maximum message length that the channel
- /// can receive.
- Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
- int max_receive_message_size, ChannelArguments* args);
+ /// \param max_message_size Maximum message length that the channel can
+ /// receive.
+ ///
+ /// \param args The channel args
+ ///
+ /// \param sync_server_cqs The completion queues to use if the server is a
+ /// synchronous server (or a hybrid server). The server polls for new RPCs on
+ /// these queues
+ ///
+ /// \param min_pollers The minimum number of polling threads per server
+ /// completion queue (in param sync_server_cqs) to use for listening to
+ /// incoming requests (used only in case of sync server)
+ ///
+ /// \param max_pollers The maximum number of polling threads per server
+ /// completion queue (in param sync_server_cqs) to use for listening to
+ /// incoming requests (used only in case of sync server)
+ ///
+ /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on
+ /// server completion queues passed via sync_server_cqs param.
+ Server(int max_message_size, ChannelArguments* args,
+ std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs,
+ int min_pollers, int max_pollers, int sync_cq_timeout_msec);
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance.
@@ -151,12 +174,6 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
/// \return true on a successful shutdown.
bool Start(ServerCompletionQueue** cqs, size_t num_cqs) GRPC_OVERRIDE;
- /// Process one or more incoming calls.
- void RunRpc() GRPC_OVERRIDE;
-
- /// Schedule \a RunRpc to run in the threadpool.
- void ScheduleCallback() GRPC_OVERRIDE;
-
void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
void ShutdownInternal(gpr_timespec deadline) GRPC_OVERRIDE;
@@ -171,34 +188,31 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
const int max_receive_message_size_;
- // Completion queue.
- CompletionQueue cq_;
+ /// The following completion queues are ONLY used in case of Sync API i.e if
+ /// the server has any services with sync methods. The server uses these
+ /// completion queues to poll for new RPCs
+ std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs_;
+
+ /// List of ThreadManager instances (one for each cq in the sync_server_cqs)
+ std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_;
// Sever status
grpc::mutex mu_;
bool started_;
bool shutdown_;
- bool shutdown_notified_;
- // The number of threads which are running callbacks.
- int num_running_cb_;
- grpc::condition_variable callback_cv_;
+ bool shutdown_notified_; // Was notify called on the shutdown_cv_
grpc::condition_variable shutdown_cv_;
std::shared_ptr<GlobalCallbacks> global_callbacks_;
- std::list<SyncRequest>* sync_methods_;
std::vector<grpc::string> services_;
- std::unique_ptr<RpcServiceMethod> unknown_method_;
bool has_generic_service_;
- // Pointer to the c grpc server.
+ // Pointer to the wrapped grpc_server.
grpc_server* server_;
- ThreadPoolInterface* thread_pool_;
- // Whether the thread pool is created and owned by the server.
- bool thread_pool_owned_;
-
std::unique_ptr<ServerInitializer> server_initializer_;
};
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index 37f1f8cb80..c6bcf8b90a 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -34,6 +34,7 @@
#ifndef GRPCXX_SERVER_BUILDER_H
#define GRPCXX_SERVER_BUILDER_H
+#include <climits>
#include <map>
#include <memory>
#include <vector>
@@ -42,6 +43,8 @@
#include <grpc++/impl/server_builder_plugin.h>
#include <grpc++/support/config.h>
#include <grpc/compression.h>
+#include <grpc/support/cpu.h>
+#include <grpc/support/useful.h>
namespace grpc {
@@ -62,6 +65,8 @@ class ServerBuilder {
public:
ServerBuilder();
+ enum SyncServerOption { NUM_CQS, MIN_POLLERS, MAX_POLLERS, CQ_TIMEOUT_MSEC };
+
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the \a Server instance returned
/// by \a BuildAndStart().
@@ -115,6 +120,9 @@ class ServerBuilder {
ServerBuilder& SetOption(std::unique_ptr<ServerBuilderOption> option);
+ /// Only useful if this is a Synchronous server.
+ ServerBuilder& SetSyncServerOption(SyncServerOption option, int value);
+
/// Tries to bind \a server to the given \a addr.
///
/// It can be invoked multiple times.
@@ -170,6 +178,28 @@ class ServerBuilder {
int* selected_port;
};
+ struct SyncServerSettings {
+ SyncServerSettings()
+ : num_cqs(GPR_MAX(gpr_cpu_num_cores(), 4)),
+ min_pollers(1),
+ max_pollers(INT_MAX),
+ cq_timeout_msec(1000) {}
+
+ // Number of server completion queues to create to listen to incoming RPCs.
+ int num_cqs;
+
+ // Minimum number of threads per completion queue that should be listening
+ // to incoming RPCs.
+ int min_pollers;
+
+ // Maximum number of threads per completion queue that can be listening to
+ // incoming RPCs.
+ int max_pollers;
+
+ // The timeout for server completion queue's AsyncNext call.
+ int cq_timeout_msec;
+ };
+
typedef std::unique_ptr<grpc::string> HostString;
struct NamedService {
explicit NamedService(Service* s) : service(s) {}
@@ -184,7 +214,12 @@ class ServerBuilder {
std::vector<std::unique_ptr<ServerBuilderOption>> options_;
std::vector<std::unique_ptr<NamedService>> services_;
std::vector<Port> ports_;
+
+ SyncServerSettings sync_server_settings_;
+
+ // List of completion queues added via AddCompletionQueue() method
std::vector<ServerCompletionQueue*> cqs_;
+
std::shared_ptr<ServerCredentials> creds_;
std::vector<std::unique_ptr<ServerBuilderPlugin>> plugins_;
AsyncGenericService* generic_service_;