diff options
author | 2016-05-14 13:05:41 -0700 | |
---|---|---|
committer | 2016-05-14 13:05:41 -0700 | |
commit | f7a670fe6da0538679dab200132b2d15742b4963 (patch) | |
tree | 0d20f1e2f61c6855bd3cea87a88fd2fa3f140c90 /src/cpp/server/server_builder.cc | |
parent | 1ba1bba66a18b6b7986a1cfa52c6f1ac4a14a029 (diff) | |
parent | 088891119f17b827b4f45f09e39411f007618ddf (diff) |
Merge branch 'server_channel_affinity' of github.com:sreecha/grpc into affine
Diffstat (limited to 'src/cpp/server/server_builder.cc')
-rw-r--r-- | src/cpp/server/server_builder.cc | 37 |
1 files changed, 32 insertions, 5 deletions
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 61f0f6ae2a..fbcb3cef1b 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -60,8 +60,9 @@ ServerBuilder::ServerBuilder() } } -std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() { - ServerCompletionQueue* cq = new ServerCompletionQueue(); +std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue( + bool is_frequently_polled) { + ServerCompletionQueue* cq = new ServerCompletionQueue(is_frequently_polled); cqs_.push_back(cq); return std::unique_ptr<ServerCompletionQueue>(cq); } @@ -99,8 +100,11 @@ void ServerBuilder::AddListeningPort(const grpc::string& addr, std::unique_ptr<Server> ServerBuilder::BuildAndStart() { std::unique_ptr<ThreadPoolInterface> thread_pool; + // Does this server have atleast one sync method + bool has_sync_methods = false; for (auto it = services_.begin(); it != services_.end(); ++it) { if ((*it)->service->has_synchronous_methods()) { + has_sync_methods = true; if (thread_pool == nullptr) { thread_pool.reset(CreateDefaultThreadPool()); break; @@ -128,10 +132,33 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { std::unique_ptr<Server> server( new Server(thread_pool.release(), true, max_message_size_, &args)); ServerInitializer* initializer = server->initializer(); + + // If the server has atleast one sync methods, we know that this is a Sync + // server or a Hybrid server and the completion queue (server->cq_) would be + // frequently polled. + int num_frequently_polled_cqs = has_sync_methods ? 1 : 0; + for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { - grpc_server_register_completion_queue(server->server_, (*cq)->cq(), - nullptr); + // A completion queue that is not polled frequently (by calling Next() or + // AsyncNext()) is not safe to use for listening to incoming channels. + // Register all such completion queues as non-listening completion queues + // with the GRPC core library. + if ((*cq)->IsFrequentlyPolled()) { + grpc_server_register_completion_queue(server->server_, (*cq)->cq(), + nullptr); + num_frequently_polled_cqs++; + } else { + grpc_server_register_non_listening_completion_queue(server->server_, + (*cq)->cq(), nullptr); + } + } + + if (num_frequently_polled_cqs == 0) { + gpr_log(GPR_ERROR, + "Atleast one of the completion queues must be frequently polled"); + return nullptr; } + for (auto service = services_.begin(); service != services_.end(); service++) { if (!server->RegisterService((*service)->host.get(), (*service)->service)) { |