diff options
Diffstat (limited to 'src/cpp/server/server.cc')
-rw-r--r-- | src/cpp/server/server.cc | 190 |
1 files changed, 167 insertions, 23 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index a436ee43e9..28b874d9fb 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -275,15 +275,99 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_completion_queue* cq_; }; +class Server::SyncRequestManager : public GrpcRpcManager { + public: + SyncRequestManager(Server* server, CompletionQueue* server_cq, + std::shared_ptr<GlobalCallbacks> global_callbacks, + int min_pollers, int max_pollers) + : GrpcRpcManager(min_pollers, max_pollers), + server_(server), + server_cq_(server_cq), + global_callbacks_(global_callbacks) {} + + static const int kRpcPollingTimeoutMsec = 500; + + WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE { + *tag = nullptr; + gpr_timespec deadline = + gpr_time_from_millis(kRpcPollingTimeoutMsec, GPR_TIMESPAN); + + switch (server_cq_->AsyncNext(tag, ok, deadline)) { + case CompletionQueue::TIMEOUT: + return TIMEOUT; + case CompletionQueue::SHUTDOWN: + return SHUTDOWN; + case CompletionQueue::GOT_EVENT: + return WORK_FOUND; + } + + GPR_UNREACHABLE_CODE(return TIMEOUT); + } + + void DoWork(void* tag, bool ok) GRPC_OVERRIDE { + SyncRequest* sync_req = static_cast<SyncRequest*>(tag); + if (ok && sync_req) { + SyncRequest::CallData cd(server_, sync_req); + { + sync_req->SetupRequest(); + if (!IsShutdown()) { + sync_req->Request(server_->c_server(), server_cq_->cq()); + } else { + sync_req->TeardownRequest(); + } + } + GPR_TIMER_SCOPE("cd.Run()", 0); + cd.Run(global_callbacks_); + } + + // TODO (sreek): If ok == false, log an error + } + + void AddSyncMethod(RpcServiceMethod* method, void* tag) { + sync_methods_.emplace_back(method, tag); + } + + void AddUnknownSyncMethod() { + // TODO (sreek) - Check if !sync_methods_.empty() is really needed here + if (!sync_methods_.empty()) { + unknown_method_.reset(new RpcServiceMethod( + "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); + // Use of emplace_back with just constructor arguments is not accepted + // here by gcc-4.4 because it can't match the anonymous nullptr with a + // proper constructor implicitly. Construct the object and use push_back. + sync_methods_.push_back(SyncRequest(unknown_method_.get(), nullptr)); + } + } + + void Start() { + if (!sync_methods_.empty()) { + for (auto m = sync_methods_.begin(); m != sync_methods_.end(); m++) { + m->SetupRequest(); + m->Request(server_->c_server(), server_cq_->cq()); + } + + GrpcRpcManager::Initialize(); + } + } + + private: + Server* server_; + CompletionQueue* server_cq_; + std::vector<SyncRequest> sync_methods_; + std::unique_ptr<RpcServiceMethod> unknown_method_; + std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; +}; + static internal::GrpcLibraryInitializer g_gli_initializer; -Server::Server(bool has_sync_methods, int max_message_size, - ChannelArguments* args) - : GrpcRpcManager(3, 5, 8), - max_message_size_(max_message_size), +Server::Server( + std::shared_ptr<std::vector<ServerCompletionQueue>> sync_server_cqs, + int max_message_size, ChannelArguments* args, int min_pollers, + int max_pollers) + : max_message_size_(max_message_size), + sync_server_cqs_(sync_server_cqs), started_(false), shutdown_(false), shutdown_notified_(false), - sync_methods_(new std::list<SyncRequest>), has_generic_service_(false), server_(nullptr), server_initializer_(new ServerInitializer(this)) { @@ -291,16 +375,17 @@ Server::Server(bool has_sync_methods, int max_message_size, gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); global_callbacks_ = g_callbacks; global_callbacks_->UpdateArguments(args); + + for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); + it++) { + sync_req_mgrs_.emplace_back(new SyncRequestManager( + this, &(*it), global_callbacks_, min_pollers, max_pollers)); + } + grpc_channel_args channel_args; args->SetChannelArgs(&channel_args); - server_ = grpc_server_create(&channel_args, nullptr); - if (!has_sync_methods) { - grpc_server_register_non_listening_completion_queue(server_, cq_.cq(), - nullptr); - } else { - grpc_server_register_completion_queue(server_, cq_.cq(), nullptr); - } + server_ = grpc_server_create(&channel_args, nullptr); } Server::~Server() { @@ -310,15 +395,20 @@ Server::~Server() { lock.unlock(); Shutdown(); } else if (!started_) { + // TODO (sreek): Shutdown all cqs + /* cq_.Shutdown(); + */ } } + // TODO(sreek) Do thisfor all cqs ? + /* void* got_tag; bool ok; GPR_ASSERT(!cq_.Next(&got_tag, &ok)); + */ grpc_server_destroy(server_); - delete sync_methods_; } void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { @@ -329,8 +419,6 @@ void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { grpc_server* Server::c_server() { return server_; } -CompletionQueue* Server::completion_queue() { return &cq_; } - static grpc_server_register_method_payload_handling PayloadHandlingForMethod( RpcServiceMethod* method) { switch (method->method_type()) { @@ -351,6 +439,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { "Can only register an asynchronous service against one server."); service->server_ = this; } + const char* method_name = nullptr; for (auto it = service->methods_.begin(); it != service->methods_.end(); ++it) { @@ -369,7 +458,9 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { if (method->handler() == nullptr) { method->set_server_tag(tag); } else { - sync_methods_->emplace_back(method, tag); + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->AddSyncMethod(method, tag); + } } method_name = method->name(); } @@ -405,13 +496,8 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { grpc_server_start(server_); if (!has_generic_service_) { - if (!sync_methods_->empty()) { - unknown_method_.reset(new RpcServiceMethod( - "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); - // Use of emplace_back with just constructor arguments is not accepted - // here by gcc-4.4 because it can't match the anonymous nullptr with a - // proper constructor implicitly. Construct the object and use push_back. - sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr)); + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->AddUnknownSyncMethod(); } for (size_t i = 0; i < num_cqs; i++) { @@ -421,6 +507,12 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { } } + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->Start(); + } + + /* TODO (Sreek) - Do this for all cqs */ + /* // Start processing rpcs. if (!sync_methods_->empty()) { for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { @@ -430,26 +522,73 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { GrpcRpcManager::Initialize(); } + */ return true; } +// TODO (sreek) - Reimplement this void Server::ShutdownInternal(gpr_timespec deadline) { grpc::unique_lock<grpc::mutex> lock(mu_); if (started_ && !shutdown_) { shutdown_ = true; + + int shutdown_tag = 0; // Dummy shutdown tag + grpc_server_shutdown_and_notify(server_, shutdown_cq_.cq(), &shutdown_tag); + + // Shutdown all RpcManagers. This will try to gracefully stop all the + // threads in the RpcManagers (once they process any inflight requests) + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->ShutdownRpcManager(); + } + + shutdown_cq_.Shutdown(); + + void* tag; + bool ok; + CompletionQueue::NextStatus status = + shutdown_cq_.AsyncNext(&tag, &ok, deadline); + + // If this timed out, it means we are done with the grace-period for + // a clean shutdown. We should force a shutdown now by cancelling all + // inflight calls + if (status == CompletionQueue::NextStatus::TIMEOUT) { + grpc_server_cancel_all_calls(server_); + } + // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has + // successfully shutdown + + // Wait for threads in all RpcManagers to terminate + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->Wait(); + } + + // Shutdown the completion queues + // TODO (sreek) Move this into SyncRequestManager + for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); + it++) { + (*it).Shutdown(); + } + + /* grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest()); cq_.Shutdown(); lock.unlock(); + */ + // TODO (sreek) Delete this + /* GrpcRpcManager::ShutdownRpcManager(); GrpcRpcManager::Wait(); + */ // Spin, eating requests until the completion queue is completely shutdown. // If the deadline expires then cancel anything that's pending and keep // spinning forever until the work is actually drained. // Since nothing else needs to touch state guarded by mu_, holding it // through this loop is fine. + // + /* SyncRequest* request; bool ok; while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) { @@ -461,6 +600,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) { } } lock.lock(); + */ /* TODO (sreek) - Remove this block */ // Wait for running callbacks to finish. @@ -642,6 +782,8 @@ void Server::RunRpc() { */ } +/* TODO (sreek) Move this to SyncRequestManager */ +/* void Server::PollForWork(bool& is_work_found, void** tag) { is_work_found = true; *tag = nullptr; @@ -651,6 +793,7 @@ void Server::PollForWork(bool& is_work_found, void** tag) { } } + void Server::DoWork(void* tag) { auto* mrd = static_cast<SyncRequest*>(tag); if (mrd) { @@ -669,6 +812,7 @@ void Server::DoWork(void* tag) { cd.Run(global_callbacks_); } } +*/ ServerInitializer* Server::initializer() { return server_initializer_.get(); } |