aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-05-24 06:55:47 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-05-24 06:55:47 -0700
commitccc8ec54b0cf75f52c0a19a2674d6a6bc8e93743 (patch)
tree5a6fd5fedbac12b80b46b1208a5d4b3f95ba1b90 /src/cpp
parent34fc500cfa0b3f282bbcde385f1f337fff5ae213 (diff)
parent48abdde19722a9d24d4f590b9d197db29f498ed1 (diff)
Merge branch 'error' into reuse_port
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/server/server.cc11
-rw-r--r--src/cpp/server/server_builder.cc37
2 files changed, 41 insertions, 7 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index f955a31494..f6c3e5747c 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -295,7 +295,12 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
grpc_channel_args channel_args;
args->SetChannelArgs(&channel_args);
server_ = grpc_server_create(&channel_args, nullptr);
- grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
+ if (thread_pool_ == nullptr) {
+ grpc_server_register_non_listening_completion_queue(server_, cq_.cq(),
+ nullptr);
+ } else {
+ grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
+ }
}
Server::~Server() {
@@ -407,7 +412,9 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
}
for (size_t i = 0; i < num_cqs; i++) {
- new UnimplementedAsyncRequest(this, cqs[i]);
+ if (cqs[i]->IsFrequentlyPolled()) {
+ new UnimplementedAsyncRequest(this, cqs[i]);
+ }
}
}
// Start processing rpcs.
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 61f0f6ae2a..54feac3982 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -60,8 +60,9 @@ ServerBuilder::ServerBuilder()
}
}
-std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
- ServerCompletionQueue* cq = new ServerCompletionQueue();
+std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
+ bool is_frequently_polled) {
+ ServerCompletionQueue* cq = new ServerCompletionQueue(is_frequently_polled);
cqs_.push_back(cq);
return std::unique_ptr<ServerCompletionQueue>(cq);
}
@@ -99,10 +100,12 @@ void ServerBuilder::AddListeningPort(const grpc::string& addr,
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
std::unique_ptr<ThreadPoolInterface> thread_pool;
+ bool has_sync_methods = false;
for (auto it = services_.begin(); it != services_.end(); ++it) {
if ((*it)->service->has_synchronous_methods()) {
if (thread_pool == nullptr) {
thread_pool.reset(CreateDefaultThreadPool());
+ has_sync_methods = true;
break;
}
}
@@ -116,6 +119,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
if ((*plugin).second->has_sync_methods()) {
thread_pool.reset(CreateDefaultThreadPool());
+ has_sync_methods = true;
break;
}
}
@@ -128,10 +132,33 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
std::unique_ptr<Server> server(
new Server(thread_pool.release(), true, max_message_size_, &args));
ServerInitializer* initializer = server->initializer();
+
+ // If the server has atleast one sync methods, we know that this is a Sync
+ // server or a Hybrid server and the completion queue (server->cq_) would be
+ // frequently polled.
+ int num_frequently_polled_cqs = has_sync_methods ? 1 : 0;
+
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
- grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
- nullptr);
+ // A completion queue that is not polled frequently (by calling Next() or
+ // AsyncNext()) is not safe to use for listening to incoming channels.
+ // Register all such completion queues as non-listening completion queues
+ // with the GRPC core library.
+ if ((*cq)->IsFrequentlyPolled()) {
+ grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
+ nullptr);
+ num_frequently_polled_cqs++;
+ } else {
+ grpc_server_register_non_listening_completion_queue(server->server_,
+ (*cq)->cq(), nullptr);
+ }
+ }
+
+ if (num_frequently_polled_cqs == 0) {
+ gpr_log(GPR_ERROR,
+ "At least one of the completion queues must be frequently polled");
+ return nullptr;
}
+
for (auto service = services_.begin(); service != services_.end();
service++) {
if (!server->RegisterService((*service)->host.get(), (*service)->service)) {