aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/thread_manager
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2017-11-14 19:04:02 -0800
committerGravatar Vijay Pai <vpai@google.com>2018-01-08 10:02:38 -0800
commit5dd32268be62114e8a7c81d60c0dc2633fb83081 (patch)
tree5d97aa70dfc6ea09df7da9e7955866d7574cb1e3 /src/cpp/thread_manager
parent669900c7de64d5992c92a838e23097b27e09d0b5 (diff)
Switch C++ sync server to use gpr_thd rather than std::thread and provide resource exhaustion mechanism
Diffstat (limited to 'src/cpp/thread_manager')
-rw-r--r--src/cpp/thread_manager/thread_manager.cc54
-rw-r--r--src/cpp/thread_manager/thread_manager.h28
2 files changed, 62 insertions, 20 deletions
diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc
index 23264f1b5b..107c60f4eb 100644
--- a/src/cpp/thread_manager/thread_manager.cc
+++ b/src/cpp/thread_manager/thread_manager.cc
@@ -20,18 +20,26 @@
#include <climits>
#include <mutex>
-#include <thread>
#include <grpc/support/log.h>
+#include <grpc/support/thd.h>
namespace grpc {
-ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr)
+ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr, bool* valid)
: thd_mgr_(thd_mgr) {
+ gpr_thd_options opt = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&opt);
+
// Make thread creation exclusive with respect to its join happening in
// ~WorkerThread().
std::lock_guard<std::mutex> lock(wt_mu_);
- thd_ = std::thread(&ThreadManager::WorkerThread::Run, this);
+ *valid = valid_ = thd_mgr->thread_creator_(
+ &thd_, "worker thread",
+ [](void* th) {
+ reinterpret_cast<ThreadManager::WorkerThread*>(th)->Run();
+ },
+ this, &opt);
}
void ThreadManager::WorkerThread::Run() {
@@ -42,15 +50,24 @@ void ThreadManager::WorkerThread::Run() {
ThreadManager::WorkerThread::~WorkerThread() {
// Don't join until the thread is fully constructed.
std::lock_guard<std::mutex> lock(wt_mu_);
- thd_.join();
+ if (valid_) {
+ thd_mgr_->thread_joiner_(thd_);
+ }
}
-ThreadManager::ThreadManager(int min_pollers, int max_pollers)
+ThreadManager::ThreadManager(
+ int min_pollers, int max_pollers,
+ std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*,
+ const gpr_thd_options*)>
+ thread_creator,
+ std::function<void(gpr_thd_id)> thread_joiner)
: shutdown_(false),
num_pollers_(0),
min_pollers_(min_pollers),
max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers),
- num_threads_(0) {}
+ num_threads_(0),
+ thread_creator_(thread_creator),
+ thread_joiner_(thread_joiner) {}
ThreadManager::~ThreadManager() {
{
@@ -111,7 +128,9 @@ void ThreadManager::Initialize() {
for (int i = 0; i < min_pollers_; i++) {
// Create a new thread (which ends up calling the MainWorkLoop() function
- new WorkerThread(this);
+ bool valid;
+ new WorkerThread(this, &valid);
+ GPR_ASSERT(valid); // we need to have at least this minimum
}
}
@@ -138,18 +157,27 @@ void ThreadManager::MainWorkLoop() {
case WORK_FOUND:
// If we got work and there are now insufficient pollers, start a new
// one
+ bool resources;
if (!shutdown_ && num_pollers_ < min_pollers_) {
- num_pollers_++;
- num_threads_++;
+ bool valid;
// Drop lock before spawning thread to avoid contention
lock.unlock();
- new WorkerThread(this);
+ auto* th = new WorkerThread(this, &valid);
+ lock.lock();
+ if (valid) {
+ num_pollers_++;
+ num_threads_++;
+ } else {
+ delete th;
+ }
+ resources = (num_pollers_ > 0);
} else {
- // Drop lock for consistency with above branch
- lock.unlock();
+ resources = true;
}
+ // Drop lock before any application work
+ lock.unlock();
// Lock is always released at this point - do the application work
- DoWork(tag, ok);
+ DoWork(tag, ok, resources);
// Take the lock again to check post conditions
lock.lock();
// If we're shutdown, we should finish at this point.
diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h
index a206e0bd8a..4fa8a6c563 100644
--- a/src/cpp/thread_manager/thread_manager.h
+++ b/src/cpp/thread_manager/thread_manager.h
@@ -23,15 +23,19 @@
#include <list>
#include <memory>
#include <mutex>
-#include <thread>
#include <grpc++/support/config.h>
+#include <grpc/support/thd.h>
namespace grpc {
class ThreadManager {
public:
- explicit ThreadManager(int min_pollers, int max_pollers);
+ ThreadManager(int min_pollers, int max_pollers,
+ std::function<int(gpr_thd_id*, const char*, void (*)(void*),
+ void*, const gpr_thd_options*)>
+ thread_creator,
+ std::function<void(gpr_thd_id)> thread_joiner);
virtual ~ThreadManager();
// Initializes and Starts the Rpc Manager threads
@@ -50,6 +54,8 @@ class ThreadManager {
// - ThreadManager does not interpret the values of 'tag' and 'ok'
// - ThreadManager WILL call DoWork() and pass '*tag' and 'ok' as input to
// DoWork()
+ // - ThreadManager will also pass DoWork a bool saying if there are actually
+ // resources to do the work
//
// If the return value is SHUTDOWN:,
// - ThreadManager WILL NOT call DoWork() and terminates the thead
@@ -69,7 +75,7 @@ class ThreadManager {
// The implementation of DoWork() should also do any setup needed to ensure
// that the next call to PollForWork() (not necessarily by the current thread)
// actually finds some work
- virtual void DoWork(void* tag, bool ok) = 0;
+ virtual void DoWork(void* tag, bool ok, bool resources) = 0;
// Mark the ThreadManager as shutdown and begin draining the work. This is a
// non-blocking call and the caller should call Wait(), a blocking call which
@@ -84,15 +90,15 @@ class ThreadManager {
virtual void Wait();
private:
- // Helper wrapper class around std::thread. This takes a ThreadManager object
- // and starts a new std::thread to calls the Run() function.
+ // Helper wrapper class around thread. This takes a ThreadManager object
+ // and starts a new thread to calls the Run() function.
//
// The Run() function calls ThreadManager::MainWorkLoop() function and once
// that completes, it marks the WorkerThread completed by calling
// ThreadManager::MarkAsCompleted()
class WorkerThread {
public:
- WorkerThread(ThreadManager* thd_mgr);
+ WorkerThread(ThreadManager* thd_mgr, bool* valid);
~WorkerThread();
private:
@@ -102,7 +108,8 @@ class ThreadManager {
ThreadManager* const thd_mgr_;
std::mutex wt_mu_;
- std::thread thd_;
+ gpr_thd_id thd_;
+ bool valid_;
};
// The main funtion in ThreadManager
@@ -129,6 +136,13 @@ class ThreadManager {
// currently polling i.e num_pollers_)
int num_threads_;
+ // Functions for creating/joining threads. Normally, these should
+ // be gpr_thd_new/gpr_thd_join but they are overridable
+ std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*,
+ const gpr_thd_options*)>
+ thread_creator_;
+ std::function<void(gpr_thd_id)> thread_joiner_;
+
std::mutex list_mu_;
std::list<WorkerThread*> completed_threads_;
};