diff options
author | 2016-05-10 10:55:24 -0800 | |
---|---|---|
committer | 2016-05-10 12:02:05 -0700 | |
commit | 8e37ef50c73d6b3f3ec530a3393fe2cba5ad3a30 (patch) | |
tree | 40cb51260b94963072dfd3102c2e70d8a5476cc0 | |
parent | aec09b6cb61c63a28e01f6b413499602e224da2f (diff) |
tensorflow: finer-grained Shard parallelization
Provide finer-grained Shard parallelization for the new non-blocking thread pool.
This closely resembles the parallel-for algorithm in the Eigen executors:
we choose a good block size based on amount of work and parallel efficiency,
and then use recursive division in halves.
Benchmark Time(ns): old new diff CPU(ns): old new diff
==========================================================================================
cpu_RandomUniform/1M 647541 301220 -53.48% 9576553 10553619 +10.20%
cpu_RandomUniform/2M 1116118 495724 -55.58% 18285896 19635580 +7.38%
cpu_RandomUniform/8M 2691384 1671594 -37.89% 67830397 72105713 +6.30%
cpu_RandomNormal/1M 2126780 1269039 -40.33% 46887528 53197040 +13.46%
cpu_RandomNormal/2M 3529118 2350399 -33.40% 94337705 104481933 +10.75%
cpu_RandomNormal/8M 12429704 8984079 -27.72% 383278086 410900286 +7.21%
cpu_TruncatedNormal/1M 2513508 1504161 -40.16% 59181937 66096798 +11.68%
cpu_TruncatedNormal/2M 4012258 2890855 -27.95% 122164300 129760843 +6.22%
cpu_TruncatedNormal/8M 17628696 11159204 -36.70% 465946492 513345503 +10.17%
TESTED:
- passed opensource_build
http://ci.tensorflow.org/view/Internal/job/tensorflow-cl-presubmit-multijob/281/
Change: 121971279
-rw-r--r-- | tensorflow/core/lib/core/threadpool.cc | 28 | ||||
-rw-r--r-- | tensorflow/core/lib/core/threadpool.h | 7 | ||||
-rw-r--r-- | tensorflow/core/util/work_sharder.cc | 4 | ||||
-rw-r--r-- | tensorflow/core/util/work_sharder.h | 3 | ||||
-rw-r--r-- | tensorflow/core/util/work_sharder_test.cc | 19 |
5 files changed, 47 insertions, 14 deletions
diff --git a/tensorflow/core/lib/core/threadpool.cc b/tensorflow/core/lib/core/threadpool.cc index f88579418e..52550d8ae0 100644 --- a/tensorflow/core/lib/core/threadpool.cc +++ b/tensorflow/core/lib/core/threadpool.cc @@ -83,7 +83,24 @@ struct ThreadPool::Impl : Eigen::ThreadPoolTempl<EigenEnvironment> { Impl(Env* env, const ThreadOptions& thread_options, const string& name, int num_threads) : Eigen::ThreadPoolTempl<EigenEnvironment>( - num_threads, EigenEnvironment(env, thread_options, name)) {} + num_threads, EigenEnvironment(env, thread_options, name)), + num_threads_(num_threads) {} + + void ParallelFor(int64 total, int64 cost_per_unit, + std::function<void(int64, int64)> fn) { +#ifdef EIGEN_USE_NONBLOCKING_THREAD_POOL + CHECK_GE(total, 0); + CHECK_EQ(total, (int64)(Eigen::Index)total); + Eigen::ThreadPoolDevice device(this, num_threads_); + device.parallelFor( + total, Eigen::TensorOpCost(0, 0, cost_per_unit), + [&fn](Eigen::Index first, Eigen::Index last) { fn(first, last); }); +#else + CHECK(0); // should not be used with the old thread pool +#endif + } + + const int num_threads_; }; #else @@ -93,6 +110,10 @@ struct ThreadPool::Impl { int num_threads); ~Impl(); void Schedule(std::function<void()> fn); + void ParallelFor(int64 total, int64 cost_per_unit, + std::function<void(int64, int64)> fn) { + CHECK(0); // should not be used with the old thread pool + } private: struct Waiter { @@ -216,5 +237,10 @@ void ThreadPool::Schedule(std::function<void()> fn) { impl_->Schedule(std::move(fn)); } +void ThreadPool::ParallelFor(int64 total, int64 cost_per_unit, + std::function<void(int64, int64)> fn) { + impl_->ParallelFor(total, cost_per_unit, std::move(fn)); +} + } // namespace thread } // namespace tensorflow diff --git a/tensorflow/core/lib/core/threadpool.h b/tensorflow/core/lib/core/threadpool.h index ae709e0824..ae902f39cb 100644 --- a/tensorflow/core/lib/core/threadpool.h +++ b/tensorflow/core/lib/core/threadpool.h @@ -47,6 +47,13 @@ class ThreadPool { // 
Schedule fn() for execution in the pool of threads. void Schedule(std::function<void()> fn); + // ParallelFor shards the "total" unit of work assuming each unit of work + // having roughly "cost_per_unit" cost, in cycles. Each unit of work is + // indexed 0, 1, ..., total - 1. Each shard contains 1 or more units of work + // and the total cost of each shard is roughly the same. + void ParallelFor(int64 total, int64 cost_per_unit, + std::function<void(int64, int64)> fn); + struct Impl; private: diff --git a/tensorflow/core/util/work_sharder.cc b/tensorflow/core/util/work_sharder.cc index 046d69a939..38346d1716 100644 --- a/tensorflow/core/util/work_sharder.cc +++ b/tensorflow/core/util/work_sharder.cc @@ -22,6 +22,9 @@ namespace tensorflow { void Shard(int num_workers, thread::ThreadPool* workers, int64 total, int64 cost_per_unit, std::function<void(int64, int64)> work) { +#ifdef EIGEN_USE_NONBLOCKING_THREAD_POOL + workers->ParallelFor(total, cost_per_unit, work); +#else CHECK_GE(total, 0); if (total == 0) { return; @@ -68,6 +71,7 @@ void Shard(int num_workers, thread::ThreadPool* workers, int64 total, // Inline execute the 1st shard. work(0, std::min(block_size, total)); counter.Wait(); +#endif } } // end namespace tensorflow diff --git a/tensorflow/core/util/work_sharder.h b/tensorflow/core/util/work_sharder.h index ad21100b00..59c4ac22a0 100644 --- a/tensorflow/core/util/work_sharder.h +++ b/tensorflow/core/util/work_sharder.h @@ -26,8 +26,7 @@ namespace tensorflow { // Shards the "total" unit of work assuming each unit of work having // roughly "cost_per_unit". Each unit of work is indexed 0, 1, ..., // total - 1. Each shard contains 1 or more units of work and the -// total cost of each shard is roughly the same. The total number of -// shards is no more than num_workers. The calling thread and the +// total cost of each shard is roughly the same. The calling thread and the // "workers" are used to compute each shard (calling work(start, // limit). 
A common configuration is that "workers" is a thread pool // with "num_workers" threads. diff --git a/tensorflow/core/util/work_sharder_test.cc b/tensorflow/core/util/work_sharder_test.cc index c0d7267da9..c11db2904f 100644 --- a/tensorflow/core/util/work_sharder_test.cc +++ b/tensorflow/core/util/work_sharder_test.cc @@ -15,6 +15,7 @@ limitations under the License. #include "tensorflow/core/util/work_sharder.h" +#include <atomic> #include <vector> #include "tensorflow/core/lib/core/threadpool.h" #include "tensorflow/core/platform/logging.h" @@ -33,8 +34,10 @@ void RunSharding(int64 num_workers, int64 total, int64 cost_per_unit) { int64 num_done_work = 0; std::vector<bool> work(total, false); Shard(num_workers, &threads, total, cost_per_unit, - [&mu, &num_shards, &num_done_work, &work](int start, int limit) { + [=, &mu, &num_shards, &num_done_work, &work](int64 start, int64 limit) { VLOG(1) << "Shard [" << start << "," << limit << ")"; + EXPECT_GE(start, 0); + EXPECT_LE(limit, total); mutex_lock l(mu); ++num_shards; for (; start < limit; ++start) { @@ -43,7 +46,6 @@ void RunSharding(int64 num_workers, int64 total, int64 cost_per_unit) { work[start] = true; } }); - EXPECT_LE(num_shards, num_workers + 1); EXPECT_EQ(num_done_work, total); LOG(INFO) << num_workers << " " << total << " " << cost_per_unit << " " << num_shards; @@ -61,20 +63,15 @@ TEST(Shard, Basic) { TEST(Shard, OverflowTest) { thread::ThreadPool threads(Env::Default(), "test", 3); - mutex mu; for (auto workers : {1, 2, 3}) { const int64 total_elements = 1LL << 32; - const int64 cost_per_unit = 10000; - int num_shards = 0; - int64 num_elements = 0; + const int64 cost_per_unit = 10; + std::atomic<int64> num_elements(0); Shard(workers, &threads, total_elements, cost_per_unit, - [&mu, &num_shards, &num_elements](int64 start, int64 limit) { - mutex_lock l(mu); - ++num_shards; + [&num_elements](int64 start, int64 limit) { num_elements += limit - start; }); - EXPECT_EQ(num_shards, workers); - 
EXPECT_EQ(num_elements, total_elements); + EXPECT_EQ(num_elements.load(), total_elements); } } |