diff options
Diffstat (limited to 'src/cpp/thread_manager/thread_manager.h')
-rw-r--r-- | src/cpp/thread_manager/thread_manager.h | 48 |
1 files changed, 43 insertions, 5 deletions
diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h index 5a40f2de47..01043edb31 100644 --- a/src/cpp/thread_manager/thread_manager.h +++ b/src/cpp/thread_manager/thread_manager.h @@ -27,12 +27,14 @@ #include <grpcpp/support/config.h> #include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/iomgr/resource_quota.h" namespace grpc { class ThreadManager { public: - explicit ThreadManager(int min_pollers, int max_pollers); + explicit ThreadManager(const char* name, grpc_resource_quota* resource_quota, + int min_pollers, int max_pollers); virtual ~ThreadManager(); // Initializes and Starts the Rpc Manager threads @@ -84,6 +86,11 @@ class ThreadManager { // all the threads have drained all the outstanding work virtual void Wait(); + // Max number of concurrent threads that were ever active in this thread + // manager so far. This is useful for debugging purposes (and in unit tests) + // to check if resource_quota is properly being enforced. + int GetMaxActiveThreadsSoFar(); + private: // Helper wrapper class around grpc_core::Thread. Takes a ThreadManager object // and starts a new grpc_core::Thread to calls the Run() function. @@ -91,6 +98,24 @@ class ThreadManager { // The Run() function calls ThreadManager::MainWorkLoop() function and once // that completes, it marks the WorkerThread completed by calling // ThreadManager::MarkAsCompleted() + // + // WHY IS THIS NEEDED?: + // When a thread terminates, some other thread *must* call Join() on that + // thread so that the resources are released. Having a WorkerThread wrapper + // will make this easier. Once Run() completes, each thread calls the + // following two functions: + // ThreadManager::CleanupCompletedThreads() + // ThreadManager::MarkAsCompleted() + // + // - MarkAsCompleted() puts the WorkerThread object in the ThreadManger's + // completed_threads_ list + // - CleanupCompletedThreads() calls "Join()" on the threads that are already + // in the completed_threads_ list (since a thread cannot call Join() on + // itself, it calls CleanupCompletedThreads() *before* calling + // MarkAsCompleted()) + // + // TODO(sreek): Consider creating the threads 'detached' so that Join() need + // not be called (and the need for this WorkerThread class is eliminated) class WorkerThread { public: WorkerThread(ThreadManager* thd_mgr); @@ -111,13 +136,21 @@ class ThreadManager { void MarkAsCompleted(WorkerThread* thd); void CleanupCompletedThreads(); - // Protects shutdown_, num_pollers_ and num_threads_ - // TODO: sreek - Change num_pollers and num_threads_ to atomics + // Protects shutdown_, num_pollers_, num_threads_ and + // max_active_threads_sofar_ std::mutex mu_; bool shutdown_; std::condition_variable shutdown_cv_; + // The resource user object to use when requesting quota to create threads + // + // Note: The user of this ThreadManager object must create grpc_resource_quota + // object (that contains the actual max thread quota) and a grpc_resource_user + // object through which quota is requested whenver new threads need to be + // created + grpc_resource_user* resource_user_; + // Number of threads doing polling int num_pollers_; @@ -125,10 +158,15 @@ class ThreadManager { int min_pollers_; int max_pollers_; - // The total number of threads (includes threads includes the threads that are - // currently polling i.e num_pollers_) + // The total number of threads currently active (includes threads includes the + // threads that are currently polling i.e num_pollers_) int num_threads_; + // See GetMaxActiveThreadsSoFar()'s description. + // To be more specific, this variable tracks the max value num_threads_ was + // ever set so far + int max_active_threads_sofar_; + std::mutex list_mu_; std::list<WorkerThread*> completed_threads_; }; |