diff options
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/server/server.cc | 39 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 46 |
2 files changed, 30 insertions, 55 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 878775bbee..898f68f104 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -40,6 +40,7 @@ #include <grpc/support/log.h> #include <grpc++/completion_queue.h> #include <grpc++/generic/async_generic_service.h> +#include <grpc++/impl/method_handler_impl.h> #include <grpc++/impl/rpc_service_method.h> #include <grpc++/impl/service_type.h> #include <grpc++/server_context.h> @@ -314,36 +315,28 @@ void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { g_callbacks = callbacks; } -bool Server::RegisterService(const grpc::string* host, RpcService* service) { - for (int i = 0; i < service->GetMethodCount(); ++i) { - RpcServiceMethod* method = service->GetMethod(i); +bool Server::RegisterService(const grpc::string* host, Service* service) { + bool has_async_methods = service->has_async_methods(); + if (has_async_methods) { + GPR_ASSERT(service->server_ == nullptr && + "Can only register an asynchronous service against one server."); + service->server_ = this; + } + for (auto it = service->methods_.begin(); it != service->methods_.end(); + ++it) { + RpcServiceMethod* method = it->get(); void* tag = grpc_server_register_method(server_, method->name(), host ? host->c_str() : nullptr); - if (!tag) { + if (tag == nullptr) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); return false; } - sync_methods_->emplace_back(method, tag); - } - return true; -} - -bool Server::RegisterAsyncService(const grpc::string* host, - AsynchronousService* service) { - GPR_ASSERT(service->server_ == nullptr && - "Can only register an asynchronous service against one server."); - service->server_ = this; - service->request_args_ = new void* [service->method_count_]; - for (size_t i = 0; i < service->method_count_; ++i) { - void* tag = grpc_server_register_method(server_, service->method_names_[i], - host ? host->c_str() : nullptr); - if (!tag) { - gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", - service->method_names_[i]); - return false; + if (method->handler() == nullptr) { + method->set_server_tag(tag); + } else { + sync_methods_->emplace_back(method, tag); } - service->request_args_[i] = tag; } return true; } diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 26c0724a30..bd7dd76b8d 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -43,7 +43,7 @@ namespace grpc { ServerBuilder::ServerBuilder() - : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) { + : max_message_size_(-1), generic_service_(nullptr) { grpc_compression_options_init(&compression_options_); } @@ -53,24 +53,13 @@ std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() { return std::unique_ptr<ServerCompletionQueue>(cq); } -void ServerBuilder::RegisterService(SynchronousService* service) { - services_.emplace_back(new NamedService<RpcService>(service->service())); -} - -void ServerBuilder::RegisterAsyncService(AsynchronousService* service) { - async_services_.emplace_back(new NamedService<AsynchronousService>(service)); +void ServerBuilder::RegisterService(Service* service) { + services_.emplace_back(new NamedService(service)); } void ServerBuilder::RegisterService(const grpc::string& addr, - SynchronousService* service) { - services_.emplace_back( - new NamedService<RpcService>(addr, service->service())); -} - -void ServerBuilder::RegisterAsyncService(const grpc::string& addr, - AsynchronousService* service) { - async_services_.emplace_back( - new NamedService<AsynchronousService>(addr, service)); + Service* service) { + services_.emplace_back(new NamedService(addr, service)); } void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) { @@ -96,14 +85,14 @@ void ServerBuilder::AddListeningPort(const grpc::string& addr, } std::unique_ptr<Server> ServerBuilder::BuildAndStart() { - bool thread_pool_owned = false; - if (!async_services_.empty() && !services_.empty()) { - gpr_log(GPR_ERROR, "Mixing async and sync services is unsupported for now"); - return nullptr; - } - if (!thread_pool_ && !services_.empty()) { - thread_pool_ = CreateDefaultThreadPool(); - thread_pool_owned = true; + std::unique_ptr<ThreadPoolInterface> thread_pool; + for (auto it = services_.begin(); it != services_.end(); ++it) { + if ((*it)->service->has_synchronous_methods()) { + if (thread_pool == nullptr) { + thread_pool.reset(CreateDefaultThreadPool()); + break; + } + } } ChannelArguments args; for (auto option = options_.begin(); option != options_.end(); ++option) { @@ -115,7 +104,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { args.SetInt(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, compression_options_.enabled_algorithms_bitset); std::unique_ptr<Server> server( - new Server(thread_pool_, thread_pool_owned, max_message_size_, args)); + new Server(thread_pool.release(), true, max_message_size_, args)); for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { grpc_server_register_completion_queue(server->server_, (*cq)->cq(), nullptr); @@ -126,13 +115,6 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { return nullptr; } } - for (auto service = async_services_.begin(); service != async_services_.end(); - service++) { - if (!server->RegisterAsyncService((*service)->host.get(), - (*service)->service)) { - return nullptr; - } - } if (generic_service_) { server->RegisterAsyncGenericService(generic_service_); } |