diff options
Diffstat (limited to 'src/cpp/server/thread_pool.cc')
-rw-r--r-- | src/cpp/server/thread_pool.cc | 52 |
1 files changed, 28 insertions, 24 deletions
diff --git a/src/cpp/server/thread_pool.cc b/src/cpp/server/thread_pool.cc index d3013b806c..e8d0e89ed2 100644 --- a/src/cpp/server/thread_pool.cc +++ b/src/cpp/server/thread_pool.cc @@ -31,48 +31,52 @@ * */ +#include <grpc++/impl/sync.h> +#include <grpc++/impl/thd.h> + #include "src/cpp/server/thread_pool.h" namespace grpc { +void ThreadPool::ThreadFunc() { + for (;;) { + // Wait until work is available or we are shutting down. + grpc::unique_lock<grpc::mutex> lock(mu_); + if (!shutdown_ && callbacks_.empty()) { + cv_.wait(lock); + } + // Drain callbacks before considering shutdown to ensure all work + // gets completed. + if (!callbacks_.empty()) { + auto cb = callbacks_.front(); + callbacks_.pop(); + lock.unlock(); + cb(); + } else if (shutdown_) { + return; + } + } +} + ThreadPool::ThreadPool(int num_threads) : shutdown_(false) { for (int i = 0; i < num_threads; i++) { - threads_.push_back(std::thread([this]() { - for (;;) { - // Wait until work is available or we are shutting down. - auto have_work = [this]() { return shutdown_ || !callbacks_.empty(); }; - std::unique_lock<std::mutex> lock(mu_); - if (!have_work()) { - cv_.wait(lock, have_work); - } - // Drain callbacks before considering shutdown to ensure all work - // gets completed. - if (!callbacks_.empty()) { - auto cb = callbacks_.front(); - callbacks_.pop(); - lock.unlock(); - cb(); - } else if (shutdown_) { - return; - } - } - })); + threads_.push_back(grpc::thread(&ThreadPool::ThreadFunc, this)); } } ThreadPool::~ThreadPool() { { - std::lock_guard<std::mutex> lock(mu_); + grpc::lock_guard<grpc::mutex> lock(mu_); shutdown_ = true; cv_.notify_all(); } - for (auto& t : threads_) { - t.join(); + for (auto t = threads_.begin(); t != threads_.end(); t++) { + t->join(); } } void ThreadPool::ScheduleCallback(const std::function<void()>& callback) { - std::lock_guard<std::mutex> lock(mu_); + grpc::lock_guard<grpc::mutex> lock(mu_); callbacks_.push(callback); cv_.notify_one(); } |