diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-02-10 17:39:54 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-02-10 17:39:54 -0800 |
commit | cbd04850884ed6978fa6ae19ec8d04c3773d8ac4 (patch) | |
tree | 50dca5c86839c6a90ba61e281fd33682a5d3b23d /src/cpp | |
parent | 36d18a089e66aff8fc542ec2d98623b56a2e78f1 (diff) |
Simplify server ready for async path
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/server/server.cc | 58 |
1 files changed, 20 insertions, 38 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 44c5276b54..f5bbfdc6f7 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -54,9 +54,9 @@ Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned, ServerC secure_(creds != nullptr) { if (creds) { server_ = - grpc_secure_server_create(creds->GetRawCreds(), cq_.cq(), nullptr); + grpc_secure_server_create(creds->GetRawCreds(), nullptr, nullptr); } else { - server_ = grpc_server_create(cq_.cq(), nullptr); + server_ = grpc_server_create(nullptr, nullptr); } } @@ -80,13 +80,17 @@ Server::~Server() { } bool Server::RegisterService(RpcService *service) { + if (!cq_sync_) { + cq_sync_.reset(new CompletionQueue); + } for (int i = 0; i < service->GetMethodCount(); ++i) { RpcServiceMethod *method = service->GetMethod(i); - if (method_map_.find(method->name()) != method_map_.end()) { + void *tag = grpc_server_register_method(server_, method->name(), nullptr); + if (!tag) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); return false; } - method_map_.insert(std::make_pair(method->name(), method)); + methods_.emplace_back(method, tag); } return true; } @@ -106,7 +110,11 @@ bool Server::Start() { grpc_server_start(server_); // Start processing rpcs. - if (thread_pool_) { + if (cq_sync_) { + for (auto& m : methods_) { + m.Request(cq_sync_.get()); + } + ScheduleCallback(); } @@ -126,12 +134,6 @@ void Server::Shutdown() { } } } - - // Shutdown the completion queue. - cq_.Shutdown(); - void *tag = nullptr; - bool ok = false; - GPR_ASSERT(false == cq_.Next(&tag, &ok)); } void Server::ScheduleCallback() { @@ -144,35 +146,15 @@ void Server::ScheduleCallback() { void Server::RunRpc() { // Wait for one more incoming rpc. - void *tag = nullptr; - GPR_ASSERT(started_); - grpc_call *c_call = NULL; - grpc_call_details call_details; - grpc_call_details_init(&call_details); - grpc_metadata_array initial_metadata; - grpc_metadata_array_init(&initial_metadata); - CompletionQueue cq; - grpc_call_error err = grpc_server_request_call(server_, &c_call, &call_details, &initial_metadata, cq.cq(), cq.cq(), nullptr); - GPR_ASSERT(err == GRPC_CALL_OK); - bool ok = false; - GPR_ASSERT(cq_.Next(&tag, &ok)); - if (ok) { - ServerContext context; - Call call(c_call, nullptr, &cq); + auto* mrd = MethodRequestData::Wait(cq_sync_.get()); + if (mrd) { + MethodRequestData::CallData cd(mrd); + + mrd->Request(cq_sync_.get()); ScheduleCallback(); - RpcServiceMethod *method = nullptr; - auto iter = method_map_.find(call_details.method); - if (iter != method_map_.end()) { - method = iter->second; - } - // TODO(ctiller): allocate only if necessary - std::unique_ptr<google::protobuf::Message> request(method->AllocateRequestProto()); - std::unique_ptr<google::protobuf::Message> response(method->AllocateResponseProto()); - method->handler()->RunHandler(MethodHandler::HandlerParameter( - &call, &context, request.get(), response.get())); + + cd.Run(); } - grpc_call_details_destroy(&call_details); - grpc_metadata_array_destroy(&initial_metadata); { std::unique_lock<std::mutex> lock(mu_); |