aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/server_builder.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server/server_builder.cc')
-rw-r--r--src/cpp/server/server_builder.cc38
1 files changed, 32 insertions, 6 deletions
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 8417c45e64..0dc03b6876 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -71,7 +71,9 @@ ServerBuilder::~ServerBuilder() {
std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
bool is_frequently_polled) {
ServerCompletionQueue* cq = new ServerCompletionQueue(
- is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING);
+ GRPC_CQ_NEXT,
+ is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING,
+ nullptr);
cqs_.push_back(cq);
return std::unique_ptr<ServerCompletionQueue>(cq);
}
@@ -256,14 +258,22 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
// Create completion queues to listen to incoming rpc requests
for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
- sync_server_cqs->emplace_back(new ServerCompletionQueue(polling_type));
+ sync_server_cqs->emplace_back(
+ new ServerCompletionQueue(GRPC_CQ_NEXT, polling_type, nullptr));
}
}
- std::unique_ptr<Server> server(new Server(
- max_receive_message_size_, &args, sync_server_cqs,
- sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
- sync_server_settings_.cq_timeout_msec, resource_quota_));
+ // == Determine if the server has any callback methods ==
+ bool has_callback_methods = false;
+ for (auto it = services_.begin(); it != services_.end(); ++it) {
+ if ((*it)->service->has_callback_methods()) {
+ has_callback_methods = true;
+ break;
+ }
+ }
+
+ // TODO(vjpai): Add a section here for plugins once they can support callback
+ // methods
if (has_sync_methods) {
// This is a Sync server
@@ -275,6 +285,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
sync_server_settings_.cq_timeout_msec);
}
+ if (has_callback_methods) {
+ gpr_log(GPR_INFO, "Callback server.");
+ }
+
+ std::unique_ptr<Server> server(new Server(
+ max_receive_message_size_, &args, sync_server_cqs,
+ sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
+ sync_server_settings_.cq_timeout_msec, resource_quota_,
+ std::move(interceptor_creators_)));
+
ServerInitializer* initializer = server->initializer();
// Register all the completion queues with the server. i.e
@@ -288,6 +308,12 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
num_frequently_polled_cqs++;
}
+ if (has_callback_methods) {
+ auto* cq = server->CallbackCQ();
+ grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
+ num_frequently_polled_cqs++;
+ }
+
// cqs_ contains the completion queue added by calling the ServerBuilder's
// AddCompletionQueue() API. Some of them may not be frequently polled (i.e by
// calling Next() or AsyncNext()) and hence are not safe to be used for