aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/server.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server/server.cc')
-rw-r--r--src/cpp/server/server.cc40
1 files changed, 34 insertions, 6 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index fafe31e84c..fb4c68ebe4 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -33,6 +33,7 @@
#include <grpc++/server.h>
+#include <sstream>
#include <utility>
#include <grpc++/completion_queue.h>
@@ -41,6 +42,7 @@
#include <grpc++/impl/grpc_library.h>
#include <grpc++/impl/method_handler_impl.h>
#include <grpc++/impl/rpc_service_method.h>
+#include <grpc++/impl/server_initializer.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server_context.h>
@@ -65,7 +67,7 @@ static std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr;
static gpr_once g_once_init_callbacks = GPR_ONCE_INIT;
static void InitGlobalCallbacks() {
- if (g_callbacks == nullptr) {
+ if (!g_callbacks) {
g_callbacks.reset(new DefaultGlobalCallbacks());
}
}
@@ -284,7 +286,8 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
has_generic_service_(false),
server_(nullptr),
thread_pool_(thread_pool),
- thread_pool_owned_(thread_pool_owned) {
+ thread_pool_owned_(thread_pool_owned),
+ server_initializer_(new ServerInitializer(this)) {
g_gli_initializer.summon();
gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
global_callbacks_ = g_callbacks;
@@ -292,7 +295,12 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
grpc_channel_args channel_args;
args->SetChannelArgs(&channel_args);
server_ = grpc_server_create(&channel_args, nullptr);
- grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
+ if (thread_pool_ == nullptr) {
+ grpc_server_register_non_listening_completion_queue(server_, cq_.cq(),
+ nullptr);
+ } else {
+ grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
+ }
}
Server::~Server() {
@@ -316,11 +324,15 @@ Server::~Server() {
}
void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
- GPR_ASSERT(g_callbacks == nullptr);
- GPR_ASSERT(callbacks != nullptr);
+ GPR_ASSERT(!g_callbacks);
+ GPR_ASSERT(callbacks);
g_callbacks.reset(callbacks);
}
+grpc_server* Server::c_server() { return server_; }
+
+CompletionQueue* Server::completion_queue() { return &cq_; }
+
static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
RpcServiceMethod* method) {
switch (method->method_type()) {
@@ -341,6 +353,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
"Can only register an asynchronous service against one server.");
service->server_ = this;
}
+ const char* method_name = nullptr;
for (auto it = service->methods_.begin(); it != service->methods_.end();
++it) {
if (it->get() == nullptr) { // Handled by generic service if any.
@@ -360,6 +373,17 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
} else {
sync_methods_->emplace_back(method, tag);
}
+ method_name = method->name();
+ }
+
+ // Parse service name.
+ if (method_name != nullptr) {
+ std::stringstream ss(method_name);
+ grpc::string service_name;
+ if (std::getline(ss, service_name, '/') &&
+ std::getline(ss, service_name, '/')) {
+ services_.push_back(service_name);
+ }
}
return true;
}
@@ -392,7 +416,9 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
}
for (size_t i = 0; i < num_cqs; i++) {
- new UnimplementedAsyncRequest(this, cqs[i]);
+ if (cqs[i]->IsFrequentlyPolled()) {
+ new UnimplementedAsyncRequest(this, cqs[i]);
+ }
}
}
// Start processing rpcs.
@@ -598,4 +624,6 @@ void Server::RunRpc() {
}
}
+ServerInitializer* Server::initializer() { return server_initializer_.get(); }
+
} // namespace grpc