diff options
Diffstat (limited to 'src/cpp/server/server_builder.cc')
-rw-r--r-- | src/cpp/server/server_builder.cc | 38 |
1 files changed, 32 insertions, 6 deletions
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 8417c45e64..0dc03b6876 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -71,7 +71,9 @@ ServerBuilder::~ServerBuilder() { std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue( bool is_frequently_polled) { ServerCompletionQueue* cq = new ServerCompletionQueue( - is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING); + GRPC_CQ_NEXT, + is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING, + nullptr); cqs_.push_back(cq); return std::unique_ptr<ServerCompletionQueue>(cq); } @@ -256,14 +258,22 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { // Create completion queues to listen to incoming rpc requests for (int i = 0; i < sync_server_settings_.num_cqs; i++) { - sync_server_cqs->emplace_back(new ServerCompletionQueue(polling_type)); + sync_server_cqs->emplace_back( + new ServerCompletionQueue(GRPC_CQ_NEXT, polling_type, nullptr)); } } - std::unique_ptr<Server> server(new Server( - max_receive_message_size_, &args, sync_server_cqs, - sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, - sync_server_settings_.cq_timeout_msec, resource_quota_)); + // == Determine if the server has any callback methods == + bool has_callback_methods = false; + for (auto it = services_.begin(); it != services_.end(); ++it) { + if ((*it)->service->has_callback_methods()) { + has_callback_methods = true; + break; + } + } + + // TODO(vjpai): Add a section here for plugins once they can support callback + // methods if (has_sync_methods) { // This is a Sync server @@ -275,6 +285,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { sync_server_settings_.cq_timeout_msec); } + if (has_callback_methods) { + gpr_log(GPR_INFO, "Callback server."); + } + + std::unique_ptr<Server> server(new Server( + max_receive_message_size_, &args, sync_server_cqs, + sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, + sync_server_settings_.cq_timeout_msec, resource_quota_, + std::move(interceptor_creators_))); + ServerInitializer* initializer = server->initializer(); // Register all the completion queues with the server. i.e @@ -288,6 +308,12 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { num_frequently_polled_cqs++; } + if (has_callback_methods) { + auto* cq = server->CallbackCQ(); + grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); + num_frequently_polled_cqs++; + } + // cqs_ contains the completion queue added by calling the ServerBuilder's // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by // calling Next() or AsyncNext()) and hence are not safe to be used for |