diff options
author | Sree Kuchibhotla <sreek@google.com> | 2016-10-03 15:08:48 -0700 |
---|---|---|
committer | Sree Kuchibhotla <sreek@google.com> | 2016-10-03 15:15:49 -0700 |
commit | 33382d0f537c5c793b46742089ebeb42d764ac45 (patch) | |
tree | 5e91e2a9e1d4170d4e93e77eb3438209a9073a28 /src/cpp | |
parent | acd64db4d97ce7db3aba34105da756576b2d6a7d (diff) |
Cleanup server_cc.cc
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/rpcmanager/grpc_rpc_manager.h | 2 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 212 |
2 files changed, 12 insertions, 202 deletions
diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.h b/src/cpp/rpcmanager/grpc_rpc_manager.h index 3a94fb791c..77715c52fd 100644 --- a/src/cpp/rpcmanager/grpc_rpc_manager.h +++ b/src/cpp/rpcmanager/grpc_rpc_manager.h @@ -47,7 +47,7 @@ class GrpcRpcManager { explicit GrpcRpcManager(int min_pollers, int max_pollers); virtual ~GrpcRpcManager(); - // This function MUST be called before using the object + // Initializes and Starts the Rpc Manager threads void Initialize(); // The return type of PollForWork() function diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 36bc61fdf1..761f76fa12 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -118,15 +118,6 @@ class Server::UnimplementedAsyncResponse GRPC_FINAL UnimplementedAsyncRequest* const request_; }; -// TODO (sreek) - This might no longer be needed -class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag { - public: - bool FinalizeResult(void** tag, bool* status) { - delete this; - return false; - } -}; - class ShutdownTag : public CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { return false; } @@ -153,40 +144,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_metadata_array_destroy(&request_metadata_); } - // TODO (Sreek) This function is probably no longer needed - static SyncRequest* Wait(CompletionQueue* cq, bool* ok) { - void* tag = nullptr; - *ok = false; - if (!cq->Next(&tag, ok)) { - return nullptr; - } - auto* mrd = static_cast<SyncRequest*>(tag); - GPR_ASSERT(mrd->in_flight_); - return mrd; - } - - // TODO (sreek) - This function is probably no longer needed - static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok, - gpr_timespec deadline) { - void* tag = nullptr; - *ok = false; - switch (cq->AsyncNext(&tag, ok, deadline)) { - case CompletionQueue::TIMEOUT: - *req = nullptr; - return true; - case CompletionQueue::SHUTDOWN: - *req = nullptr; - return false; - case CompletionQueue::GOT_EVENT: - *req = static_cast<SyncRequest*>(tag); - GPR_ASSERT((*req)->in_flight_); - return true; - } - GPR_UNREACHABLE_CODE(return false); - } - - // TODO (sreek) - Refactor this SetupRequest/TeardownRequest and ResetRequest - // functions void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); } void TeardownRequest() { @@ -194,8 +151,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { cq_ = nullptr; } - void ResetRequest() { in_flight_ = false; } - void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(cq_ && !in_flight_); in_flight_ = true; @@ -319,32 +274,29 @@ class Server::SyncRequestManager : public GrpcRpcManager { SyncRequest* sync_req = static_cast<SyncRequest*>(tag); if (!sync_req) { - // No tag. Nothing to work on - // TODO (sreek) - Log a warning here since this is an unlikely case + // No tag. Nothing to work on. This is an unlikley scenario and possibly a + // bug in RPC Manager implementation. + gpr_log(GPR_ERROR, "Sync server. DoWork() was called with NULL tag"); return; } if (ok) { + // Calldata takes ownership of the completion queue inside sync_req SyncRequest::CallData cd(server_, sync_req); { - sync_req->SetupRequest(); + // Prepare for the next request if (!IsShutdown()) { + sync_req->SetupRequest(); // Create new completion queue for sync_req sync_req->Request(server_->c_server(), server_cq_->cq()); - } else { - sync_req->TeardownRequest(); } } + GPR_TIMER_SCOPE("cd.Run()", 0); cd.Run(global_callbacks_); - } else { - sync_req->ResetRequest(); - // ok is false. For some reason, the tag was returned but event was not - // successful. In this case, request again unless we are shutting down - if (!IsShutdown()) { - // TODO (sreek) Remove this - // sync_req->Request(server_->c_server(), server_cq_->cq()); - } } + // TODO (sreek) If ok is false here (which it isn't in case of + // grpc_request_registered_call), we should still re-queue the request + // object } void AddSyncMethod(RpcServiceMethod* method, void* tag) { @@ -428,8 +380,6 @@ Server::Server( Server::~Server() { { - // TODO (sreek) Check if we can just call Shutdown() even in case where - // started_ == false. This will make things much simpler grpc::unique_lock<grpc::mutex> lock(mu_); if (started_ && !shutdown_) { lock.unlock(); @@ -442,12 +392,6 @@ Server::~Server() { } } - // TODO(sreek) Do thisfor all cqs ? - /* - void* got_tag; - bool ok; - GPR_ASSERT(!cq_.Next(&got_tag, &ok)); - */ grpc_server_destroy(server_); } @@ -551,19 +495,6 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { (*it)->Start(); } - /* TODO (Sreek) - No longer needed (being done in (*it)->Start above) */ - /* - // Start processing rpcs. - if (!sync_methods_->empty()) { - for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { - m->SetupRequest(); - m->Request(server_, cq_.cq()); - } - - GrpcRpcManager::Initialize(); - } - */ - return true; } @@ -608,48 +539,8 @@ void Server::ShutdownInternal(gpr_timespec deadline) { // Drain the shutdown queue (if the previous call to AsyncNext() timed out // and we didn't remove the tag from the queue yet) while (shutdown_cq.Next(&tag, &ok)) { - // Nothing to be done here - } - - /* - grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest()); - cq_.Shutdown(); - lock.unlock(); - */ - - // TODO (sreek) Delete this - /* - GrpcRpcManager::ShutdownRpcManager(); - GrpcRpcManager::Wait(); - */ - - // Spin, eating requests until the completion queue is completely shutdown. - // If the deadline expires then cancel anything that's pending and keep - // spinning forever until the work is actually drained. - // Since nothing else needs to touch state guarded by mu_, holding it - // through this loop is fine. - // - /* - SyncRequest* request; - bool ok; - while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) { - if (request == NULL) { // deadline expired - grpc_server_cancel_all_calls(server_); - deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); - } else if (ok) { - SyncRequest::CallData call_data(this, request); - } + // Nothing to be done here. Just ignore ok and tag values } - lock.lock(); - */ - - /* TODO (sreek) - Remove this block */ - // Wait for running callbacks to finish. - /* - while (num_running_cb_ != 0) { - callback_cv_.wait(lock); - } - */ shutdown_notified_ = true; shutdown_cv_.notify_all(); @@ -774,87 +665,6 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( request_->stream()->call_.PerformOps(this); } -// TODO: sreek - Remove this function -void Server::ScheduleCallback() { - GPR_ASSERT(false); - /* - { - grpc::unique_lock<grpc::mutex> lock(mu_); - num_running_cb_++; - } - thread_pool_->Add(std::bind(&Server::RunRpc, this)); - */ -} - -// TODO: sreek - Remove this function -void Server::RunRpc() { - GPR_ASSERT(false); - /* - // Wait for one more incoming rpc. - bool ok; - GPR_TIMER_SCOPE("Server::RunRpc", 0); - auto* mrd = SyncRequest::Wait(&cq_, &ok); - if (mrd) { - ScheduleCallback(); - if (ok) { - SyncRequest::CallData cd(this, mrd); - { - mrd->SetupRequest(); - grpc::unique_lock<grpc::mutex> lock(mu_); - if (!shutdown_) { - mrd->Request(server_, cq_.cq()); - } else { - // destroy the structure that was created - mrd->TeardownRequest(); - } - } - GPR_TIMER_SCOPE("cd.Run()", 0); - cd.Run(global_callbacks_); - } - } - - { - grpc::unique_lock<grpc::mutex> lock(mu_); - num_running_cb_--; - if (shutdown_) { - callback_cv_.notify_all(); - } - } - */ -} - -/* TODO (sreek) Move this to SyncRequestManager */ -/* -void Server::PollForWork(bool& is_work_found, void** tag) { - is_work_found = true; - *tag = nullptr; - auto* mrd = SyncRequest::Wait(&cq_, &is_work_found); - if (is_work_found) { - *tag = mrd; - } -} - - -void Server::DoWork(void* tag) { - auto* mrd = static_cast<SyncRequest*>(tag); - if (mrd) { - SyncRequest::CallData cd(this, mrd); - { - mrd->SetupRequest(); - grpc::unique_lock<grpc::mutex> lock(mu_); - if (!shutdown_) { - mrd->Request(server_, cq_.cq()); - } else { - // destroy the structure that was created - mrd->TeardownRequest(); - } - } - GPR_TIMER_SCOPE("cd.Run()", 0); - cd.Run(global_callbacks_); - } -} -*/ - ServerInitializer* Server::initializer() { return server_initializer_.get(); } } // namespace grpc |