diff options
Diffstat (limited to 'src/cpp/server/server_builder.cc')
-rw-r--r-- | src/cpp/server/server_builder.cc | 37 |
1 files changed, 32 insertions, 5 deletions
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 61f0f6ae2a..54feac3982 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -60,8 +60,9 @@ ServerBuilder::ServerBuilder() } } -std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() { - ServerCompletionQueue* cq = new ServerCompletionQueue(); +std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue( + bool is_frequently_polled) { + ServerCompletionQueue* cq = new ServerCompletionQueue(is_frequently_polled); cqs_.push_back(cq); return std::unique_ptr<ServerCompletionQueue>(cq); } @@ -99,10 +100,12 @@ void ServerBuilder::AddListeningPort(const grpc::string& addr, std::unique_ptr<Server> ServerBuilder::BuildAndStart() { std::unique_ptr<ThreadPoolInterface> thread_pool; + bool has_sync_methods = false; for (auto it = services_.begin(); it != services_.end(); ++it) { if ((*it)->service->has_synchronous_methods()) { if (thread_pool == nullptr) { thread_pool.reset(CreateDefaultThreadPool()); + has_sync_methods = true; break; } } @@ -116,6 +119,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { if ((*plugin).second->has_sync_methods()) { thread_pool.reset(CreateDefaultThreadPool()); + has_sync_methods = true; break; } } @@ -128,10 +132,33 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { std::unique_ptr<Server> server( new Server(thread_pool.release(), true, max_message_size_, &args)); ServerInitializer* initializer = server->initializer(); + + // If the server has atleast one sync methods, we know that this is a Sync + // server or a Hybrid server and the completion queue (server->cq_) would be + // frequently polled. + int num_frequently_polled_cqs = has_sync_methods ? 1 : 0; + for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { - grpc_server_register_completion_queue(server->server_, (*cq)->cq(), - nullptr); + // A completion queue that is not polled frequently (by calling Next() or + // AsyncNext()) is not safe to use for listening to incoming channels. + // Register all such completion queues as non-listening completion queues + // with the GRPC core library. + if ((*cq)->IsFrequentlyPolled()) { + grpc_server_register_completion_queue(server->server_, (*cq)->cq(), + nullptr); + num_frequently_polled_cqs++; + } else { + grpc_server_register_non_listening_completion_queue(server->server_, + (*cq)->cq(), nullptr); + } + } + + if (num_frequently_polled_cqs == 0) { + gpr_log(GPR_ERROR, + "At least one of the completion queues must be frequently polled"); + return nullptr; } + for (auto service = services_.begin(); service != services_.end(); service++) { if (!server->RegisterService((*service)->host.get(), (*service)->service)) { |