diff options
Diffstat (limited to 'src/cpp/thread_manager/thread_manager.cc')
-rw-r--r-- | src/cpp/thread_manager/thread_manager.cc | 81 |
1 files changed, 64 insertions, 17 deletions
diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc index 02ac56a3fd..3e8606a76f 100644 --- a/src/cpp/thread_manager/thread_manager.cc +++ b/src/cpp/thread_manager/thread_manager.cc @@ -22,8 +22,8 @@ #include <mutex> #include <grpc/support/log.h> - #include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/iomgr/exec_ctx.h" namespace grpc { @@ -48,12 +48,17 @@ ThreadManager::WorkerThread::~WorkerThread() { thd_.Join(); } -ThreadManager::ThreadManager(int min_pollers, int max_pollers) +ThreadManager::ThreadManager(const char* name, + grpc_resource_quota* resource_quota, + int min_pollers, int max_pollers) : shutdown_(false), num_pollers_(0), min_pollers_(min_pollers), max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers), - num_threads_(0) {} + num_threads_(0), + max_active_threads_sofar_(0) { + resource_user_ = grpc_resource_user_create(resource_quota, name); +} ThreadManager::~ThreadManager() { { @@ -61,6 +66,8 @@ ThreadManager::~ThreadManager() { GPR_ASSERT(num_threads_ == 0); } + grpc_core::ExecCtx exec_ctx; // grpc_resource_user_unref needs an exec_ctx + grpc_resource_user_unref(resource_user_); CleanupCompletedThreads(); } @@ -81,17 +88,27 @@ bool ThreadManager::IsShutdown() { return shutdown_; } +int ThreadManager::GetMaxActiveThreadsSoFar() { + std::lock_guard<std::mutex> list_lock(list_mu_); + return max_active_threads_sofar_; +} + void ThreadManager::MarkAsCompleted(WorkerThread* thd) { { std::lock_guard<std::mutex> list_lock(list_mu_); completed_threads_.push_back(thd); } - std::lock_guard<std::mutex> lock(mu_); - num_threads_--; - if (num_threads_ == 0) { - shutdown_cv_.notify_one(); + { + std::lock_guard<std::mutex> lock(mu_); + num_threads_--; + if (num_threads_ == 0) { + shutdown_cv_.notify_one(); + } } + + // Give a thread back to the resource quota + grpc_resource_user_free_threads(resource_user_, 1); } void ThreadManager::CleanupCompletedThreads() { @@ -106,14 +123,22 @@ void ThreadManager::CleanupCompletedThreads() { } void ThreadManager::Initialize() { + if (!grpc_resource_user_allocate_threads(resource_user_, min_pollers_)) { + gpr_log(GPR_ERROR, + "No thread quota available to even create the minimum required " + "polling threads (i.e %d). Unable to start the thread manager", + min_pollers_); + abort(); + } + { std::unique_lock<std::mutex> lock(mu_); num_pollers_ = min_pollers_; num_threads_ = min_pollers_; + max_active_threads_sofar_ = min_pollers_; } for (int i = 0; i < min_pollers_; i++) { - // Create a new thread (which ends up calling the MainWorkLoop() function new WorkerThread(this); } } @@ -139,20 +164,40 @@ void ThreadManager::MainWorkLoop() { done = true; break; case WORK_FOUND: - // If we got work and there are now insufficient pollers, start a new - // one + // If we got work and there are now insufficient pollers and there is + // quota available to create a new thread, start a new poller thread + bool resource_exhausted = false; if (!shutdown_ && num_pollers_ < min_pollers_) { - num_pollers_++; - num_threads_++; - // Drop lock before spawning thread to avoid contention - lock.unlock(); - new WorkerThread(this); + if (grpc_resource_user_allocate_threads(resource_user_, 1)) { + // We can allocate a new poller thread + num_pollers_++; + num_threads_++; + if (num_threads_ > max_active_threads_sofar_) { + max_active_threads_sofar_ = num_threads_; + } + // Drop lock before spawning thread to avoid contention + lock.unlock(); + new WorkerThread(this); + } else if (num_pollers_ > 0) { + // There is still at least some thread polling, so we can go on + // even though we are below the number of pollers that we would + // like to have (min_pollers_) + lock.unlock(); + } else { + // There are no pollers to spare and we couldn't allocate + // a new thread, so resources are exhausted! + lock.unlock(); + resource_exhausted = true; + } } else { - // Drop lock for consistency with above branch + // There are a sufficient number of pollers available so we can do + // the work and continue polling with our existing poller threads lock.unlock(); } // Lock is always released at this point - do the application work - DoWork(tag, ok); + // or return resource exhausted if there is new work but we couldn't + // get a thread in which to do it. + DoWork(tag, ok, !resource_exhausted); // Take the lock again to check post conditions lock.lock(); // If we're shutdown, we should finish at this point. @@ -196,6 +241,8 @@ void ThreadManager::MainWorkLoop() { } }; + // This thread is exiting. Do some cleanup work i.e delete already completed + // worker threads CleanupCompletedThreads(); // If we are here, either ThreadManager is shutting down or it already has |