diff options
Diffstat (limited to 'src/cpp/thread_manager/thread_manager.cc')
-rw-r--r-- | src/cpp/thread_manager/thread_manager.cc | 53 |
1 files changed, 42 insertions, 11 deletions
diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc index 02ac56a3fd..fa9eec5f9b 100644 --- a/src/cpp/thread_manager/thread_manager.cc +++ b/src/cpp/thread_manager/thread_manager.cc @@ -22,8 +22,8 @@ #include <mutex> #include <grpc/support/log.h> - #include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/iomgr/exec_ctx.h" namespace grpc { @@ -48,12 +48,17 @@ ThreadManager::WorkerThread::~WorkerThread() { thd_.Join(); } -ThreadManager::ThreadManager(int min_pollers, int max_pollers) +ThreadManager::ThreadManager(const char* name, + grpc_resource_quota* resource_quota, + int min_pollers, int max_pollers) : shutdown_(false), num_pollers_(0), min_pollers_(min_pollers), max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers), - num_threads_(0) {} + num_threads_(0), + max_active_threads_sofar_(0) { + resource_user_ = grpc_resource_user_create(resource_quota, name); +} ThreadManager::~ThreadManager() { { @@ -61,6 +66,8 @@ ThreadManager::~ThreadManager() { GPR_ASSERT(num_threads_ == 0); } + grpc_core::ExecCtx exec_ctx; // grpc_resource_user_unref needs an exec_ctx + grpc_resource_user_unref(resource_user_); CleanupCompletedThreads(); } @@ -81,17 +88,27 @@ bool ThreadManager::IsShutdown() { return shutdown_; } +int ThreadManager::GetMaxActiveThreadsSoFar() { + std::lock_guard<std::mutex> list_lock(list_mu_); + return max_active_threads_sofar_; +} + void ThreadManager::MarkAsCompleted(WorkerThread* thd) { { std::lock_guard<std::mutex> list_lock(list_mu_); completed_threads_.push_back(thd); } - std::lock_guard<std::mutex> lock(mu_); - num_threads_--; - if (num_threads_ == 0) { - shutdown_cv_.notify_one(); + { + std::lock_guard<std::mutex> lock(mu_); + num_threads_--; + if (num_threads_ == 0) { + shutdown_cv_.notify_one(); + } } + + // Give a thread back to the resource quota + grpc_resource_user_free_threads(resource_user_, 1); } void ThreadManager::CleanupCompletedThreads() { @@ -106,14 +123,22 @@ void ThreadManager::CleanupCompletedThreads() { } void ThreadManager::Initialize() { + if (!grpc_resource_user_allocate_threads(resource_user_, min_pollers_)) { + gpr_log(GPR_ERROR, + "No thread quota available to even create the minimum required " + "polling threads (i.e %d). Unable to start the thread manager", + min_pollers_); + abort(); + } + { std::unique_lock<std::mutex> lock(mu_); num_pollers_ = min_pollers_; num_threads_ = min_pollers_; + max_active_threads_sofar_ = min_pollers_; } for (int i = 0; i < min_pollers_; i++) { - // Create a new thread (which ends up calling the MainWorkLoop() function new WorkerThread(this); } } @@ -139,11 +164,15 @@ void ThreadManager::MainWorkLoop() { done = true; break; case WORK_FOUND: - // If we got work and there are now insufficient pollers, start a new - // one - if (!shutdown_ && num_pollers_ < min_pollers_) { + // If we got work and there are now insufficient pollers and there is + // quota available to create a new thread, start a new poller thread + if (!shutdown_ && num_pollers_ < min_pollers_ && + grpc_resource_user_allocate_threads(resource_user_, 1)) { num_pollers_++; num_threads_++; + if (num_threads_ > max_active_threads_sofar_) { + max_active_threads_sofar_ = num_threads_; + } // Drop lock before spawning thread to avoid contention lock.unlock(); new WorkerThread(this); @@ -196,6 +225,8 @@ void ThreadManager::MainWorkLoop() { } }; + // This thread is exiting. Do some cleanup work i.e delete already completed + // worker threads CleanupCompletedThreads(); // If we are here, either ThreadManager is shutting down or it already has |