From 8e37ef50c73d6b3f3ec530a3393fe2cba5ad3a30 Mon Sep 17 00:00:00 2001
From: "A. Unique TensorFlower" <gardener@tensorflow.org>
Date: Tue, 10 May 2016 10:55:24 -0800
Subject: tensorflow: finer-grained Shard parallelization

Provide finer-grained Shard parallelization for the new non-blocking
thread pool. The approach closely resembles the parallel-for algorithm
in the Eigen executors: we choose a block size based on the amount of
work and on parallel efficiency, and then recursively divide the work
range in halves. Wall time improves substantially at the cost of a
modest increase in total CPU time:

Benchmark                 Time(ns): old       new     diff   CPU(ns): old        new      diff
===============================================================================================
cpu_RandomUniform/1M           647541    301220  -53.48%        9576553   10553619   +10.20%
cpu_RandomUniform/2M          1116118    495724  -55.58%       18285896   19635580    +7.38%
cpu_RandomUniform/8M          2691384   1671594  -37.89%       67830397   72105713    +6.30%
cpu_RandomNormal/1M           2126780   1269039  -40.33%       46887528   53197040   +13.46%
cpu_RandomNormal/2M           3529118   2350399  -33.40%       94337705  104481933   +10.75%
cpu_RandomNormal/8M          12429704   8984079  -27.72%      383278086  410900286    +7.21%
cpu_TruncatedNormal/1M        2513508   1504161  -40.16%       59181937   66096798   +11.68%
cpu_TruncatedNormal/2M        4012258   2890855  -27.95%      122164300  129760843    +6.22%
cpu_TruncatedNormal/8M       17628696  11159204  -36.70%      465946492  513345503   +10.17%

TESTED:
- passed opensource_build
  http://ci.tensorflow.org/view/Internal/job/tensorflow-cl-presubmit-multijob/281/
Change: 121971279
---
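[Note: the block-size choice and recursive halving described above are
performed by Eigen's ThreadPoolDevice::parallelFor, which the new
ThreadPool::ParallelFor delegates to. The sketch below is a simplified,
standalone illustration of that strategy, not the Eigen implementation;
the 100k-cycle minimum shard cost, the 4-shards-per-thread cap, and the
names RunBlocks/ParallelForSketch are assumptions made for illustration.

#include <algorithm>
#include <functional>

#include "tensorflow/core/lib/core/blocking_counter.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {

// Executes blocks [first_block, last_block), each covering ~block_size units
// of [0, total). While more than one block remains, fork the right half onto
// the pool and keep halving the left half on this thread; when a single
// block remains, run it inline and signal completion.
static void RunBlocks(thread::ThreadPool* pool, int64 first_block,
                      int64 last_block, int64 block_size, int64 total,
                      const std::function<void(int64, int64)>& fn,
                      BlockingCounter* counter) {
  while (last_block - first_block > 1) {
    const int64 mid = first_block + (last_block - first_block) / 2;
    pool->Schedule([pool, mid, last_block, block_size, total, &fn, counter]() {
      RunBlocks(pool, mid, last_block, block_size, total, fn, counter);
    });
    last_block = mid;
  }
  const int64 first = first_block * block_size;
  fn(first, std::min(total, first + block_size));
  counter->DecrementCount();
}

void ParallelForSketch(thread::ThreadPool* pool, int num_threads, int64 total,
                       int64 cost_per_unit,
                       const std::function<void(int64, int64)>& fn) {
  if (total <= 0) return;
  // Each shard should carry enough work to amortize scheduling overhead
  // (assumed here: ~100k cycles), yet there should be enough shards to keep
  // every thread busy (assumed here: at most 4 shards per thread).
  const int64 min_block_size =
      std::max<int64>(1, 100000 / std::max<int64>(1, cost_per_unit));
  const int64 max_blocks = static_cast<int64>(num_threads) * 4;
  const int64 block_size =
      std::max<int64>(min_block_size, (total + max_blocks - 1) / max_blocks);
  const int64 num_blocks = (total + block_size - 1) / block_size;
  if (num_blocks == 1) {
    fn(0, total);  // too little work to be worth forking
    return;
  }
  BlockingCounter counter(static_cast<int>(num_blocks));
  RunBlocks(pool, 0, num_blocks, block_size, total, fn, &counter);
  counter.Wait();  // returns once every block has executed
}

}  // namespace tensorflow

Dividing in halves rather than scheduling all shards from the calling
thread means the fan-out itself runs in parallel: reaching N shards takes
O(log N) sequential scheduling steps instead of N.]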
 tensorflow/core/lib/core/threadpool.cc    | 28 +++++++++++++++++++++++++++-
 tensorflow/core/lib/core/threadpool.h     |  7 +++++++
 tensorflow/core/util/work_sharder.cc      |  4 ++++
 tensorflow/core/util/work_sharder.h       |  3 +--
 tensorflow/core/util/work_sharder_test.cc | 19 ++++++++-----------
 5 files changed, 47 insertions(+), 14 deletions(-)

diff --git a/tensorflow/core/lib/core/threadpool.cc b/tensorflow/core/lib/core/threadpool.cc
index f88579418e..52550d8ae0 100644
--- a/tensorflow/core/lib/core/threadpool.cc
+++ b/tensorflow/core/lib/core/threadpool.cc
@@ -83,7 +83,24 @@ struct ThreadPool::Impl : Eigen::ThreadPoolTempl<EigenEnvironment> {
   Impl(Env* env, const ThreadOptions& thread_options, const string& name,
        int num_threads)
       : Eigen::ThreadPoolTempl<EigenEnvironment>(
-            num_threads, EigenEnvironment(env, thread_options, name)) {}
+            num_threads, EigenEnvironment(env, thread_options, name)),
+        num_threads_(num_threads) {}
+
+  void ParallelFor(int64 total, int64 cost_per_unit,
+                   std::function<void(int64, int64)> fn) {
+#ifdef EIGEN_USE_NONBLOCKING_THREAD_POOL
+    CHECK_GE(total, 0);
+    CHECK_EQ(total, (int64)(Eigen::Index)total);
+    Eigen::ThreadPoolDevice device(this, num_threads_);
+    device.parallelFor(
+        total, Eigen::TensorOpCost(0, 0, cost_per_unit),
+        [&fn](Eigen::Index first, Eigen::Index last) { fn(first, last); });
+#else
+    CHECK(0);  // should not be used with the old thread pool
+#endif
+  }
+
+  const int num_threads_;
 };
 
 #else
@@ -93,6 +110,10 @@ struct ThreadPool::Impl {
        int num_threads);
   ~Impl();
   void Schedule(std::function<void()> fn);
+  void ParallelFor(int64 total, int64 cost_per_unit,
+                   std::function<void(int64, int64)> fn) {
+    CHECK(0);  // should not be used with the old thread pool
+  }
 
  private:
   struct Waiter {
@@ -216,5 +237,10 @@ void ThreadPool::Schedule(std::function<void()> fn) {
   impl_->Schedule(std::move(fn));
 }
 
+void ThreadPool::ParallelFor(int64 total, int64 cost_per_unit,
+                             std::function<void(int64, int64)> fn) {
+  impl_->ParallelFor(total, cost_per_unit, std::move(fn));
+}
+
 }  // namespace thread
 }  // namespace tensorflow
diff --git a/tensorflow/core/lib/core/threadpool.h b/tensorflow/core/lib/core/threadpool.h
index ae709e0824..ae902f39cb 100644
--- a/tensorflow/core/lib/core/threadpool.h
+++ b/tensorflow/core/lib/core/threadpool.h
@@ -47,6 +47,13 @@ class ThreadPool {
   // Schedule fn() for execution in the pool of threads.
   void Schedule(std::function<void()> fn);
 
+  // ParallelFor shards the "total" units of work, assuming each unit of work
+  // has roughly "cost_per_unit" cost, in cycles. Each unit of work is
+  // indexed 0, 1, ..., total - 1. Each shard contains 1 or more units of work
+  // and the total cost of each shard is roughly the same.
+  void ParallelFor(int64 total, int64 cost_per_unit,
+                   std::function<void(int64, int64)> fn);
+
   struct Impl;
 
  private:
diff --git a/tensorflow/core/util/work_sharder.cc b/tensorflow/core/util/work_sharder.cc
index 046d69a939..38346d1716 100644
--- a/tensorflow/core/util/work_sharder.cc
+++ b/tensorflow/core/util/work_sharder.cc
@@ -22,6 +22,9 @@ namespace tensorflow {
 
 void Shard(int num_workers, thread::ThreadPool* workers, int64 total,
            int64 cost_per_unit, std::function<void(int64, int64)> work) {
+#ifdef EIGEN_USE_NONBLOCKING_THREAD_POOL
+  workers->ParallelFor(total, cost_per_unit, work);
+#else
   CHECK_GE(total, 0);
   if (total == 0) {
     return;
@@ -68,6 +71,7 @@ void Shard(int num_workers, thread::ThreadPool* workers, int64 total,
   // Inline execute the 1st shard.
   work(0, std::min(block_size, total));
   counter.Wait();
+#endif
 }
 
 }  // end namespace tensorflow
diff --git a/tensorflow/core/util/work_sharder.h b/tensorflow/core/util/work_sharder.h
index ad21100b00..59c4ac22a0 100644
--- a/tensorflow/core/util/work_sharder.h
+++ b/tensorflow/core/util/work_sharder.h
@@ -26,8 +26,7 @@ namespace tensorflow {
 // Shards the "total" unit of work assuming each unit of work having
 // roughly "cost_per_unit". Each unit of work is indexed 0, 1, ...,
 // total - 1. Each shard contains 1 or more units of work and the
-// total cost of each shard is roughly the same. The total number of
-// shards is no more than num_workers. The calling thread and the
+// total cost of each shard is roughly the same. The calling thread and the
 // "workers" are used to compute each shard (calling work(start,
 // limit). A common configuration is that "workers" is a thread pool
 // with "num_workers" threads.
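[Note: a minimal caller-side sketch of the API above, set off from the
patch so it is not mistaken for part of it. The pool size, element count,
and cost estimate are arbitrary example values; with
EIGEN_USE_NONBLOCKING_THREAD_POOL defined, Shard forwards to
ThreadPool::ParallelFor, so the shard count is no longer tied to
num_workers.

#include <vector>

#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/util/work_sharder.h"

using tensorflow::int64;

int main() {
  const int num_threads = 4;  // arbitrary pool size for the example
  tensorflow::thread::ThreadPool pool(tensorflow::Env::Default(), "example",
                                      num_threads);
  const int64 n = 1 << 20;
  std::vector<float> out(n);
  // cost_per_unit is a rough per-element cost in cycles; ParallelFor feeds
  // it to Eigen as a TensorOpCost to drive the block-size choice.
  tensorflow::Shard(num_threads, &pool, n, /*cost_per_unit=*/10,
                    [&out](int64 start, int64 limit) {
                      for (int64 i = start; i < limit; ++i) {
                        out[i] = 0.5f * static_cast<float>(i);
                      }
                    });
  return 0;
}]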
#include "tensorflow/core/util/work_sharder.h" +#include #include #include "tensorflow/core/lib/core/threadpool.h" #include "tensorflow/core/platform/logging.h" @@ -33,8 +34,10 @@ void RunSharding(int64 num_workers, int64 total, int64 cost_per_unit) { int64 num_done_work = 0; std::vector work(total, false); Shard(num_workers, &threads, total, cost_per_unit, - [&mu, &num_shards, &num_done_work, &work](int start, int limit) { + [=, &mu, &num_shards, &num_done_work, &work](int64 start, int64 limit) { VLOG(1) << "Shard [" << start << "," << limit << ")"; + EXPECT_GE(start, 0); + EXPECT_LE(limit, total); mutex_lock l(mu); ++num_shards; for (; start < limit; ++start) { @@ -43,7 +46,6 @@ void RunSharding(int64 num_workers, int64 total, int64 cost_per_unit) { work[start] = true; } }); - EXPECT_LE(num_shards, num_workers + 1); EXPECT_EQ(num_done_work, total); LOG(INFO) << num_workers << " " << total << " " << cost_per_unit << " " << num_shards; @@ -61,20 +63,15 @@ TEST(Shard, Basic) { TEST(Shard, OverflowTest) { thread::ThreadPool threads(Env::Default(), "test", 3); - mutex mu; for (auto workers : {1, 2, 3}) { const int64 total_elements = 1LL << 32; - const int64 cost_per_unit = 10000; - int num_shards = 0; - int64 num_elements = 0; + const int64 cost_per_unit = 10; + std::atomic num_elements(0); Shard(workers, &threads, total_elements, cost_per_unit, - [&mu, &num_shards, &num_elements](int64 start, int64 limit) { - mutex_lock l(mu); - ++num_shards; + [&num_elements](int64 start, int64 limit) { num_elements += limit - start; }); - EXPECT_EQ(num_shards, workers); - EXPECT_EQ(num_elements, total_elements); + EXPECT_EQ(num_elements.load(), total_elements); } } -- cgit v1.2.3