aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/server_builder.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server/server_builder.cc')
-rw-r--r--src/cpp/server/server_builder.cc137
1 files changed, 106 insertions, 31 deletions
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 953a4337ec..00a90bb184 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -36,6 +36,7 @@
#include <grpc++/impl/service_type.h>
#include <grpc++/resource_quota.h>
#include <grpc++/server.h>
+#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
@@ -55,6 +56,7 @@ static void do_plugin_list_init(void) {
ServerBuilder::ServerBuilder()
: max_receive_message_size_(-1),
max_send_message_size_(-1),
+ sync_server_settings_(SyncServerSettings()),
resource_quota_(nullptr),
generic_service_(nullptr) {
gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
@@ -63,6 +65,7 @@ ServerBuilder::ServerBuilder()
auto& factory = *it;
plugins_.emplace_back(factory());
}
+
// all compression algorithms enabled by default.
enabled_compression_algorithms_bitset_ =
(1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
@@ -102,7 +105,7 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
gpr_log(GPR_ERROR,
"Adding multiple AsyncGenericService is unsupported for now. "
"Dropping the service %p",
- service);
+ (void*)service);
} else {
generic_service_ = service;
}
@@ -115,6 +118,25 @@ ServerBuilder& ServerBuilder::SetOption(
return *this;
}
+ServerBuilder& ServerBuilder::SetSyncServerOption(
+ ServerBuilder::SyncServerOption option, int val) {
+ switch (option) {
+ case NUM_CQS:
+ sync_server_settings_.num_cqs = val;
+ break;
+ case MIN_POLLERS:
+ sync_server_settings_.min_pollers = val;
+ break;
+ case MAX_POLLERS:
+ sync_server_settings_.max_pollers = val;
+ break;
+ case CQ_TIMEOUT_MSEC:
+ sync_server_settings_.cq_timeout_msec = val;
+ break;
+ }
+ return *this;
+}
+
ServerBuilder& ServerBuilder::SetCompressionAlgorithmSupportStatus(
grpc_compression_algorithm algorithm, bool enabled) {
if (enabled) {
@@ -157,35 +179,24 @@ ServerBuilder& ServerBuilder::AddListeningPort(
}
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
- std::unique_ptr<ThreadPoolInterface> thread_pool;
- bool has_sync_methods = false;
- for (auto it = services_.begin(); it != services_.end(); ++it) {
- if ((*it)->service->has_synchronous_methods()) {
- if (!thread_pool) {
- thread_pool.reset(CreateDefaultThreadPool());
- has_sync_methods = true;
- break;
- }
- }
- }
ChannelArguments args;
for (auto option = options_.begin(); option != options_.end(); ++option) {
(*option)->UpdateArguments(&args);
(*option)->UpdatePlugins(&plugins_);
}
+
for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
- if (!thread_pool && (*plugin)->has_sync_methods()) {
- thread_pool.reset(CreateDefaultThreadPool());
- has_sync_methods = true;
- }
(*plugin)->UpdateChannelArguments(&args);
}
+
if (max_receive_message_size_ >= 0) {
args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_);
}
+
if (max_send_message_size_ >= 0) {
args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_);
}
+
args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
enabled_compression_algorithms_bitset_);
if (maybe_default_compression_level_.is_set) {
@@ -196,31 +207,89 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
maybe_default_compression_algorithm_.algorithm);
}
+
if (resource_quota_ != nullptr) {
args.SetPointerWithVtable(GRPC_ARG_RESOURCE_QUOTA, resource_quota_,
grpc_resource_quota_arg_vtable());
}
- std::unique_ptr<Server> server(new Server(thread_pool.release(), true,
- max_receive_message_size_, &args));
+
+ // == Determine if the server has any syncrhonous methods ==
+ bool has_sync_methods = false;
+ for (auto it = services_.begin(); it != services_.end(); ++it) {
+ if ((*it)->service->has_synchronous_methods()) {
+ has_sync_methods = true;
+ break;
+ }
+ }
+
+ if (!has_sync_methods) {
+ for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
+ if ((*plugin)->has_sync_methods()) {
+ has_sync_methods = true;
+ break;
+ }
+ }
+ }
+
+ // If this is a Sync server, i.e a server expositing sync API, then the server
+ // needs to create some completion queues to listen for incoming requests.
+ // 'sync_server_cqs' are those internal completion queues.
+ //
+ // This is different from the completion queues added to the server via
+ // ServerBuilder's AddCompletionQueue() method (those completion queues
+ // are in 'cqs_' member variable of ServerBuilder object)
+ std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs(std::make_shared<
+ std::vector<std::unique_ptr<ServerCompletionQueue>>>());
+
+ if (has_sync_methods) {
+ // This is a Sync server
+ gpr_log(GPR_INFO,
+ "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: "
+ "%d, CQ timeout (msec): %d",
+ sync_server_settings_.num_cqs, sync_server_settings_.min_pollers,
+ sync_server_settings_.max_pollers,
+ sync_server_settings_.cq_timeout_msec);
+
+ // Create completion queues to listen to incoming rpc requests
+ for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
+ sync_server_cqs->emplace_back(new ServerCompletionQueue());
+ }
+ }
+
+ std::unique_ptr<Server> server(new Server(
+ max_receive_message_size_, &args, sync_server_cqs,
+ sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
+ sync_server_settings_.cq_timeout_msec));
+
ServerInitializer* initializer = server->initializer();
- // If the server has atleast one sync methods, we know that this is a Sync
- // server or a Hybrid server and the completion queue (server->cq_) would be
- // frequently polled.
- int num_frequently_polled_cqs = has_sync_methods ? 1 : 0;
-
- for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
- // A completion queue that is not polled frequently (by calling Next() or
- // AsyncNext()) is not safe to use for listening to incoming channels.
- // Register all such completion queues as non-listening completion queues
- // with the GRPC core library.
- if ((*cq)->IsFrequentlyPolled()) {
- grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
+ // Register all the completion queues with the server. i.e
+ // 1. sync_server_cqs: internal completion queues created IF this is a sync
+ // server
+ // 2. cqs_: Completion queues added via AddCompletionQueue() call
+
+ // All sync cqs (if any) are frequently polled by ThreadManager
+ int num_frequently_polled_cqs = sync_server_cqs->size();
+
+ for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) {
+ grpc_server_register_completion_queue(server->server_, (*it)->cq(),
+ nullptr);
+ }
+
+ // cqs_ contains the completion queue added by calling the ServerBuilder's
+ // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by
+ // calling Next() or AsyncNext()) and hence are not safe to be used for
+ // listening to incoming channels. Such completion queues must be registered
+ // as non-listening queues
+ for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
+ if ((*it)->IsFrequentlyPolled()) {
+ grpc_server_register_completion_queue(server->server_, (*it)->cq(),
nullptr);
num_frequently_polled_cqs++;
} else {
grpc_server_register_non_listening_completion_queue(server->server_,
- (*cq)->cq(), nullptr);
+ (*it)->cq(), nullptr);
}
}
@@ -236,9 +305,11 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
return nullptr;
}
}
+
for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
(*plugin)->InitServer(initializer);
}
+
if (generic_service_) {
server->RegisterAsyncGenericService(generic_service_);
} else {
@@ -251,6 +322,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
}
}
}
+
for (auto port = ports_.begin(); port != ports_.end(); port++) {
int r = server->AddListeningPort(port->addr, port->creds.get());
if (!r) return nullptr;
@@ -258,13 +330,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
*port->selected_port = r;
}
}
+
auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0];
if (!server->Start(cqs_data, cqs_.size())) {
return nullptr;
}
+
for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
(*plugin)->Finish(initializer);
}
+
return server;
}