aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/server_builder.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server/server_builder.cc')
-rw-r--r--src/cpp/server/server_builder.cc33
1 files changed, 28 insertions, 5 deletions
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 61f0f6ae2a..a5bcd3db31 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -60,8 +60,9 @@ ServerBuilder::ServerBuilder()
}
}
-std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
- ServerCompletionQueue* cq = new ServerCompletionQueue();
+std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
+ bool is_frequently_polled) {
+ ServerCompletionQueue* cq = new ServerCompletionQueue(is_frequently_polled);
cqs_.push_back(cq);
return std::unique_ptr<ServerCompletionQueue>(cq);
}
@@ -127,11 +128,33 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
compression_options_.enabled_algorithms_bitset);
std::unique_ptr<Server> server(
new Server(thread_pool.release(), true, max_message_size_, &args));
+
ServerInitializer* initializer = server->initializer();
+
+ int num_non_listening_cqs = 0;
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
- grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
- nullptr);
+ // A completion queue that is not polled frequently (by calling Next() or
+ // AsyncNext()) is not safe to use for listening to incoming channels.
+ // Register all such completion queues as non-listening completion queues
+ // with the GRPC core library.
+ if ((*cq)->IsFrequentlyPolled()) {
+ grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
+ nullptr);
+ } else {
+ grpc_server_register_non_listening_completion_queue(server->server_,
+ (*cq)->cq(), nullptr);
+ num_non_listening_cqs++;
+ }
}
+
+ // TODO: (sreek) - Find a good way to determine whether the server is a Sync
+ // server or an Async server. In case of Async server, return an error if all
+ // the completion queues are non-listening
+ if (num_non_listening_cqs > 0) {
+ gpr_log(GPR_INFO, "Number of non listening completion queues: %d out of %d",
+ num_non_listening_cqs, cqs_.size());
+ }
+
for (auto service = services_.begin(); service != services_.end();
service++) {
if (!server->RegisterService((*service)->host.get(), (*service)->service)) {