diff options
Diffstat (limited to 'src/cpp/server/server_builder.cc')
-rw-r--r-- | src/cpp/server/server_builder.cc | 137 |
1 files changed, 106 insertions, 31 deletions
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 953a4337ec..00a90bb184 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -36,6 +36,7 @@ #include <grpc++/impl/service_type.h> #include <grpc++/resource_quota.h> #include <grpc++/server.h> +#include <grpc/support/cpu.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> @@ -55,6 +56,7 @@ static void do_plugin_list_init(void) { ServerBuilder::ServerBuilder() : max_receive_message_size_(-1), max_send_message_size_(-1), + sync_server_settings_(SyncServerSettings()), resource_quota_(nullptr), generic_service_(nullptr) { gpr_once_init(&once_init_plugin_list, do_plugin_list_init); @@ -63,6 +65,7 @@ ServerBuilder::ServerBuilder() auto& factory = *it; plugins_.emplace_back(factory()); } + // all compression algorithms enabled by default. enabled_compression_algorithms_bitset_ = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; @@ -102,7 +105,7 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService( gpr_log(GPR_ERROR, "Adding multiple AsyncGenericService is unsupported for now. " "Dropping the service %p", - service); + (void*)service); } else { generic_service_ = service; } @@ -115,6 +118,25 @@ ServerBuilder& ServerBuilder::SetOption( return *this; } +ServerBuilder& ServerBuilder::SetSyncServerOption( + ServerBuilder::SyncServerOption option, int val) { + switch (option) { + case NUM_CQS: + sync_server_settings_.num_cqs = val; + break; + case MIN_POLLERS: + sync_server_settings_.min_pollers = val; + break; + case MAX_POLLERS: + sync_server_settings_.max_pollers = val; + break; + case CQ_TIMEOUT_MSEC: + sync_server_settings_.cq_timeout_msec = val; + break; + } + return *this; +} + ServerBuilder& ServerBuilder::SetCompressionAlgorithmSupportStatus( grpc_compression_algorithm algorithm, bool enabled) { if (enabled) { @@ -157,35 +179,24 @@ ServerBuilder& ServerBuilder::AddListeningPort( } std::unique_ptr<Server> ServerBuilder::BuildAndStart() { - std::unique_ptr<ThreadPoolInterface> thread_pool; - bool has_sync_methods = false; - for (auto it = services_.begin(); it != services_.end(); ++it) { - if ((*it)->service->has_synchronous_methods()) { - if (!thread_pool) { - thread_pool.reset(CreateDefaultThreadPool()); - has_sync_methods = true; - break; - } - } - } ChannelArguments args; for (auto option = options_.begin(); option != options_.end(); ++option) { (*option)->UpdateArguments(&args); (*option)->UpdatePlugins(&plugins_); } + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { - if (!thread_pool && (*plugin)->has_sync_methods()) { - thread_pool.reset(CreateDefaultThreadPool()); - has_sync_methods = true; - } (*plugin)->UpdateChannelArguments(&args); } + if (max_receive_message_size_ >= 0) { args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_); } + if (max_send_message_size_ >= 0) { args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_); } + args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET, enabled_compression_algorithms_bitset_); if (maybe_default_compression_level_.is_set) { @@ -196,31 +207,89 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, maybe_default_compression_algorithm_.algorithm); } + if (resource_quota_ != nullptr) { args.SetPointerWithVtable(GRPC_ARG_RESOURCE_QUOTA, resource_quota_, grpc_resource_quota_arg_vtable()); } - std::unique_ptr<Server> server(new Server(thread_pool.release(), true, - max_receive_message_size_, &args)); + + // == Determine if the server has any syncrhonous methods == + bool has_sync_methods = false; + for (auto it = services_.begin(); it != services_.end(); ++it) { + if ((*it)->service->has_synchronous_methods()) { + has_sync_methods = true; + break; + } + } + + if (!has_sync_methods) { + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { + if ((*plugin)->has_sync_methods()) { + has_sync_methods = true; + break; + } + } + } + + // If this is a Sync server, i.e a server expositing sync API, then the server + // needs to create some completion queues to listen for incoming requests. + // 'sync_server_cqs' are those internal completion queues. + // + // This is different from the completion queues added to the server via + // ServerBuilder's AddCompletionQueue() method (those completion queues + // are in 'cqs_' member variable of ServerBuilder object) + std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> + sync_server_cqs(std::make_shared< + std::vector<std::unique_ptr<ServerCompletionQueue>>>()); + + if (has_sync_methods) { + // This is a Sync server + gpr_log(GPR_INFO, + "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: " + "%d, CQ timeout (msec): %d", + sync_server_settings_.num_cqs, sync_server_settings_.min_pollers, + sync_server_settings_.max_pollers, + sync_server_settings_.cq_timeout_msec); + + // Create completion queues to listen to incoming rpc requests + for (int i = 0; i < sync_server_settings_.num_cqs; i++) { + sync_server_cqs->emplace_back(new ServerCompletionQueue()); + } + } + + std::unique_ptr<Server> server(new Server( + max_receive_message_size_, &args, sync_server_cqs, + sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, + sync_server_settings_.cq_timeout_msec)); + ServerInitializer* initializer = server->initializer(); - // If the server has atleast one sync methods, we know that this is a Sync - // server or a Hybrid server and the completion queue (server->cq_) would be - // frequently polled. - int num_frequently_polled_cqs = has_sync_methods ? 1 : 0; - - for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { - // A completion queue that is not polled frequently (by calling Next() or - // AsyncNext()) is not safe to use for listening to incoming channels. - // Register all such completion queues as non-listening completion queues - // with the GRPC core library. - if ((*cq)->IsFrequentlyPolled()) { - grpc_server_register_completion_queue(server->server_, (*cq)->cq(), + // Register all the completion queues with the server. i.e + // 1. sync_server_cqs: internal completion queues created IF this is a sync + // server + // 2. cqs_: Completion queues added via AddCompletionQueue() call + + // All sync cqs (if any) are frequently polled by ThreadManager + int num_frequently_polled_cqs = sync_server_cqs->size(); + + for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) { + grpc_server_register_completion_queue(server->server_, (*it)->cq(), + nullptr); + } + + // cqs_ contains the completion queue added by calling the ServerBuilder's + // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by + // calling Next() or AsyncNext()) and hence are not safe to be used for + // listening to incoming channels. Such completion queues must be registered + // as non-listening queues + for (auto it = cqs_.begin(); it != cqs_.end(); ++it) { + if ((*it)->IsFrequentlyPolled()) { + grpc_server_register_completion_queue(server->server_, (*it)->cq(), nullptr); num_frequently_polled_cqs++; } else { grpc_server_register_non_listening_completion_queue(server->server_, - (*cq)->cq(), nullptr); + (*it)->cq(), nullptr); } } @@ -236,9 +305,11 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { return nullptr; } } + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { (*plugin)->InitServer(initializer); } + if (generic_service_) { server->RegisterAsyncGenericService(generic_service_); } else { @@ -251,6 +322,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { } } } + for (auto port = ports_.begin(); port != ports_.end(); port++) { int r = server->AddListeningPort(port->addr, port->creds.get()); if (!r) return nullptr; @@ -258,13 +330,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { *port->selected_port = r; } } + auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0]; if (!server->Start(cqs_data, cqs_.size())) { return nullptr; } + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { (*plugin)->Finish(initializer); } + return server; } |