diff options
author | Vijay Pai <vpai@google.com> | 2017-11-14 19:04:02 -0800 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2018-01-08 10:02:38 -0800 |
commit | 5dd32268be62114e8a7c81d60c0dc2633fb83081 (patch) | |
tree | 5d97aa70dfc6ea09df7da9e7955866d7574cb1e3 /src/cpp/thread_manager | |
parent | 669900c7de64d5992c92a838e23097b27e09d0b5 (diff) |
Switch C++ sync server to use gpr_thd rather than std::thread and provide resource exhaustion mechanism
Diffstat (limited to 'src/cpp/thread_manager')
-rw-r--r-- | src/cpp/thread_manager/thread_manager.cc | 54 | ||||
-rw-r--r-- | src/cpp/thread_manager/thread_manager.h | 28 |
2 files changed, 62 insertions, 20 deletions
diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc index 23264f1b5b..107c60f4eb 100644 --- a/src/cpp/thread_manager/thread_manager.cc +++ b/src/cpp/thread_manager/thread_manager.cc @@ -20,18 +20,26 @@ #include <climits> #include <mutex> -#include <thread> #include <grpc/support/log.h> +#include <grpc/support/thd.h> namespace grpc { -ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr) +ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr, bool* valid) : thd_mgr_(thd_mgr) { + gpr_thd_options opt = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&opt); + // Make thread creation exclusive with respect to its join happening in // ~WorkerThread(). std::lock_guard<std::mutex> lock(wt_mu_); - thd_ = std::thread(&ThreadManager::WorkerThread::Run, this); + *valid = valid_ = thd_mgr->thread_creator_( + &thd_, "worker thread", + [](void* th) { + reinterpret_cast<ThreadManager::WorkerThread*>(th)->Run(); + }, + this, &opt); } void ThreadManager::WorkerThread::Run() { @@ -42,15 +50,24 @@ void ThreadManager::WorkerThread::Run() { ThreadManager::WorkerThread::~WorkerThread() { // Don't join until the thread is fully constructed. std::lock_guard<std::mutex> lock(wt_mu_); - thd_.join(); + if (valid_) { + thd_mgr_->thread_joiner_(thd_); + } } -ThreadManager::ThreadManager(int min_pollers, int max_pollers) +ThreadManager::ThreadManager( + int min_pollers, int max_pollers, + std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*, + const gpr_thd_options*)> + thread_creator, + std::function<void(gpr_thd_id)> thread_joiner) : shutdown_(false), num_pollers_(0), min_pollers_(min_pollers), max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers), - num_threads_(0) {} + num_threads_(0), + thread_creator_(thread_creator), + thread_joiner_(thread_joiner) {} ThreadManager::~ThreadManager() { { @@ -111,7 +128,9 @@ void ThreadManager::Initialize() { for (int i = 0; i < min_pollers_; i++) { // Create a new thread (which ends up calling the MainWorkLoop() function - new WorkerThread(this); + bool valid; + new WorkerThread(this, &valid); + GPR_ASSERT(valid); // we need to have at least this minimum } } @@ -138,18 +157,27 @@ void ThreadManager::MainWorkLoop() { case WORK_FOUND: // If we got work and there are now insufficient pollers, start a new // one + bool resources; if (!shutdown_ && num_pollers_ < min_pollers_) { - num_pollers_++; - num_threads_++; + bool valid; // Drop lock before spawning thread to avoid contention lock.unlock(); - new WorkerThread(this); + auto* th = new WorkerThread(this, &valid); + lock.lock(); + if (valid) { + num_pollers_++; + num_threads_++; + } else { + delete th; + } + resources = (num_pollers_ > 0); } else { - // Drop lock for consistency with above branch - lock.unlock(); + resources = true; } + // Drop lock before any application work + lock.unlock(); // Lock is always released at this point - do the application work - DoWork(tag, ok); + DoWork(tag, ok, resources); // Take the lock again to check post conditions lock.lock(); // If we're shutdown, we should finish at this point. diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h index a206e0bd8a..4fa8a6c563 100644 --- a/src/cpp/thread_manager/thread_manager.h +++ b/src/cpp/thread_manager/thread_manager.h @@ -23,15 +23,19 @@ #include <list> #include <memory> #include <mutex> -#include <thread> #include <grpc++/support/config.h> +#include <grpc/support/thd.h> namespace grpc { class ThreadManager { public: - explicit ThreadManager(int min_pollers, int max_pollers); + ThreadManager(int min_pollers, int max_pollers, + std::function<int(gpr_thd_id*, const char*, void (*)(void*), + void*, const gpr_thd_options*)> + thread_creator, + std::function<void(gpr_thd_id)> thread_joiner); virtual ~ThreadManager(); // Initializes and Starts the Rpc Manager threads @@ -50,6 +54,8 @@ class ThreadManager { // - ThreadManager does not interpret the values of 'tag' and 'ok' // - ThreadManager WILL call DoWork() and pass '*tag' and 'ok' as input to // DoWork() + // - ThreadManager will also pass DoWork a bool saying if there are actually + // resources to do the work // // If the return value is SHUTDOWN:, // - ThreadManager WILL NOT call DoWork() and terminates the thead @@ -69,7 +75,7 @@ class ThreadManager { // The implementation of DoWork() should also do any setup needed to ensure // that the next call to PollForWork() (not necessarily by the current thread) // actually finds some work - virtual void DoWork(void* tag, bool ok) = 0; + virtual void DoWork(void* tag, bool ok, bool resources) = 0; // Mark the ThreadManager as shutdown and begin draining the work. This is a // non-blocking call and the caller should call Wait(), a blocking call which @@ -84,15 +90,15 @@ class ThreadManager { virtual void Wait(); private: - // Helper wrapper class around std::thread. This takes a ThreadManager object - // and starts a new std::thread to calls the Run() function. + // Helper wrapper class around thread. This takes a ThreadManager object + // and starts a new thread to calls the Run() function. // // The Run() function calls ThreadManager::MainWorkLoop() function and once // that completes, it marks the WorkerThread completed by calling // ThreadManager::MarkAsCompleted() class WorkerThread { public: - WorkerThread(ThreadManager* thd_mgr); + WorkerThread(ThreadManager* thd_mgr, bool* valid); ~WorkerThread(); private: @@ -102,7 +108,8 @@ class ThreadManager { ThreadManager* const thd_mgr_; std::mutex wt_mu_; - std::thread thd_; + gpr_thd_id thd_; + bool valid_; }; // The main funtion in ThreadManager @@ -129,6 +136,13 @@ class ThreadManager { // currently polling i.e num_pollers_) int num_threads_; + // Functions for creating/joining threads. Normally, these should + // be gpr_thd_new/gpr_thd_join but they are overridable + std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*, + const gpr_thd_options*)> + thread_creator_; + std::function<void(gpr_thd_id)> thread_joiner_; + std::mutex list_mu_; std::list<WorkerThread*> completed_threads_; }; |