aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2018-07-20 10:28:40 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2018-07-20 15:10:16 -0700
commitec1c112cc17cd1290a901ca606ac916422d3342c (patch)
treef3a12769e02c116b0bcbba288562e6e514e58b0b
parentf2a57cdd75c1f808f7682f4d21ab85509be0c006 (diff)
Each ThreadManager is a resource user
-rw-r--r--include/grpcpp/server.h3
-rw-r--r--src/cpp/server/server_builder.cc2
-rw-r--r--src/cpp/server/server_cc.cc23
-rw-r--r--src/cpp/thread_manager/thread_manager.cc36
-rw-r--r--src/cpp/thread_manager/thread_manager.h19
-rw-r--r--test/cpp/thread_manager/thread_manager_test.cc10
6 files changed, 76 insertions, 17 deletions
diff --git a/include/grpcpp/server.h b/include/grpcpp/server.h
index 81c3907f86..189cf8accf 100644
--- a/include/grpcpp/server.h
+++ b/include/grpcpp/server.h
@@ -144,7 +144,8 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
Server(int max_message_size, ChannelArguments* args,
std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
sync_server_cqs,
- int min_pollers, int max_pollers, int sync_cq_timeout_msec);
+ grpc_resource_quota* server_rq, int min_pollers, int max_pollers,
+ int sync_cq_timeout_msec);
/// Start the server.
///
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index e0b9b7a62b..0ab3cd0e32 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -261,7 +261,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
}
std::unique_ptr<Server> server(new Server(
- max_receive_message_size_, &args, sync_server_cqs,
+ max_receive_message_size_, &args, sync_server_cqs, resource_quota_,
sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
sync_server_settings_.cq_timeout_msec));
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 0d77510e29..6e6e0bfffe 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -266,9 +266,9 @@ class Server::SyncRequestThreadManager : public ThreadManager {
public:
SyncRequestThreadManager(Server* server, CompletionQueue* server_cq,
std::shared_ptr<GlobalCallbacks> global_callbacks,
- int min_pollers, int max_pollers,
- int cq_timeout_msec)
- : ThreadManager(min_pollers, max_pollers),
+ grpc_resource_quota* rq, int min_pollers,
+ int max_pollers, int cq_timeout_msec)
+ : ThreadManager("SyncServer", rq, min_pollers, max_pollers),
server_(server),
server_cq_(server_cq),
cq_timeout_msec_(cq_timeout_msec),
@@ -376,7 +376,8 @@ Server::Server(
int max_receive_message_size, ChannelArguments* args,
std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
sync_server_cqs,
- int min_pollers, int max_pollers, int sync_cq_timeout_msec)
+ grpc_resource_quota* server_rq, int min_pollers, int max_pollers,
+ int sync_cq_timeout_msec)
: max_receive_message_size_(max_receive_message_size),
sync_server_cqs_(std::move(sync_server_cqs)),
started_(false),
@@ -392,10 +393,20 @@ Server::Server(
global_callbacks_->UpdateArguments(args);
if (sync_server_cqs_ != nullptr) {
+ bool default_rq_created = false;
+ if (server_rq == nullptr) {
+ server_rq = grpc_resource_quota_create("SyncServer-Default");
+ default_rq_created = true;
+ }
+
for (const auto& it : *sync_server_cqs_) {
sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
- this, it.get(), global_callbacks_, min_pollers, max_pollers,
- sync_cq_timeout_msec));
+ this, it.get(), global_callbacks_, server_rq, min_pollers,
+ max_pollers, sync_cq_timeout_msec));
+ }
+
+ if (default_rq_created) {
+ grpc_resource_quota_unref(server_rq);
}
}
diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc
index 02ac56a3fd..c0fa98798a 100644
--- a/src/cpp/thread_manager/thread_manager.cc
+++ b/src/cpp/thread_manager/thread_manager.cc
@@ -48,12 +48,16 @@ ThreadManager::WorkerThread::~WorkerThread() {
thd_.Join();
}
-ThreadManager::ThreadManager(int min_pollers, int max_pollers)
+ThreadManager::ThreadManager(const char* name,
+ grpc_resource_quota* resource_quota,
+ int min_pollers, int max_pollers)
: shutdown_(false),
num_pollers_(0),
min_pollers_(min_pollers),
max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers),
- num_threads_(0) {}
+ num_threads_(0) {
+ resource_user_ = grpc_resource_user_create(resource_quota, name);
+}
ThreadManager::~ThreadManager() {
{
@@ -61,6 +65,7 @@ ThreadManager::~ThreadManager() {
GPR_ASSERT(num_threads_ == 0);
}
+ grpc_resource_user_unref(resource_user_);
CleanupCompletedThreads();
}
@@ -113,9 +118,27 @@ void ThreadManager::Initialize() {
}
for (int i = 0; i < min_pollers_; i++) {
- // Create a new thread (which ends up calling the MainWorkLoop() function
- new WorkerThread(this);
+ if (!CreateNewThread(this)) {
+ gpr_log(GPR_ERROR,
+ "No quota available to create additional threads. Created %d (of "
+ "%d) threads",
+ i, min_pollers_);
+ break;
+ }
+ }
+}
+
+bool ThreadManager::CreateNewThread(ThreadManager* thd_mgr) {
+ if (!grpc_resource_user_alloc_threads(thd_mgr->resource_user_, 1)) {
+ return false;
}
+ // Create a new thread (which ends up calling the MainWorkLoop() function
+ new WorkerThread(thd_mgr);
+ return true;
+}
+
+void ThreadManager::ReleaseThread(ThreadManager* thd_mgr) {
+ grpc_resource_user_free_threads(thd_mgr->resource_user_, 1);
}
void ThreadManager::MainWorkLoop() {
@@ -146,7 +169,7 @@ void ThreadManager::MainWorkLoop() {
num_threads_++;
// Drop lock before spawning thread to avoid contention
lock.unlock();
- new WorkerThread(this);
+ CreateNewThread(this);
} else {
// Drop lock for consistency with above branch
lock.unlock();
@@ -196,7 +219,10 @@ void ThreadManager::MainWorkLoop() {
}
};
+ // This thread is exiting. Do some cleanup work (i.e delete already completed
+ // worker threads and also release 1 thread back to the resource quota)
CleanupCompletedThreads();
+ ReleaseThread(this);
// If we are here, either ThreadManager is shutting down or it already has
// enough threads.
diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h
index 5a40f2de47..23bd38ee4f 100644
--- a/src/cpp/thread_manager/thread_manager.h
+++ b/src/cpp/thread_manager/thread_manager.h
@@ -27,12 +27,14 @@
#include <grpcpp/support/config.h>
#include "src/core/lib/gprpp/thd.h"
+#include "src/core/lib/iomgr/resource_quota.h"
namespace grpc {
class ThreadManager {
public:
- explicit ThreadManager(int min_pollers, int max_pollers);
+ explicit ThreadManager(const char* name, grpc_resource_quota* resource_quota,
+ int min_pollers, int max_pollers);
virtual ~ThreadManager();
// Initializes and Starts the Rpc Manager threads
@@ -111,6 +113,13 @@ class ThreadManager {
void MarkAsCompleted(WorkerThread* thd);
void CleanupCompletedThreads();
+ // Checks the resource quota and if available, creates a thread and returns
+ // true. If quota is not available, returns false (and thread is not created)
+ static bool CreateNewThread(ThreadManager* thd_mgr);
+
+ // Give back a thread to the resource quota
+ static void ReleaseThread(ThreadManager* thd_mgr);
+
// Protects shutdown_, num_pollers_ and num_threads_
// TODO: sreek - Change num_pollers and num_threads_ to atomics
std::mutex mu_;
@@ -118,6 +127,14 @@ class ThreadManager {
bool shutdown_;
std::condition_variable shutdown_cv_;
+ // The resource user object to use when requesting quota to create threads
+ //
+ // Note: The user of this ThreadManager object must create grpc_resource_quota
+ // object (that contains the actual max thread quota) and a grpc_resource_user
+ // object through which quota is requested whenver new threads need to be
+ // created
+ grpc_resource_user* resource_user_;
+
// Number of threads doing polling
int num_pollers_;
diff --git a/test/cpp/thread_manager/thread_manager_test.cc b/test/cpp/thread_manager/thread_manager_test.cc
index 7a95a9f17d..cf2cf770e6 100644
--- a/test/cpp/thread_manager/thread_manager_test.cc
+++ b/test/cpp/thread_manager/thread_manager_test.cc
@@ -32,8 +32,8 @@
namespace grpc {
class ThreadManagerTest final : public grpc::ThreadManager {
public:
- ThreadManagerTest()
- : ThreadManager(kMinPollers, kMaxPollers),
+ ThreadManagerTest(const char* name, grpc_resource_quota* rq)
+ : ThreadManager(name, rq, kMinPollers, kMaxPollers),
num_do_work_(0),
num_poll_for_work_(0),
num_work_found_(0) {}
@@ -115,7 +115,11 @@ int main(int argc, char** argv) {
std::srand(std::time(nullptr));
grpc::testing::InitTest(&argc, &argv, true);
- grpc::ThreadManagerTest test_rpc_manager;
+
+ grpc_resource_quota* rq = grpc_resource_quota_create("Test");
+ grpc::ThreadManagerTest test_rpc_manager("TestThreadManager", rq);
+ grpc_resource_quota_unref(rq);
+
test_rpc_manager.PerformTest();
return 0;