aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/thread_manager/thread_manager.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/thread_manager/thread_manager.cc')
-rw-r--r--src/cpp/thread_manager/thread_manager.cc53
1 files changed, 11 insertions, 42 deletions
diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc
index fa9eec5f9b..02ac56a3fd 100644
--- a/src/cpp/thread_manager/thread_manager.cc
+++ b/src/cpp/thread_manager/thread_manager.cc
@@ -22,8 +22,8 @@
#include <mutex>
#include <grpc/support/log.h>
+
#include "src/core/lib/gprpp/thd.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc {
@@ -48,17 +48,12 @@ ThreadManager::WorkerThread::~WorkerThread() {
thd_.Join();
}
-ThreadManager::ThreadManager(const char* name,
- grpc_resource_quota* resource_quota,
- int min_pollers, int max_pollers)
+ThreadManager::ThreadManager(int min_pollers, int max_pollers)
: shutdown_(false),
num_pollers_(0),
min_pollers_(min_pollers),
max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers),
- num_threads_(0),
- max_active_threads_sofar_(0) {
- resource_user_ = grpc_resource_user_create(resource_quota, name);
-}
+ num_threads_(0) {}
ThreadManager::~ThreadManager() {
{
@@ -66,8 +61,6 @@ ThreadManager::~ThreadManager() {
GPR_ASSERT(num_threads_ == 0);
}
- grpc_core::ExecCtx exec_ctx; // grpc_resource_user_unref needs an exec_ctx
- grpc_resource_user_unref(resource_user_);
CleanupCompletedThreads();
}
@@ -88,27 +81,17 @@ bool ThreadManager::IsShutdown() {
return shutdown_;
}
-int ThreadManager::GetMaxActiveThreadsSoFar() {
- std::lock_guard<std::mutex> list_lock(list_mu_);
- return max_active_threads_sofar_;
-}
-
void ThreadManager::MarkAsCompleted(WorkerThread* thd) {
{
std::lock_guard<std::mutex> list_lock(list_mu_);
completed_threads_.push_back(thd);
}
- {
- std::lock_guard<std::mutex> lock(mu_);
- num_threads_--;
- if (num_threads_ == 0) {
- shutdown_cv_.notify_one();
- }
+ std::lock_guard<std::mutex> lock(mu_);
+ num_threads_--;
+ if (num_threads_ == 0) {
+ shutdown_cv_.notify_one();
}
-
- // Give a thread back to the resource quota
- grpc_resource_user_free_threads(resource_user_, 1);
}
void ThreadManager::CleanupCompletedThreads() {
@@ -123,22 +106,14 @@ void ThreadManager::CleanupCompletedThreads() {
}
void ThreadManager::Initialize() {
- if (!grpc_resource_user_allocate_threads(resource_user_, min_pollers_)) {
- gpr_log(GPR_ERROR,
- "No thread quota available to even create the minimum required "
- "polling threads (i.e %d). Unable to start the thread manager",
- min_pollers_);
- abort();
- }
-
{
std::unique_lock<std::mutex> lock(mu_);
num_pollers_ = min_pollers_;
num_threads_ = min_pollers_;
- max_active_threads_sofar_ = min_pollers_;
}
for (int i = 0; i < min_pollers_; i++) {
+ // Create a new thread (which ends up calling the MainWorkLoop() function
new WorkerThread(this);
}
}
@@ -164,15 +139,11 @@ void ThreadManager::MainWorkLoop() {
done = true;
break;
case WORK_FOUND:
- // If we got work and there are now insufficient pollers and there is
- // quota available to create a new thread, start a new poller thread
- if (!shutdown_ && num_pollers_ < min_pollers_ &&
- grpc_resource_user_allocate_threads(resource_user_, 1)) {
+ // If we got work and there are now insufficient pollers, start a new
+ // one
+ if (!shutdown_ && num_pollers_ < min_pollers_) {
num_pollers_++;
num_threads_++;
- if (num_threads_ > max_active_threads_sofar_) {
- max_active_threads_sofar_ = num_threads_;
- }
// Drop lock before spawning thread to avoid contention
lock.unlock();
new WorkerThread(this);
@@ -225,8 +196,6 @@ void ThreadManager::MainWorkLoop() {
}
};
- // This thread is exiting. Do some cleanup work i.e delete already completed
- // worker threads
CleanupCompletedThreads();
// If we are here, either ThreadManager is shutting down or it already has