diff options
Diffstat (limited to 'src/cpp/server/server_builder.cc')
-rw-r--r-- | src/cpp/server/server_builder.cc | 33 |
1 files changed, 28 insertions, 5 deletions
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 61f0f6ae2a..a5bcd3db31 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -60,8 +60,9 @@ ServerBuilder::ServerBuilder() } } -std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() { - ServerCompletionQueue* cq = new ServerCompletionQueue(); +std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue( + bool is_frequently_polled) { + ServerCompletionQueue* cq = new ServerCompletionQueue(is_frequently_polled); cqs_.push_back(cq); return std::unique_ptr<ServerCompletionQueue>(cq); } @@ -127,11 +128,33 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { compression_options_.enabled_algorithms_bitset); std::unique_ptr<Server> server( new Server(thread_pool.release(), true, max_message_size_, &args)); + ServerInitializer* initializer = server->initializer(); + + int num_non_listening_cqs = 0; for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { - grpc_server_register_completion_queue(server->server_, (*cq)->cq(), - nullptr); + // A completion queue that is not polled frequently (by calling Next() or + // AsyncNext()) is not safe to use for listening to incoming channels. + // Register all such completion queues as non-listening completion queues + // with the GRPC core library. + if ((*cq)->IsFrequentlyPolled()) { + grpc_server_register_completion_queue(server->server_, (*cq)->cq(), + nullptr); + } else { + grpc_server_register_non_listening_completion_queue(server->server_, + (*cq)->cq(), nullptr); + num_non_listening_cqs++; + } } + + // TODO: (sreek) - Find a good way to determine whether the server is a Sync + // server or an Async server. In case of Async server, return an error if all + // the completion queues are non-listening + if (num_non_listening_cqs > 0) { + gpr_log(GPR_INFO, "Number of non listening completion queues: %d out of %d", + num_non_listening_cqs, cqs_.size()); + } + for (auto service = services_.begin(); service != services_.end(); service++) { if (!server->RegisterService((*service)->host.get(), (*service)->service)) { |