author     Sree Kuchibhotla <sreek@google.com>    2016-07-19 11:00:39 -0700
committer  Sree Kuchibhotla <sreek@google.com>    2016-07-19 11:00:39 -0700
commit     bb5519f5a52aeb23d32ec6ca817e008a65fdfa30 (patch)
tree       6767ab57626761f879c68d8fe1cbda92291bc01b
parent     0ba41907a25bd2433a433b82269817ea9ab8ec2d (diff)
More changes
-rw-r--r--  include/grpc++/server.h                  10
-rw-r--r--  src/cpp/rpcmanager/grpc_rpc_manager.cc   19
-rw-r--r--  src/cpp/rpcmanager/grpc_rpc_manager.h     3
-rw-r--r--  src/cpp/server/server.cc                 93
4 files changed, 87 insertions(+), 38 deletions(-)
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 6876961e21..03c9778468 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -50,6 +50,8 @@
#include <grpc++/support/status.h>
#include <grpc/compression.h>
+#include "src/cpp/rpcmanager/grpc_rpc_manager.h"
+
struct grpc_server;
namespace grpc {
@@ -64,7 +66,9 @@ class ThreadPoolInterface;
/// Models a gRPC server.
///
/// Servers are configured and started via \a grpc::ServerBuilder.
-class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
+class Server GRPC_FINAL : public ServerInterface,
+ private GrpcLibraryCodegen,
+ public GrpcRpcManager {
public:
~Server();
@@ -99,6 +103,10 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
// Returns a \em raw pointer to the underlying CompletionQueue.
CompletionQueue* completion_queue();
+ /// GRPC RPC Manager functions
+ void PollForWork(bool& is_work_found, void** tag) GRPC_OVERRIDE;
+ void DoWork(void* tag) GRPC_OVERRIDE;
+
private:
friend class AsyncGenericService;
friend class ServerBuilder;
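
Note (illustration only): the hunk above makes Server derive from GrpcRpcManager and supply its two pure-virtual hooks. A minimal sketch of what such a subclass looks like, assuming GrpcRpcManager lives in namespace grpc and that the constructor arguments are (min_pollers, max_pollers, max_threads), as the fields used in grpc_rpc_manager.cc suggest; the work-source helpers are hypothetical and not part of the API:

    #include "src/cpp/rpcmanager/grpc_rpc_manager.h"

    class MyRpcManager : public grpc::GrpcRpcManager {
     public:
      MyRpcManager() : GrpcRpcManager(3, 5, 8) {}

      // Block until one unit of work is available (or shutdown is observed)
      // and hand back an opaque tag describing it.
      void PollForWork(bool& is_work_found, void** tag) GRPC_OVERRIDE {
        *tag = nullptr;
        is_work_found = NextWorkItem(tag);  // hypothetical blocking call
      }

      // Execute, on the calling thread, the work identified by 'tag'.
      void DoWork(void* tag) GRPC_OVERRIDE {
        if (tag != nullptr) Process(tag);   // hypothetical handler
      }

     private:
      bool NextWorkItem(void** tag);        // hypothetical, not part of the API
      void Process(void* tag);              // hypothetical, not part of the API
    };
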
diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.cc b/src/cpp/rpcmanager/grpc_rpc_manager.cc
index 1c7d5adeaf..7cffb23858 100644
--- a/src/cpp/rpcmanager/grpc_rpc_manager.cc
+++ b/src/cpp/rpcmanager/grpc_rpc_manager.cc
@@ -65,24 +65,20 @@ GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers,
GrpcRpcManager::~GrpcRpcManager() {
std::unique_lock<grpc::mutex> lock(mu_);
-
- shutdown_ = true;
- while (num_threads_ != 0) {
- shutdown_cv_.wait(lock);
- }
+ // ShutdownRpcManager() and Wait() must be called before destroying the object
+ GPR_ASSERT(shutdown_);
+ GPR_ASSERT(num_threads_ == 0);
CleanupCompletedThreads();
}
-// For testing only
void GrpcRpcManager::Wait() {
std::unique_lock<grpc::mutex> lock(mu_);
- while (!shutdown_) {
+ while (num_threads_ != 0) {
shutdown_cv_.wait(lock);
}
}
-// For testing only
void GrpcRpcManager::ShutdownRpcManager() {
std::unique_lock<grpc::mutex> lock(mu_);
shutdown_ = true;
@@ -120,7 +116,8 @@ bool GrpcRpcManager::MaybeContinueAsPoller() {
void GrpcRpcManager::MaybeCreatePoller() {
grpc::unique_lock<grpc::mutex> lock(mu_);
- if (num_pollers_ < min_pollers_ && num_threads_ < max_threads_) {
+ if (!shutdown_ && num_pollers_ < min_pollers_ &&
+ num_threads_ < max_threads_) {
num_pollers_++;
num_threads_++;
@@ -131,7 +128,7 @@ void GrpcRpcManager::MaybeCreatePoller() {
void GrpcRpcManager::MainWorkLoop() {
bool is_work_found = false;
- void *tag;
+ void* tag;
do {
PollForWork(is_work_found, &tag);
@@ -159,7 +156,7 @@ void GrpcRpcManager::MainWorkLoop() {
grpc::unique_lock<grpc::mutex> lock(mu_);
num_threads_--;
if (num_threads_ == 0) {
- shutdown_cv_.notify_all();
+ shutdown_cv_.notify_one();
}
}
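
The destructor no longer drives shutdown itself; the owner is now expected to call ShutdownRpcManager() and then Wait() before destroying the object, which is exactly what Server::ShutdownInternal() does below. Roughly, the expected lifecycle is (a sketch, not code from this patch):

    MyRpcManager mgr;          // any GrpcRpcManager subclass
    mgr.Initialize();          // spawn the initial poller threads (see Server::Start)
    // ... the manager's worker threads call PollForWork()/DoWork() ...
    mgr.ShutdownRpcManager();  // set shutdown_ so no new pollers are created
    mgr.Wait();                // block until num_threads_ reaches zero
    // the destructor now only GPR_ASSERTs that both steps already happened
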
diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.h b/src/cpp/rpcmanager/grpc_rpc_manager.h
index a8cc6eb80f..475ce97995 100644
--- a/src/cpp/rpcmanager/grpc_rpc_manager.h
+++ b/src/cpp/rpcmanager/grpc_rpc_manager.h
@@ -53,7 +53,6 @@ class GrpcRpcManager {
virtual void PollForWork(bool& is_work_found, void **tag) = 0;
virtual void DoWork(void *tag) = 0;
- // Use the following two functions for testing purposes only
void Wait();
void ShutdownRpcManager();
@@ -64,6 +63,8 @@ class GrpcRpcManager {
// The Run() function calls GrpcManager::MainWorkLoop() function and once that
// completes, it marks the GrpcRpcManagerThread completed by calling
// GrpcRpcManager::MarkAsCompleted()
+ // TODO: sreek - Consider using a separate threadpool rather than implementing
+ // one in this class
class GrpcRpcManagerThread {
public:
GrpcRpcManagerThread(GrpcRpcManager* rpc_mgr);
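
As the comment above describes, each GrpcRpcManagerThread runs the manager's main loop and then reports itself as finished so the manager can reap it later (CleanupCompletedThreads() in the destructor). A sketch of what that amounts to; the member name rpc_mgr_ and the exact Run()/MarkAsCompleted() signatures are assumptions, since neither is shown in this patch:

    void GrpcRpcManagerThread::Run() {
      rpc_mgr_->MainWorkLoop();         // poll for work and execute it until done
      rpc_mgr_->MarkAsCompleted(this);  // queue this thread for later cleanup
    }
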
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index af04fd4ca6..732c20b2d2 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -278,7 +278,8 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
static internal::GrpcLibraryInitializer g_gli_initializer;
Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
int max_message_size, ChannelArguments* args)
- : max_message_size_(max_message_size),
+ : GrpcRpcManager(3, 5, 8),
+ max_message_size_(max_message_size),
started_(false),
shutdown_(false),
shutdown_notified_(false),
@@ -314,6 +315,7 @@ Server::~Server() {
cq_.Shutdown();
}
}
+
void* got_tag;
bool ok;
GPR_ASSERT(!cq_.Next(&got_tag, &ok));
@@ -429,7 +431,8 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
m->Request(server_, cq_.cq());
}
- ScheduleCallback();
+ GrpcRpcManager::Initialize();
+ // ScheduleCallback();
}
return true;
@@ -442,6 +445,10 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown();
lock.unlock();
+
+ GrpcRpcManager::ShutdownRpcManager();
+ GrpcRpcManager::Wait();
+
// Spin, eating requests until the completion queue is completely shutdown.
// If the deadline expires then cancel anything that's pending and keep
// spinning forever until the work is actually drained.
@@ -587,44 +594,80 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
request_->stream()->call_.PerformOps(this);
}
+// TODO: sreek - Remove this function
void Server::ScheduleCallback() {
+ GPR_ASSERT(false);
+ /*
{
grpc::unique_lock<grpc::mutex> lock(mu_);
num_running_cb_++;
}
thread_pool_->Add(std::bind(&Server::RunRpc, this));
+ */
}
+// TODO: sreek - Remove this function
void Server::RunRpc() {
- // Wait for one more incoming rpc.
- bool ok;
- GPR_TIMER_SCOPE("Server::RunRpc", 0);
- auto* mrd = SyncRequest::Wait(&cq_, &ok);
- if (mrd) {
- ScheduleCallback();
- if (ok) {
- SyncRequest::CallData cd(this, mrd);
- {
- mrd->SetupRequest();
- grpc::unique_lock<grpc::mutex> lock(mu_);
- if (!shutdown_) {
- mrd->Request(server_, cq_.cq());
- } else {
- // destroy the structure that was created
- mrd->TeardownRequest();
+ GPR_ASSERT(false);
+ /*
+ // Wait for one more incoming rpc.
+ bool ok;
+ GPR_TIMER_SCOPE("Server::RunRpc", 0);
+ auto* mrd = SyncRequest::Wait(&cq_, &ok);
+ if (mrd) {
+ ScheduleCallback();
+ if (ok) {
+ SyncRequest::CallData cd(this, mrd);
+ {
+ mrd->SetupRequest();
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ if (!shutdown_) {
+ mrd->Request(server_, cq_.cq());
+ } else {
+ // destroy the structure that was created
+ mrd->TeardownRequest();
+ }
}
+ GPR_TIMER_SCOPE("cd.Run()", 0);
+ cd.Run(global_callbacks_);
+ }
+ }
+
+ {
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ num_running_cb_--;
+ if (shutdown_) {
+ callback_cv_.notify_all();
}
- GPR_TIMER_SCOPE("cd.Run()", 0);
- cd.Run(global_callbacks_);
}
+ */
+}
+
+void Server::PollForWork(bool& is_work_found, void** tag) {
+ is_work_found = true;
+ *tag = nullptr;
+ auto* mrd = SyncRequest::Wait(&cq_, &is_work_found);
+ if (is_work_found) {
+ *tag = mrd;
}
+}
- {
- grpc::unique_lock<grpc::mutex> lock(mu_);
- num_running_cb_--;
- if (shutdown_) {
- callback_cv_.notify_all();
+void Server::DoWork(void* tag) {
+ auto* mrd = static_cast<SyncRequest*>(tag);
+ if (mrd) {
+ SyncRequest::CallData cd(this, mrd);
+ {
+ mrd->SetupRequest();
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ if (!shutdown_) {
+ mrd->Request(server_, cq_.cq());
+ } else {
+ // destroy the structure that was created
+ mrd->TeardownRequest();
+ }
}
+ GPR_TIMER_SCOPE("cd.Run()", 0);
+ cd.Run(global_callbacks_);
}
}
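
Taken together, the synchronous server path after this patch is roughly: Server::Start() calls GrpcRpcManager::Initialize() to spawn the pollers; each worker thread's PollForWork() wraps SyncRequest::Wait() and its DoWork() runs the request through SyncRequest::CallData, replacing the old ScheduleCallback()/RunRpc() chain on the thread pool; Server::ShutdownInternal() then calls ShutdownRpcManager() and Wait() before draining the completion queue. A condensed outline of that flow (a paraphrase of the hunks above, not code from the patch):

    // Start:      Server::Start()        -> GrpcRpcManager::Initialize()       // spawn pollers
    // Per thread: PollForWork(found,tag) -> SyncRequest::Wait(&cq_, &found); *tag = mrd
    //             DoWork(tag)            -> SetupRequest(); Request() or TeardownRequest();
    //                                       cd.Run(global_callbacks_)
    // Shutdown:   ShutdownInternal()     -> ShutdownRpcManager(); Wait()       // join workers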