diff options
author | Sree Kuchibhotla <sreek@google.com> | 2016-05-16 11:06:30 -0700 |
---|---|---|
committer | Sree Kuchibhotla <sreek@google.com> | 2016-05-16 11:06:30 -0700 |
commit | 4790263c5f20865ed4279f479b184205dd7209b4 (patch) | |
tree | 2ee321081bdad50b32587fe4b8215e5d01d24042 /src/cpp/server | |
parent | 088891119f17b827b4f45f09e39411f007618ddf (diff) | |
parent | 1ba1bba66a18b6b7986a1cfa52c6f1ac4a14a029 (diff) |
Merge branch 'master' into server_channel_affinity
Diffstat (limited to 'src/cpp/server')
-rw-r--r-- | src/cpp/server/server.cc | 19 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 44 |
2 files changed, 59 insertions, 4 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index fafe31e84c..f955a31494 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -33,6 +33,7 @@ #include <grpc++/server.h> +#include <sstream> #include <utility> #include <grpc++/completion_queue.h> @@ -41,6 +42,7 @@ #include <grpc++/impl/grpc_library.h> #include <grpc++/impl/method_handler_impl.h> #include <grpc++/impl/rpc_service_method.h> +#include <grpc++/impl/server_initializer.h> #include <grpc++/impl/service_type.h> #include <grpc++/security/server_credentials.h> #include <grpc++/server_context.h> @@ -284,7 +286,8 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, has_generic_service_(false), server_(nullptr), thread_pool_(thread_pool), - thread_pool_owned_(thread_pool_owned) { + thread_pool_owned_(thread_pool_owned), + server_initializer_(new ServerInitializer(this)) { g_gli_initializer.summon(); gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); global_callbacks_ = g_callbacks; @@ -341,6 +344,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { "Can only register an asynchronous service against one server."); service->server_ = this; } + const char* method_name = nullptr; for (auto it = service->methods_.begin(); it != service->methods_.end(); ++it) { if (it->get() == nullptr) { // Handled by generic service if any. @@ -360,6 +364,17 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { } else { sync_methods_->emplace_back(method, tag); } + method_name = method->name(); + } + + // Parse service name. + if (method_name != nullptr) { + std::stringstream ss(method_name); + grpc::string service_name; + if (std::getline(ss, service_name, '/') && + std::getline(ss, service_name, '/')) { + services_.push_back(service_name); + } } return true; } @@ -598,4 +613,6 @@ void Server::RunRpc() { } } +ServerInitializer* Server::initializer() { return server_initializer_.get(); } + } // namespace grpc diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 9cd7cb2da3..a2d90c2974 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -41,9 +41,23 @@ namespace grpc { +static std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>* + g_plugin_factory_list; +static gpr_once once_init_plugin_list = GPR_ONCE_INIT; + +static void do_plugin_list_init(void) { + g_plugin_factory_list = + new std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>(); +} + ServerBuilder::ServerBuilder() : max_message_size_(-1), generic_service_(nullptr) { grpc_compression_options_init(&compression_options_); + gpr_once_init(&once_init_plugin_list, do_plugin_list_init); + for (auto factory : (*g_plugin_factory_list)) { + std::unique_ptr<ServerBuilderPlugin> plugin = factory(); + plugins_[plugin->name()] = std::move(plugin); + } } std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue( @@ -100,18 +114,30 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { ChannelArguments args; for (auto option = options_.begin(); option != options_.end(); ++option) { (*option)->UpdateArguments(&args); + (*option)->UpdatePlugins(&plugins_); + } + if (thread_pool == nullptr) { + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { + if ((*plugin).second->has_sync_methods()) { + thread_pool.reset(CreateDefaultThreadPool()); + break; + } + } } if (max_message_size_ > 0) { args.SetInt(GRPC_ARG_MAX_MESSAGE_LENGTH, max_message_size_); } - args.SetInt(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, + args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET, compression_options_.enabled_algorithms_bitset); std::unique_ptr<Server> server( new Server(thread_pool.release(), true, max_message_size_, &args)); + ServerInitializer* initializer = server->initializer(); + // If the server has atleast one sync methods, we know that this is a Sync - // server or a Hybrid server and the completion queue (server->cq_) would be - // frequently polled. + // server or a Hybrid server. This means that the completion queue on the + // Server object (i.e server->cq_) will be frequently polled (which is why + // we initialize num_frequently_pollsed_cqs to 1 here) int num_frequently_polled_cqs = has_sync_methods ? 1 : 0; for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { @@ -141,6 +167,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { return nullptr; } } + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { + (*plugin).second->InitServer(initializer); + } if (generic_service_) { server->RegisterAsyncGenericService(generic_service_); } else { @@ -164,7 +193,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { if (!server->Start(cqs_data, cqs_.size())) { return nullptr; } + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { + (*plugin).second->Finish(initializer); + } return server; } +void ServerBuilder::InternalAddPluginFactory( + std::unique_ptr<ServerBuilderPlugin> (*CreatePlugin)()) { + gpr_once_init(&once_init_plugin_list, do_plugin_list_init); + (*g_plugin_factory_list).push_back(CreatePlugin); +} + } // namespace grpc |