diff options
Diffstat (limited to 'src/cpp/server/server.cc')
-rw-r--r-- | src/cpp/server/server.cc | 84 |
1 files changed, 43 insertions, 41 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 3bf9f3fa0f..0d31140924 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -35,16 +35,19 @@ #include <utility> -#include <grpc/grpc.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> #include <grpc++/completion_queue.h> #include <grpc++/generic/async_generic_service.h> +#include <grpc++/impl/codegen/completion_queue_tag.h> +#include <grpc++/impl/grpc_library.h> +#include <grpc++/impl/method_handler_impl.h> #include <grpc++/impl/rpc_service_method.h> #include <grpc++/impl/service_type.h> -#include <grpc++/server_context.h> #include <grpc++/security/server_credentials.h> +#include <grpc++/server_context.h> #include <grpc++/support/time.h> +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> #include "src/core/profiling/timers.h" #include "src/cpp/server/thread_pool_interface.h" @@ -275,6 +278,7 @@ static grpc_server* CreateServer(const ChannelArguments& args) { return grpc_server_create(&channel_args, nullptr); } +static internal::GrpcLibraryInitializer g_gli_initializer; Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, int max_message_size, const ChannelArguments& args) : max_message_size_(max_message_size), @@ -286,6 +290,7 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, server_(CreateServer(args)), thread_pool_(thread_pool), thread_pool_owned_(thread_pool_owned) { + g_gli_initializer.summon(); gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); global_callbacks_ = g_callbacks; grpc_server_register_completion_queue(server_, cq_.cq(), nullptr); @@ -297,6 +302,8 @@ Server::~Server() { if (started_ && !shutdown_) { lock.unlock(); Shutdown(); + } else if (!started_) { + cq_.Shutdown(); } } void* got_tag; @@ -315,36 +322,31 @@ void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { g_callbacks.reset(callbacks); } -bool Server::RegisterService(const grpc::string* host, RpcService* service) { - for (int i = 0; i < service->GetMethodCount(); ++i) { - RpcServiceMethod* method = service->GetMethod(i); +bool Server::RegisterService(const grpc::string* host, Service* service) { + bool has_async_methods = service->has_async_methods(); + if (has_async_methods) { + GPR_ASSERT(service->server_ == nullptr && + "Can only register an asynchronous service against one server."); + service->server_ = this; + } + for (auto it = service->methods_.begin(); it != service->methods_.end(); + ++it) { + if (it->get() == nullptr) { // Handled by generic service if any. + continue; + } + RpcServiceMethod* method = it->get(); void* tag = grpc_server_register_method(server_, method->name(), host ? host->c_str() : nullptr); - if (!tag) { + if (tag == nullptr) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); return false; } - sync_methods_->emplace_back(method, tag); - } - return true; -} - -bool Server::RegisterAsyncService(const grpc::string* host, - AsynchronousService* service) { - GPR_ASSERT(service->server_ == nullptr && - "Can only register an asynchronous service against one server."); - service->server_ = this; - service->request_args_ = new void* [service->method_count_]; - for (size_t i = 0; i < service->method_count_; ++i) { - void* tag = grpc_server_register_method(server_, service->method_names_[i], - host ? host->c_str() : nullptr); - if (!tag) { - gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", - service->method_names_[i]); - return false; + if (method->handler() == nullptr) { + method->set_server_tag(tag); + } else { + sync_methods_->emplace_back(method, tag); } - service->request_args_[i] = tag; } return true; } @@ -440,8 +442,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { GPR_ASSERT(GRPC_CALL_OK == result); } -Server::BaseAsyncRequest::BaseAsyncRequest( - Server* server, ServerContext* context, +ServerInterface::BaseAsyncRequest::BaseAsyncRequest( + ServerInterface* server, ServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, bool delete_on_finalize) : server_(server), @@ -454,9 +456,8 @@ Server::BaseAsyncRequest::BaseAsyncRequest( memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_)); } -Server::BaseAsyncRequest::~BaseAsyncRequest() {} - -bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) { +bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, + bool* status) { if (*status) { for (size_t i = 0; i < initial_metadata_array_.count; i++) { context_->client_metadata_.insert( @@ -470,7 +471,7 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) { grpc_metadata_array_destroy(&initial_metadata_array_); context_->set_call(call_); context_->cq_ = call_cq_; - Call call(call_, server_, call_cq_, server_->max_message_size_); + Call call(call_, server_, call_cq_, server_->max_message_size()); if (*status && call_) { context_->BeginCompletionOp(&call); } @@ -483,22 +484,22 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) { return true; } -Server::RegisteredAsyncRequest::RegisteredAsyncRequest( - Server* server, ServerContext* context, +ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( + ServerInterface* server, ServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} -void Server::RegisteredAsyncRequest::IssueRequest( +void ServerInterface::RegisteredAsyncRequest::IssueRequest( void* registered_method, grpc_byte_buffer** payload, ServerCompletionQueue* notification_cq) { grpc_server_request_registered_call( - server_->server_, registered_method, &call_, &context_->deadline_, + server_->server(), registered_method, &call_, &context_->deadline_, &initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(), this); } -Server::GenericAsyncRequest::GenericAsyncRequest( - Server* server, GenericServerContext* context, +ServerInterface::GenericAsyncRequest::GenericAsyncRequest( + ServerInterface* server, GenericServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) : BaseAsyncRequest(server, context, stream, call_cq, tag, @@ -506,12 +507,13 @@ Server::GenericAsyncRequest::GenericAsyncRequest( grpc_call_details_init(&call_details_); GPR_ASSERT(notification_cq); GPR_ASSERT(call_cq); - grpc_server_request_call(server->server_, &call_, &call_details_, + grpc_server_request_call(server->server(), &call_, &call_details_, &initial_metadata_array_, call_cq->cq(), notification_cq->cq(), this); } -bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) { +bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, + bool* status) { // TODO(yangg) remove the copy here. if (*status) { static_cast<GenericServerContext*>(context_)->method_ = |