aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/server.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server/server.cc')
-rw-r--r--src/cpp/server/server.cc84
1 files changed, 43 insertions, 41 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 3bf9f3fa0f..0d31140924 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -35,16 +35,19 @@
#include <utility>
-#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
#include <grpc++/completion_queue.h>
#include <grpc++/generic/async_generic_service.h>
+#include <grpc++/impl/codegen/completion_queue_tag.h>
+#include <grpc++/impl/grpc_library.h>
+#include <grpc++/impl/method_handler_impl.h>
#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/service_type.h>
-#include <grpc++/server_context.h>
#include <grpc++/security/server_credentials.h>
+#include <grpc++/server_context.h>
#include <grpc++/support/time.h>
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
#include "src/core/profiling/timers.h"
#include "src/cpp/server/thread_pool_interface.h"
@@ -275,6 +278,7 @@ static grpc_server* CreateServer(const ChannelArguments& args) {
return grpc_server_create(&channel_args, nullptr);
}
+static internal::GrpcLibraryInitializer g_gli_initializer;
Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
int max_message_size, const ChannelArguments& args)
: max_message_size_(max_message_size),
@@ -286,6 +290,7 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
server_(CreateServer(args)),
thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) {
+ g_gli_initializer.summon();
gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
global_callbacks_ = g_callbacks;
grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
@@ -297,6 +302,8 @@ Server::~Server() {
if (started_ && !shutdown_) {
lock.unlock();
Shutdown();
+ } else if (!started_) {
+ cq_.Shutdown();
}
}
void* got_tag;
@@ -315,36 +322,31 @@ void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
g_callbacks.reset(callbacks);
}
-bool Server::RegisterService(const grpc::string* host, RpcService* service) {
- for (int i = 0; i < service->GetMethodCount(); ++i) {
- RpcServiceMethod* method = service->GetMethod(i);
+bool Server::RegisterService(const grpc::string* host, Service* service) {
+ bool has_async_methods = service->has_async_methods();
+ if (has_async_methods) {
+ GPR_ASSERT(service->server_ == nullptr &&
+ "Can only register an asynchronous service against one server.");
+ service->server_ = this;
+ }
+ for (auto it = service->methods_.begin(); it != service->methods_.end();
+ ++it) {
+ if (it->get() == nullptr) { // Handled by generic service if any.
+ continue;
+ }
+ RpcServiceMethod* method = it->get();
void* tag = grpc_server_register_method(server_, method->name(),
host ? host->c_str() : nullptr);
- if (!tag) {
+ if (tag == nullptr) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
return false;
}
- sync_methods_->emplace_back(method, tag);
- }
- return true;
-}
-
-bool Server::RegisterAsyncService(const grpc::string* host,
- AsynchronousService* service) {
- GPR_ASSERT(service->server_ == nullptr &&
- "Can only register an asynchronous service against one server.");
- service->server_ = this;
- service->request_args_ = new void* [service->method_count_];
- for (size_t i = 0; i < service->method_count_; ++i) {
- void* tag = grpc_server_register_method(server_, service->method_names_[i],
- host ? host->c_str() : nullptr);
- if (!tag) {
- gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
- service->method_names_[i]);
- return false;
+ if (method->handler() == nullptr) {
+ method->set_server_tag(tag);
+ } else {
+ sync_methods_->emplace_back(method, tag);
}
- service->request_args_[i] = tag;
}
return true;
}
@@ -440,8 +442,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
GPR_ASSERT(GRPC_CALL_OK == result);
}
-Server::BaseAsyncRequest::BaseAsyncRequest(
- Server* server, ServerContext* context,
+ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
+ ServerInterface* server, ServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
bool delete_on_finalize)
: server_(server),
@@ -454,9 +456,8 @@ Server::BaseAsyncRequest::BaseAsyncRequest(
memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
}
-Server::BaseAsyncRequest::~BaseAsyncRequest() {}
-
-bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
+bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
+ bool* status) {
if (*status) {
for (size_t i = 0; i < initial_metadata_array_.count; i++) {
context_->client_metadata_.insert(
@@ -470,7 +471,7 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
grpc_metadata_array_destroy(&initial_metadata_array_);
context_->set_call(call_);
context_->cq_ = call_cq_;
- Call call(call_, server_, call_cq_, server_->max_message_size_);
+ Call call(call_, server_, call_cq_, server_->max_message_size());
if (*status && call_) {
context_->BeginCompletionOp(&call);
}
@@ -483,22 +484,22 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
return true;
}
-Server::RegisteredAsyncRequest::RegisteredAsyncRequest(
- Server* server, ServerContext* context,
+ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
+ ServerInterface* server, ServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
: BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
-void Server::RegisteredAsyncRequest::IssueRequest(
+void ServerInterface::RegisteredAsyncRequest::IssueRequest(
void* registered_method, grpc_byte_buffer** payload,
ServerCompletionQueue* notification_cq) {
grpc_server_request_registered_call(
- server_->server_, registered_method, &call_, &context_->deadline_,
+ server_->server(), registered_method, &call_, &context_->deadline_,
&initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(),
this);
}
-Server::GenericAsyncRequest::GenericAsyncRequest(
- Server* server, GenericServerContext* context,
+ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
+ ServerInterface* server, GenericServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
: BaseAsyncRequest(server, context, stream, call_cq, tag,
@@ -506,12 +507,13 @@ Server::GenericAsyncRequest::GenericAsyncRequest(
grpc_call_details_init(&call_details_);
GPR_ASSERT(notification_cq);
GPR_ASSERT(call_cq);
- grpc_server_request_call(server->server_, &call_, &call_details_,
+ grpc_server_request_call(server->server(), &call_, &call_details_,
&initial_metadata_array_, call_cq->cq(),
notification_cq->cq(), this);
}
-bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
+bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
+ bool* status) {
// TODO(yangg) remove the copy here.
if (*status) {
static_cast<GenericServerContext*>(context_)->method_ =