aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2016-05-16 11:06:30 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2016-05-16 11:06:30 -0700
commit4790263c5f20865ed4279f479b184205dd7209b4 (patch)
tree2ee321081bdad50b32587fe4b8215e5d01d24042 /src/cpp/server
parent088891119f17b827b4f45f09e39411f007618ddf (diff)
parent1ba1bba66a18b6b7986a1cfa52c6f1ac4a14a029 (diff)
Merge branch 'master' into server_channel_affinity
Diffstat (limited to 'src/cpp/server')
-rw-r--r--src/cpp/server/server.cc19
-rw-r--r--src/cpp/server/server_builder.cc44
2 files changed, 59 insertions, 4 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index fafe31e84c..f955a31494 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -33,6 +33,7 @@
#include <grpc++/server.h>
+#include <sstream>
#include <utility>
#include <grpc++/completion_queue.h>
@@ -41,6 +42,7 @@
#include <grpc++/impl/grpc_library.h>
#include <grpc++/impl/method_handler_impl.h>
#include <grpc++/impl/rpc_service_method.h>
+#include <grpc++/impl/server_initializer.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server_context.h>
@@ -284,7 +286,8 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
has_generic_service_(false),
server_(nullptr),
thread_pool_(thread_pool),
- thread_pool_owned_(thread_pool_owned) {
+ thread_pool_owned_(thread_pool_owned),
+ server_initializer_(new ServerInitializer(this)) {
g_gli_initializer.summon();
gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
global_callbacks_ = g_callbacks;
@@ -341,6 +344,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
"Can only register an asynchronous service against one server.");
service->server_ = this;
}
+ const char* method_name = nullptr;
for (auto it = service->methods_.begin(); it != service->methods_.end();
++it) {
if (it->get() == nullptr) { // Handled by generic service if any.
@@ -360,6 +364,17 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
} else {
sync_methods_->emplace_back(method, tag);
}
+ method_name = method->name();
+ }
+
+ // Parse service name.
+ if (method_name != nullptr) {
+ std::stringstream ss(method_name);
+ grpc::string service_name;
+ if (std::getline(ss, service_name, '/') &&
+ std::getline(ss, service_name, '/')) {
+ services_.push_back(service_name);
+ }
}
return true;
}
@@ -598,4 +613,6 @@ void Server::RunRpc() {
}
}
+ServerInitializer* Server::initializer() { return server_initializer_.get(); }
+
} // namespace grpc
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 9cd7cb2da3..a2d90c2974 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -41,9 +41,23 @@
namespace grpc {
+static std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>*
+ g_plugin_factory_list;
+static gpr_once once_init_plugin_list = GPR_ONCE_INIT;
+
+static void do_plugin_list_init(void) {
+ g_plugin_factory_list =
+ new std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>();
+}
+
ServerBuilder::ServerBuilder()
: max_message_size_(-1), generic_service_(nullptr) {
grpc_compression_options_init(&compression_options_);
+ gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
+ for (auto factory : (*g_plugin_factory_list)) {
+ std::unique_ptr<ServerBuilderPlugin> plugin = factory();
+ plugins_[plugin->name()] = std::move(plugin);
+ }
}
std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
@@ -100,18 +114,30 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
ChannelArguments args;
for (auto option = options_.begin(); option != options_.end(); ++option) {
(*option)->UpdateArguments(&args);
+ (*option)->UpdatePlugins(&plugins_);
+ }
+ if (thread_pool == nullptr) {
+ for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
+ if ((*plugin).second->has_sync_methods()) {
+ thread_pool.reset(CreateDefaultThreadPool());
+ break;
+ }
+ }
}
if (max_message_size_ > 0) {
args.SetInt(GRPC_ARG_MAX_MESSAGE_LENGTH, max_message_size_);
}
- args.SetInt(GRPC_COMPRESSION_ALGORITHM_STATE_ARG,
+ args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
compression_options_.enabled_algorithms_bitset);
std::unique_ptr<Server> server(
new Server(thread_pool.release(), true, max_message_size_, &args));
+ ServerInitializer* initializer = server->initializer();
+
// If the server has atleast one sync methods, we know that this is a Sync
- // server or a Hybrid server and the completion queue (server->cq_) would be
- // frequently polled.
+ // server or a Hybrid server. This means that the completion queue on the
+ // Server object (i.e server->cq_) will be frequently polled (which is why
+ // we initialize num_frequently_pollsed_cqs to 1 here)
int num_frequently_polled_cqs = has_sync_methods ? 1 : 0;
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
@@ -141,6 +167,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
return nullptr;
}
}
+ for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
+ (*plugin).second->InitServer(initializer);
+ }
if (generic_service_) {
server->RegisterAsyncGenericService(generic_service_);
} else {
@@ -164,7 +193,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
if (!server->Start(cqs_data, cqs_.size())) {
return nullptr;
}
+ for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
+ (*plugin).second->Finish(initializer);
+ }
return server;
}
+void ServerBuilder::InternalAddPluginFactory(
+ std::unique_ptr<ServerBuilderPlugin> (*CreatePlugin)()) {
+ gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
+ (*g_plugin_factory_list).push_back(CreatePlugin);
+}
+
} // namespace grpc