aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2016-08-24 10:01:13 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2016-09-21 01:39:13 -0700
commitaabada97a1db3a2d722aa1bc1c48d13c90bbaea9 (patch)
tree2ec91652ece2e173ca9bd7c6212c37d7cef6a59e /src/cpp
parent3ea9e247e0f933fd303c94f6f2397580983946cd (diff)
One RPCMgr instance per CQ
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/rpcmanager/grpc_rpc_manager.cc37
-rw-r--r--src/cpp/rpcmanager/grpc_rpc_manager.h59
-rw-r--r--src/cpp/server/server.cc190
-rw-r--r--src/cpp/server/server_builder.cc72
-rw-r--r--src/cpp/server/server_posix.cc3
5 files changed, 292 insertions, 69 deletions
diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.cc b/src/cpp/rpcmanager/grpc_rpc_manager.cc
index f0a4057857..4236fcefaf 100644
--- a/src/cpp/rpcmanager/grpc_rpc_manager.cc
+++ b/src/cpp/rpcmanager/grpc_rpc_manager.cc
@@ -54,14 +54,12 @@ GrpcRpcManager::GrpcRpcManagerThread::~GrpcRpcManagerThread() {
thd_.reset();
}
-GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers,
- int max_threads)
+GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers)
: shutdown_(false),
num_pollers_(0),
min_pollers_(min_pollers),
max_pollers_(max_pollers),
- num_threads_(0),
- max_threads_(max_threads) {}
+ num_threads_(0) {}
GrpcRpcManager::~GrpcRpcManager() {
std::unique_lock<grpc::mutex> lock(mu_);
@@ -84,6 +82,11 @@ void GrpcRpcManager::ShutdownRpcManager() {
shutdown_ = true;
}
+bool GrpcRpcManager::IsShutdown() {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ return shutdown_;
+}
+
void GrpcRpcManager::MarkAsCompleted(GrpcRpcManagerThread* thd) {
std::unique_lock<grpc::mutex> lock(list_mu_);
completed_threads_.push_back(thd);
@@ -108,8 +111,7 @@ void GrpcRpcManager::Initialize() {
// below the maximum threshold, we can let the current thread continue as poller
bool GrpcRpcManager::MaybeContinueAsPoller() {
std::unique_lock<grpc::mutex> lock(mu_);
- if (shutdown_ || num_pollers_ > max_pollers_ ||
- num_threads_ >= max_threads_) {
+ if (shutdown_ || num_pollers_ > max_pollers_) {
return false;
}
@@ -122,8 +124,7 @@ bool GrpcRpcManager::MaybeContinueAsPoller() {
// min_pollers_) and the total number of threads is below the maximum threshold
void GrpcRpcManager::MaybeCreatePoller() {
grpc::unique_lock<grpc::mutex> lock(mu_);
- if (!shutdown_ && num_pollers_ < min_pollers_ &&
- num_threads_ < max_threads_) {
+ if (!shutdown_ && num_pollers_ < min_pollers_) {
num_pollers_++;
num_threads_++;
@@ -133,28 +134,38 @@ void GrpcRpcManager::MaybeCreatePoller() {
}
void GrpcRpcManager::MainWorkLoop() {
- bool is_work_found = false;
void* tag;
+ bool ok;
/*
1. Poll for work (i.e PollForWork())
- 2. After returning from PollForWork, reduce the number of pollers by 1
+ 2. After returning from PollForWork, reduce the number of pollers by 1. If
+ PollForWork() returned a TIMEOUT, then it may indicate that we have more
+ polling threads than needed. Check if the number of pollers is greater
+ than min_pollers and if so, terminate the thread.
3. Since we are short of one poller now, see if a new poller has to be
created (i.e see MaybeCreatePoller() for more details)
4. Do the actual work (DoWork())
5. After doing the work, see it this thread can resume polling work (i.e
see MaybeContinueAsPoller() for more details) */
do {
- PollForWork(is_work_found, &tag);
+ WorkStatus work_status = PollForWork(&tag, &ok);
{
grpc::unique_lock<grpc::mutex> lock(mu_);
num_pollers_--;
+
+ if (work_status == TIMEOUT && num_pollers_ > min_pollers_) {
+ break;
+ }
}
- if (is_work_found) {
+ // TODO (sreek) See if we need to check for shutdown here and quit
+ // Note that MaybeCreatePoller does check for shutdown and creates a new
+ // thread only if GrpcRpcManager is not shutdown
+ if (work_status == WORK_FOUND) {
MaybeCreatePoller();
- DoWork(tag);
+ DoWork(tag, ok);
}
} while (MaybeContinueAsPoller());
diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.h b/src/cpp/rpcmanager/grpc_rpc_manager.h
index 475ce97995..d00771b9a1 100644
--- a/src/cpp/rpcmanager/grpc_rpc_manager.h
+++ b/src/cpp/rpcmanager/grpc_rpc_manager.h
@@ -44,17 +44,56 @@ namespace grpc {
class GrpcRpcManager {
public:
- explicit GrpcRpcManager(int min_pollers, int max_pollers, int max_threads);
+ explicit GrpcRpcManager(int min_pollers, int max_pollers);
virtual ~GrpcRpcManager();
// This function MUST be called before using the object
void Initialize();
- virtual void PollForWork(bool& is_work_found, void **tag) = 0;
- virtual void DoWork(void *tag) = 0;
+ enum WorkStatus { WORK_FOUND, SHUTDOWN, TIMEOUT };
+
+ // "Polls" for new work.
+ // If the return value is WORK_FOUND:
+ // - The implementaion of PollForWork() MAY set some opaque identifier to
+ // (identify the work item found) via the '*tag' parameter
+ // - The implementaion MUST set the value of 'ok' to 'true' or 'false'. A
+ // value of 'false' indicates some implemenation specific error (that is
+ // neither SHUTDOWN nor TIMEOUT)
+ // - GrpcRpcManager does not interpret the values of 'tag' and 'ok'
+ // - GrpcRpcManager WILL call DoWork() and pass '*tag' and 'ok' as input to
+ // DoWork()
+ //
+ // If the return value is SHUTDOWN:,
+ // - GrpcManager WILL NOT call DoWork() and terminates the thead
+ //
+ // If the return value is TIMEOUT:,
+ // - GrpcManager WILL NOT call DoWork()
+ // - GrpcManager MAY terminate the thread depending on the current number of
+ // active poller threads and mix_pollers/max_pollers settings
+ // - Also, the value of timeout is specific to the derived class
+ // implementation
+ virtual WorkStatus PollForWork(void** tag, bool* ok) = 0;
+
+ // The implementation of DoWork() is supposed to perform the work found by
+ // PollForWork(). The tag and ok parameters are the same as returned by
+ // PollForWork()
+ //
+ // The implementation of DoWork() should also do any setup needed to ensure
+ // that the next call to PollForWork() (not necessarily by the current thread)
+ // actually finds some work
+ virtual void DoWork(void* tag, bool ok) = 0;
+
+ // Mark the GrpcRpcManager as shutdown and begin draining the work.
+ // This is a non-blocking call and the caller should call Wait(), a blocking
+ // call which returns only once the shutdown is complete
+ void ShutdownRpcManager();
+ // Has ShutdownRpcManager() been called
+ bool IsShutdown();
+
+ // A blocking call that returns only after the GrpcRpcManager has shutdown and
+ // all the threads have drained all the outstanding work
void Wait();
- void ShutdownRpcManager();
private:
// Helper wrapper class around std::thread. This takes a GrpcRpcManager object
@@ -63,8 +102,6 @@ class GrpcRpcManager {
// The Run() function calls GrpcManager::MainWorkLoop() function and once that
// completes, it marks the GrpcRpcManagerThread completed by calling
// GrpcRpcManager::MarkAsCompleted()
- // TODO: sreek - Consider using a separate threadpool rather than implementing
- // one in this class
class GrpcRpcManagerThread {
public:
GrpcRpcManagerThread(GrpcRpcManager* rpc_mgr);
@@ -83,13 +120,11 @@ class GrpcRpcManager {
void MainWorkLoop();
// Create a new poller if the number of current pollers is less than the
- // minimum number of pollers needed (i.e min_pollers) and the total number of
- // threads are less than the max number of threads (i.e max_threads)
+ // minimum number of pollers needed (i.e min_pollers).
void MaybeCreatePoller();
// Returns true if the current thread can resume as a poller. i.e if the
- // current number of pollers is less than the max_pollers AND the total number
- // of threads is less than max_threads
+ // current number of pollers is less than the max_pollers.
bool MaybeContinueAsPoller();
void MarkAsCompleted(GrpcRpcManagerThread* thd);
@@ -113,10 +148,6 @@ class GrpcRpcManager {
// currently polling i.e num_pollers_)
int num_threads_;
- // The maximum number of threads that can be active (This is a soft limit and
- // the actual number of threads may sometimes be briefly above this number)
- int max_threads_;
-
grpc::mutex list_mu_;
std::list<GrpcRpcManagerThread*> completed_threads_;
};
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index a436ee43e9..28b874d9fb 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -275,15 +275,99 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_completion_queue* cq_;
};
+class Server::SyncRequestManager : public GrpcRpcManager {
+ public:
+ SyncRequestManager(Server* server, CompletionQueue* server_cq,
+ std::shared_ptr<GlobalCallbacks> global_callbacks,
+ int min_pollers, int max_pollers)
+ : GrpcRpcManager(min_pollers, max_pollers),
+ server_(server),
+ server_cq_(server_cq),
+ global_callbacks_(global_callbacks) {}
+
+ static const int kRpcPollingTimeoutMsec = 500;
+
+ WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE {
+ *tag = nullptr;
+ gpr_timespec deadline =
+ gpr_time_from_millis(kRpcPollingTimeoutMsec, GPR_TIMESPAN);
+
+ switch (server_cq_->AsyncNext(tag, ok, deadline)) {
+ case CompletionQueue::TIMEOUT:
+ return TIMEOUT;
+ case CompletionQueue::SHUTDOWN:
+ return SHUTDOWN;
+ case CompletionQueue::GOT_EVENT:
+ return WORK_FOUND;
+ }
+
+ GPR_UNREACHABLE_CODE(return TIMEOUT);
+ }
+
+ void DoWork(void* tag, bool ok) GRPC_OVERRIDE {
+ SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
+ if (ok && sync_req) {
+ SyncRequest::CallData cd(server_, sync_req);
+ {
+ sync_req->SetupRequest();
+ if (!IsShutdown()) {
+ sync_req->Request(server_->c_server(), server_cq_->cq());
+ } else {
+ sync_req->TeardownRequest();
+ }
+ }
+ GPR_TIMER_SCOPE("cd.Run()", 0);
+ cd.Run(global_callbacks_);
+ }
+
+ // TODO (sreek): If ok == false, log an error
+ }
+
+ void AddSyncMethod(RpcServiceMethod* method, void* tag) {
+ sync_methods_.emplace_back(method, tag);
+ }
+
+ void AddUnknownSyncMethod() {
+ // TODO (sreek) - Check if !sync_methods_.empty() is really needed here
+ if (!sync_methods_.empty()) {
+ unknown_method_.reset(new RpcServiceMethod(
+ "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
+ // Use of emplace_back with just constructor arguments is not accepted
+ // here by gcc-4.4 because it can't match the anonymous nullptr with a
+ // proper constructor implicitly. Construct the object and use push_back.
+ sync_methods_.push_back(SyncRequest(unknown_method_.get(), nullptr));
+ }
+ }
+
+ void Start() {
+ if (!sync_methods_.empty()) {
+ for (auto m = sync_methods_.begin(); m != sync_methods_.end(); m++) {
+ m->SetupRequest();
+ m->Request(server_->c_server(), server_cq_->cq());
+ }
+
+ GrpcRpcManager::Initialize();
+ }
+ }
+
+ private:
+ Server* server_;
+ CompletionQueue* server_cq_;
+ std::vector<SyncRequest> sync_methods_;
+ std::unique_ptr<RpcServiceMethod> unknown_method_;
+ std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
+};
+
static internal::GrpcLibraryInitializer g_gli_initializer;
-Server::Server(bool has_sync_methods, int max_message_size,
- ChannelArguments* args)
- : GrpcRpcManager(3, 5, 8),
- max_message_size_(max_message_size),
+Server::Server(
+ std::shared_ptr<std::vector<ServerCompletionQueue>> sync_server_cqs,
+ int max_message_size, ChannelArguments* args, int min_pollers,
+ int max_pollers)
+ : max_message_size_(max_message_size),
+ sync_server_cqs_(sync_server_cqs),
started_(false),
shutdown_(false),
shutdown_notified_(false),
- sync_methods_(new std::list<SyncRequest>),
has_generic_service_(false),
server_(nullptr),
server_initializer_(new ServerInitializer(this)) {
@@ -291,16 +375,17 @@ Server::Server(bool has_sync_methods, int max_message_size,
gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
global_callbacks_ = g_callbacks;
global_callbacks_->UpdateArguments(args);
+
+ for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
+ it++) {
+ sync_req_mgrs_.emplace_back(new SyncRequestManager(
+ this, &(*it), global_callbacks_, min_pollers, max_pollers));
+ }
+
grpc_channel_args channel_args;
args->SetChannelArgs(&channel_args);
- server_ = grpc_server_create(&channel_args, nullptr);
- if (!has_sync_methods) {
- grpc_server_register_non_listening_completion_queue(server_, cq_.cq(),
- nullptr);
- } else {
- grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
- }
+ server_ = grpc_server_create(&channel_args, nullptr);
}
Server::~Server() {
@@ -310,15 +395,20 @@ Server::~Server() {
lock.unlock();
Shutdown();
} else if (!started_) {
+ // TODO (sreek): Shutdown all cqs
+ /*
cq_.Shutdown();
+ */
}
}
+ // TODO(sreek) Do thisfor all cqs ?
+ /*
void* got_tag;
bool ok;
GPR_ASSERT(!cq_.Next(&got_tag, &ok));
+ */
grpc_server_destroy(server_);
- delete sync_methods_;
}
void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
@@ -329,8 +419,6 @@ void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
grpc_server* Server::c_server() { return server_; }
-CompletionQueue* Server::completion_queue() { return &cq_; }
-
static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
RpcServiceMethod* method) {
switch (method->method_type()) {
@@ -351,6 +439,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
"Can only register an asynchronous service against one server.");
service->server_ = this;
}
+
const char* method_name = nullptr;
for (auto it = service->methods_.begin(); it != service->methods_.end();
++it) {
@@ -369,7 +458,9 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
if (method->handler() == nullptr) {
method->set_server_tag(tag);
} else {
- sync_methods_->emplace_back(method, tag);
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->AddSyncMethod(method, tag);
+ }
}
method_name = method->name();
}
@@ -405,13 +496,8 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
grpc_server_start(server_);
if (!has_generic_service_) {
- if (!sync_methods_->empty()) {
- unknown_method_.reset(new RpcServiceMethod(
- "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
- // Use of emplace_back with just constructor arguments is not accepted
- // here by gcc-4.4 because it can't match the anonymous nullptr with a
- // proper constructor implicitly. Construct the object and use push_back.
- sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->AddUnknownSyncMethod();
}
for (size_t i = 0; i < num_cqs; i++) {
@@ -421,6 +507,12 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
}
}
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->Start();
+ }
+
+ /* TODO (Sreek) - Do this for all cqs */
+ /*
// Start processing rpcs.
if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
@@ -430,26 +522,73 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
GrpcRpcManager::Initialize();
}
+ */
return true;
}
+// TODO (sreek) - Reimplement this
void Server::ShutdownInternal(gpr_timespec deadline) {
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
shutdown_ = true;
+
+ int shutdown_tag = 0; // Dummy shutdown tag
+ grpc_server_shutdown_and_notify(server_, shutdown_cq_.cq(), &shutdown_tag);
+
+ // Shutdown all RpcManagers. This will try to gracefully stop all the
+ // threads in the RpcManagers (once they process any inflight requests)
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->ShutdownRpcManager();
+ }
+
+ shutdown_cq_.Shutdown();
+
+ void* tag;
+ bool ok;
+ CompletionQueue::NextStatus status =
+ shutdown_cq_.AsyncNext(&tag, &ok, deadline);
+
+ // If this timed out, it means we are done with the grace-period for
+ // a clean shutdown. We should force a shutdown now by cancelling all
+ // inflight calls
+ if (status == CompletionQueue::NextStatus::TIMEOUT) {
+ grpc_server_cancel_all_calls(server_);
+ }
+ // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
+ // successfully shutdown
+
+ // Wait for threads in all RpcManagers to terminate
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->Wait();
+ }
+
+ // Shutdown the completion queues
+ // TODO (sreek) Move this into SyncRequestManager
+ for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
+ it++) {
+ (*it).Shutdown();
+ }
+
+ /*
grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown();
lock.unlock();
+ */
+ // TODO (sreek) Delete this
+ /*
GrpcRpcManager::ShutdownRpcManager();
GrpcRpcManager::Wait();
+ */
// Spin, eating requests until the completion queue is completely shutdown.
// If the deadline expires then cancel anything that's pending and keep
// spinning forever until the work is actually drained.
// Since nothing else needs to touch state guarded by mu_, holding it
// through this loop is fine.
+ //
+ /*
SyncRequest* request;
bool ok;
while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
@@ -461,6 +600,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
}
}
lock.lock();
+ */
/* TODO (sreek) - Remove this block */
// Wait for running callbacks to finish.
@@ -642,6 +782,8 @@ void Server::RunRpc() {
*/
}
+/* TODO (sreek) Move this to SyncRequestManager */
+/*
void Server::PollForWork(bool& is_work_found, void** tag) {
is_work_found = true;
*tag = nullptr;
@@ -651,6 +793,7 @@ void Server::PollForWork(bool& is_work_found, void** tag) {
}
}
+
void Server::DoWork(void* tag) {
auto* mrd = static_cast<SyncRequest*>(tag);
if (mrd) {
@@ -669,6 +812,7 @@ void Server::DoWork(void* tag) {
cd.Run(global_callbacks_);
}
}
+*/
ServerInitializer* Server::initializer() { return server_initializer_.get(); }
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 760309d911..786195ed6c 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -93,7 +93,7 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
gpr_log(GPR_ERROR,
"Adding multiple AsyncGenericService is unsupported for now. "
"Dropping the service %p",
- service);
+ (void *) service);
} else {
generic_service_ = service;
}
@@ -138,7 +138,6 @@ ServerBuilder& ServerBuilder::AddListeningPort(
}
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
-
// == Determine if the server has any syncrhonous methods ==
bool has_sync_methods = false;
for (auto it = services_.begin(); it != services_.end(); ++it) {
@@ -157,6 +156,35 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
}
}
+ // If this is a Sync server, i.e a server expositing sync API, then the server
+ // needs to create some completion queues to listen for incoming requests.
+ // 'sync_server_cqs' are those internal completion queues.
+ //
+ // This is different from the completion queues added to the server via
+ // ServerBuilder's AddCompletionQueue() method (those completion queues
+ // are in 'cqs_' member variable of ServerBuilder object)
+ std::shared_ptr<std::vector<ServerCompletionQueue>> sync_server_cqs(
+ new std::vector<ServerCompletionQueue>());
+
+ if (has_sync_methods) {
+ // If the server has synchronous methods, it will need completion queues to
+ // handle those methods. Create one cq per core (or create 4 if number of
+ // cores is less than 4 or unavailable)
+ //
+ // TODO (sreek) - The default number 4 is just a guess. Check if a lower or
+ // higher number makes sense
+ int num_cqs = gpr_cpu_num_cores();
+ num_cqs = GPR_MAX(num_cqs, 4);
+
+ for (int i = 0; i < num_cqs; i++) {
+ // emplace_back() would have been ideal here but doesn't work since the
+ // ServerCompletionQueue's constructor is private. With emplace_back, the
+ // constructor is called from somewhere within the library; so making
+ // ServerBuilder class a friend to ServerCompletion queue won't help.
+ sync_server_cqs->push_back(ServerCompletionQueue());
+ }
+ }
+
// == Channel args ==
ChannelArguments args;
for (auto option = options_.begin(); option != options_.end(); ++option) {
@@ -178,28 +206,38 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
maybe_default_compression_algorithm_.algorithm);
}
- std::unique_ptr<Server> server(
- new Server(has_sync_methods, max_message_size_, &args));
+ // TODO (sreek) Make the number of pollers configurable
+ std::unique_ptr<Server> server(new Server(sync_server_cqs, max_message_size_,
+ &args, kDefaultMinPollers,
+ kDefaultMaxPollers));
ServerInitializer* initializer = server->initializer();
- // If the server has atleast one sync methods, we know that this is a Sync
- // server or a Hybrid server and the completion queue (server->cq_) would be
- // frequently polled.
- int num_frequently_polled_cqs = has_sync_methods ? 1 : 0;
-
- for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
- // A completion queue that is not polled frequently (by calling Next() or
- // AsyncNext()) is not safe to use for listening to incoming channels.
- // Register all such completion queues as non-listening completion queues
- // with the GRPC core library.
- if ((*cq)->IsFrequentlyPolled()) {
- grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
+ // Register all the completion queues with the server. i.e
+ // 1. sync_server_cqs: internal completion queues created IF this is a sync
+ // server
+ // 2. cqs_: Completion queues added via AddCompletionQueue() call
+
+ // All sync cqs (if any) are frequently polled by the GrpcRpcManager
+ int num_frequently_polled_cqs = sync_server_cqs->size();
+
+ for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) {
+ grpc_server_register_completion_queue(server->server_, it->cq(), nullptr);
+ }
+
+ // cqs_ contains the completion queue added by calling the ServerBuilder's
+ // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by
+ // calling Next() or AsyncNext()) and hence are not safe to be used for
+ // listening to incoming channels. Such completion queues must be registered
+ // as non-listening queues
+ for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
+ if ((*it)->IsFrequentlyPolled()) {
+ grpc_server_register_completion_queue(server->server_, (*it)->cq(),
nullptr);
num_frequently_polled_cqs++;
} else {
grpc_server_register_non_listening_completion_queue(server->server_,
- (*cq)->cq(), nullptr);
+ (*it)->cq(), nullptr);
}
}
diff --git a/src/cpp/server/server_posix.cc b/src/cpp/server/server_posix.cc
index c3aa2adc60..33d42a8dc7 100644
--- a/src/cpp/server/server_posix.cc
+++ b/src/cpp/server/server_posix.cc
@@ -40,8 +40,7 @@ namespace grpc {
#ifdef GPR_SUPPORT_CHANNELS_FROM_FD
void AddInsecureChannelFromFd(Server* server, int fd) {
- grpc_server_add_insecure_channel_from_fd(
- server->c_server(), server->completion_queue()->cq(), fd);
+ grpc_server_add_insecure_channel_from_fd(server->c_server(), NULL, fd);
}
#endif // GPR_SUPPORT_CHANNELS_FROM_FD