aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2016-10-03 15:08:48 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2016-10-03 15:15:49 -0700
commit33382d0f537c5c793b46742089ebeb42d764ac45 (patch)
tree5e91e2a9e1d4170d4e93e77eb3438209a9073a28 /src/cpp
parentacd64db4d97ce7db3aba34105da756576b2d6a7d (diff)
Cleanup server_cc.cc
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/rpcmanager/grpc_rpc_manager.h2
-rw-r--r--src/cpp/server/server_cc.cc212
2 files changed, 12 insertions, 202 deletions
diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.h b/src/cpp/rpcmanager/grpc_rpc_manager.h
index 3a94fb791c..77715c52fd 100644
--- a/src/cpp/rpcmanager/grpc_rpc_manager.h
+++ b/src/cpp/rpcmanager/grpc_rpc_manager.h
@@ -47,7 +47,7 @@ class GrpcRpcManager {
explicit GrpcRpcManager(int min_pollers, int max_pollers);
virtual ~GrpcRpcManager();
- // This function MUST be called before using the object
+ // Initializes and Starts the Rpc Manager threads
void Initialize();
// The return type of PollForWork() function
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 36bc61fdf1..761f76fa12 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -118,15 +118,6 @@ class Server::UnimplementedAsyncResponse GRPC_FINAL
UnimplementedAsyncRequest* const request_;
};
-// TODO (sreek) - This might no longer be needed
-class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
- public:
- bool FinalizeResult(void** tag, bool* status) {
- delete this;
- return false;
- }
-};
-
class ShutdownTag : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) { return false; }
@@ -153,40 +144,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_metadata_array_destroy(&request_metadata_);
}
- // TODO (Sreek) This function is probably no longer needed
- static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
- void* tag = nullptr;
- *ok = false;
- if (!cq->Next(&tag, ok)) {
- return nullptr;
- }
- auto* mrd = static_cast<SyncRequest*>(tag);
- GPR_ASSERT(mrd->in_flight_);
- return mrd;
- }
-
- // TODO (sreek) - This function is probably no longer needed
- static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
- gpr_timespec deadline) {
- void* tag = nullptr;
- *ok = false;
- switch (cq->AsyncNext(&tag, ok, deadline)) {
- case CompletionQueue::TIMEOUT:
- *req = nullptr;
- return true;
- case CompletionQueue::SHUTDOWN:
- *req = nullptr;
- return false;
- case CompletionQueue::GOT_EVENT:
- *req = static_cast<SyncRequest*>(tag);
- GPR_ASSERT((*req)->in_flight_);
- return true;
- }
- GPR_UNREACHABLE_CODE(return false);
- }
-
- // TODO (sreek) - Refactor this SetupRequest/TeardownRequest and ResetRequest
- // functions
void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
void TeardownRequest() {
@@ -194,8 +151,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
cq_ = nullptr;
}
- void ResetRequest() { in_flight_ = false; }
-
void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
@@ -319,32 +274,29 @@ class Server::SyncRequestManager : public GrpcRpcManager {
SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
if (!sync_req) {
- // No tag. Nothing to work on
- // TODO (sreek) - Log a warning here since this is an unlikely case
+ // No tag. Nothing to work on. This is an unlikley scenario and possibly a
+ // bug in RPC Manager implementation.
+ gpr_log(GPR_ERROR, "Sync server. DoWork() was called with NULL tag");
return;
}
if (ok) {
+ // Calldata takes ownership of the completion queue inside sync_req
SyncRequest::CallData cd(server_, sync_req);
{
- sync_req->SetupRequest();
+ // Prepare for the next request
if (!IsShutdown()) {
+ sync_req->SetupRequest(); // Create new completion queue for sync_req
sync_req->Request(server_->c_server(), server_cq_->cq());
- } else {
- sync_req->TeardownRequest();
}
}
+
GPR_TIMER_SCOPE("cd.Run()", 0);
cd.Run(global_callbacks_);
- } else {
- sync_req->ResetRequest();
- // ok is false. For some reason, the tag was returned but event was not
- // successful. In this case, request again unless we are shutting down
- if (!IsShutdown()) {
- // TODO (sreek) Remove this
- // sync_req->Request(server_->c_server(), server_cq_->cq());
- }
}
+ // TODO (sreek) If ok is false here (which it isn't in case of
+ // grpc_request_registered_call), we should still re-queue the request
+ // object
}
void AddSyncMethod(RpcServiceMethod* method, void* tag) {
@@ -428,8 +380,6 @@ Server::Server(
Server::~Server() {
{
- // TODO (sreek) Check if we can just call Shutdown() even in case where
- // started_ == false. This will make things much simpler
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
lock.unlock();
@@ -442,12 +392,6 @@ Server::~Server() {
}
}
- // TODO(sreek) Do thisfor all cqs ?
- /*
- void* got_tag;
- bool ok;
- GPR_ASSERT(!cq_.Next(&got_tag, &ok));
- */
grpc_server_destroy(server_);
}
@@ -551,19 +495,6 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
(*it)->Start();
}
- /* TODO (Sreek) - No longer needed (being done in (*it)->Start above) */
- /*
- // Start processing rpcs.
- if (!sync_methods_->empty()) {
- for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
- m->SetupRequest();
- m->Request(server_, cq_.cq());
- }
-
- GrpcRpcManager::Initialize();
- }
- */
-
return true;
}
@@ -608,48 +539,8 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// Drain the shutdown queue (if the previous call to AsyncNext() timed out
// and we didn't remove the tag from the queue yet)
while (shutdown_cq.Next(&tag, &ok)) {
- // Nothing to be done here
- }
-
- /*
- grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
- cq_.Shutdown();
- lock.unlock();
- */
-
- // TODO (sreek) Delete this
- /*
- GrpcRpcManager::ShutdownRpcManager();
- GrpcRpcManager::Wait();
- */
-
- // Spin, eating requests until the completion queue is completely shutdown.
- // If the deadline expires then cancel anything that's pending and keep
- // spinning forever until the work is actually drained.
- // Since nothing else needs to touch state guarded by mu_, holding it
- // through this loop is fine.
- //
- /*
- SyncRequest* request;
- bool ok;
- while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
- if (request == NULL) { // deadline expired
- grpc_server_cancel_all_calls(server_);
- deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
- } else if (ok) {
- SyncRequest::CallData call_data(this, request);
- }
+ // Nothing to be done here. Just ignore ok and tag values
}
- lock.lock();
- */
-
- /* TODO (sreek) - Remove this block */
- // Wait for running callbacks to finish.
- /*
- while (num_running_cb_ != 0) {
- callback_cv_.wait(lock);
- }
- */
shutdown_notified_ = true;
shutdown_cv_.notify_all();
@@ -774,87 +665,6 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
request_->stream()->call_.PerformOps(this);
}
-// TODO: sreek - Remove this function
-void Server::ScheduleCallback() {
- GPR_ASSERT(false);
- /*
- {
- grpc::unique_lock<grpc::mutex> lock(mu_);
- num_running_cb_++;
- }
- thread_pool_->Add(std::bind(&Server::RunRpc, this));
- */
-}
-
-// TODO: sreek - Remove this function
-void Server::RunRpc() {
- GPR_ASSERT(false);
- /*
- // Wait for one more incoming rpc.
- bool ok;
- GPR_TIMER_SCOPE("Server::RunRpc", 0);
- auto* mrd = SyncRequest::Wait(&cq_, &ok);
- if (mrd) {
- ScheduleCallback();
- if (ok) {
- SyncRequest::CallData cd(this, mrd);
- {
- mrd->SetupRequest();
- grpc::unique_lock<grpc::mutex> lock(mu_);
- if (!shutdown_) {
- mrd->Request(server_, cq_.cq());
- } else {
- // destroy the structure that was created
- mrd->TeardownRequest();
- }
- }
- GPR_TIMER_SCOPE("cd.Run()", 0);
- cd.Run(global_callbacks_);
- }
- }
-
- {
- grpc::unique_lock<grpc::mutex> lock(mu_);
- num_running_cb_--;
- if (shutdown_) {
- callback_cv_.notify_all();
- }
- }
- */
-}
-
-/* TODO (sreek) Move this to SyncRequestManager */
-/*
-void Server::PollForWork(bool& is_work_found, void** tag) {
- is_work_found = true;
- *tag = nullptr;
- auto* mrd = SyncRequest::Wait(&cq_, &is_work_found);
- if (is_work_found) {
- *tag = mrd;
- }
-}
-
-
-void Server::DoWork(void* tag) {
- auto* mrd = static_cast<SyncRequest*>(tag);
- if (mrd) {
- SyncRequest::CallData cd(this, mrd);
- {
- mrd->SetupRequest();
- grpc::unique_lock<grpc::mutex> lock(mu_);
- if (!shutdown_) {
- mrd->Request(server_, cq_.cq());
- } else {
- // destroy the structure that was created
- mrd->TeardownRequest();
- }
- }
- GPR_TIMER_SCOPE("cd.Run()", 0);
- cd.Run(global_callbacks_);
- }
-}
-*/
-
ServerInitializer* Server::initializer() { return server_initializer_.get(); }
} // namespace grpc