diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cpp/server/server_builder.cc | 2 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 23 | ||||
-rw-r--r-- | src/cpp/thread_manager/thread_manager.cc | 36 | ||||
-rw-r--r-- | src/cpp/thread_manager/thread_manager.h | 19 |
4 files changed, 67 insertions, 13 deletions
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index e0b9b7a62b..0ab3cd0e32 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -261,7 +261,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { } std::unique_ptr<Server> server(new Server( - max_receive_message_size_, &args, sync_server_cqs, + max_receive_message_size_, &args, sync_server_cqs, resource_quota_, sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec)); diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 0d77510e29..6e6e0bfffe 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -266,9 +266,9 @@ class Server::SyncRequestThreadManager : public ThreadManager { public: SyncRequestThreadManager(Server* server, CompletionQueue* server_cq, std::shared_ptr<GlobalCallbacks> global_callbacks, - int min_pollers, int max_pollers, - int cq_timeout_msec) - : ThreadManager(min_pollers, max_pollers), + grpc_resource_quota* rq, int min_pollers, + int max_pollers, int cq_timeout_msec) + : ThreadManager("SyncServer", rq, min_pollers, max_pollers), server_(server), server_cq_(server_cq), cq_timeout_msec_(cq_timeout_msec), @@ -376,7 +376,8 @@ Server::Server( int max_receive_message_size, ChannelArguments* args, std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> sync_server_cqs, - int min_pollers, int max_pollers, int sync_cq_timeout_msec) + grpc_resource_quota* server_rq, int min_pollers, int max_pollers, + int sync_cq_timeout_msec) : max_receive_message_size_(max_receive_message_size), sync_server_cqs_(std::move(sync_server_cqs)), started_(false), @@ -392,10 +393,20 @@ Server::Server( global_callbacks_->UpdateArguments(args); if (sync_server_cqs_ != nullptr) { + bool default_rq_created = false; + if (server_rq == nullptr) { + server_rq = grpc_resource_quota_create("SyncServer-Default"); + default_rq_created = true; + } + for (const auto& it : *sync_server_cqs_) { sync_req_mgrs_.emplace_back(new SyncRequestThreadManager( - this, it.get(), global_callbacks_, min_pollers, max_pollers, - sync_cq_timeout_msec)); + this, it.get(), global_callbacks_, server_rq, min_pollers, + max_pollers, sync_cq_timeout_msec)); + } + + if (default_rq_created) { + grpc_resource_quota_unref(server_rq); } } diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc index 02ac56a3fd..c0fa98798a 100644 --- a/src/cpp/thread_manager/thread_manager.cc +++ b/src/cpp/thread_manager/thread_manager.cc @@ -48,12 +48,16 @@ ThreadManager::WorkerThread::~WorkerThread() { thd_.Join(); } -ThreadManager::ThreadManager(int min_pollers, int max_pollers) +ThreadManager::ThreadManager(const char* name, + grpc_resource_quota* resource_quota, + int min_pollers, int max_pollers) : shutdown_(false), num_pollers_(0), min_pollers_(min_pollers), max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers), - num_threads_(0) {} + num_threads_(0) { + resource_user_ = grpc_resource_user_create(resource_quota, name); +} ThreadManager::~ThreadManager() { { @@ -61,6 +65,7 @@ ThreadManager::~ThreadManager() { GPR_ASSERT(num_threads_ == 0); } + grpc_resource_user_unref(resource_user_); CleanupCompletedThreads(); } @@ -113,9 +118,27 @@ void ThreadManager::Initialize() { } for (int i = 0; i < min_pollers_; i++) { - // Create a new thread (which ends up calling the MainWorkLoop() function - new WorkerThread(this); + if (!CreateNewThread(this)) { + gpr_log(GPR_ERROR, + "No quota available to create additional threads. Created %d (of " + "%d) threads", + i, min_pollers_); + break; + } + } +} + +bool ThreadManager::CreateNewThread(ThreadManager* thd_mgr) { + if (!grpc_resource_user_alloc_threads(thd_mgr->resource_user_, 1)) { + return false; } + // Create a new thread (which ends up calling the MainWorkLoop() function + new WorkerThread(thd_mgr); + return true; +} + +void ThreadManager::ReleaseThread(ThreadManager* thd_mgr) { + grpc_resource_user_free_threads(thd_mgr->resource_user_, 1); } void ThreadManager::MainWorkLoop() { @@ -146,7 +169,7 @@ void ThreadManager::MainWorkLoop() { num_threads_++; // Drop lock before spawning thread to avoid contention lock.unlock(); - new WorkerThread(this); + CreateNewThread(this); } else { // Drop lock for consistency with above branch lock.unlock(); @@ -196,7 +219,10 @@ void ThreadManager::MainWorkLoop() { } }; + // This thread is exiting. Do some cleanup work (i.e delete already completed + // worker threads and also release 1 thread back to the resource quota) CleanupCompletedThreads(); + ReleaseThread(this); // If we are here, either ThreadManager is shutting down or it already has // enough threads. diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h index 5a40f2de47..23bd38ee4f 100644 --- a/src/cpp/thread_manager/thread_manager.h +++ b/src/cpp/thread_manager/thread_manager.h @@ -27,12 +27,14 @@ #include <grpcpp/support/config.h> #include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/iomgr/resource_quota.h" namespace grpc { class ThreadManager { public: - explicit ThreadManager(int min_pollers, int max_pollers); + explicit ThreadManager(const char* name, grpc_resource_quota* resource_quota, + int min_pollers, int max_pollers); virtual ~ThreadManager(); // Initializes and Starts the Rpc Manager threads @@ -111,6 +113,13 @@ class ThreadManager { void MarkAsCompleted(WorkerThread* thd); void CleanupCompletedThreads(); + // Checks the resource quota and if available, creates a thread and returns + // true. If quota is not available, returns false (and thread is not created) + static bool CreateNewThread(ThreadManager* thd_mgr); + + // Give back a thread to the resource quota + static void ReleaseThread(ThreadManager* thd_mgr); + // Protects shutdown_, num_pollers_ and num_threads_ // TODO: sreek - Change num_pollers and num_threads_ to atomics std::mutex mu_; @@ -118,6 +127,14 @@ class ThreadManager { bool shutdown_; std::condition_variable shutdown_cv_; + // The resource user object to use when requesting quota to create threads + // + // Note: The user of this ThreadManager object must create grpc_resource_quota + // object (that contains the actual max thread quota) and a grpc_resource_user + // object through which quota is requested whenver new threads need to be + // created + grpc_resource_user* resource_user_; + // Number of threads doing polling int num_pollers_; |