author     A. Unique TensorFlower <nobody@tensorflow.org>   2016-05-10 10:55:24 -0800
committer  TensorFlower Gardener <gardener@tensorflow.org>  2016-05-10 12:02:05 -0700
commit     8e37ef50c73d6b3f3ec530a3393fe2cba5ad3a30
tree       40cb51260b94963072dfd3102c2e70d8a5476cc0
parent     aec09b6cb61c63a28e01f6b413499602e224da2f
tensorflow: finer-grained Shard parallelization
Provide finer-grained Shard parallelization for the new non-blocking thread
pool. This closely resembles the parallel-for algorithm in the Eigen
executors: we choose a good block size based on the amount of work and the
parallel efficiency, and then use recursive division in halves.

Benchmark                 Time(ns): old       new      diff    CPU(ns): old        new      diff
=================================================================================================
cpu_RandomUniform/1M             647541    301220   -53.48%        9576553   10553619   +10.20%
cpu_RandomUniform/2M            1116118    495724   -55.58%       18285896   19635580    +7.38%
cpu_RandomUniform/8M            2691384   1671594   -37.89%       67830397   72105713    +6.30%
cpu_RandomNormal/1M             2126780   1269039   -40.33%       46887528   53197040   +13.46%
cpu_RandomNormal/2M             3529118   2350399   -33.40%       94337705  104481933   +10.75%
cpu_RandomNormal/8M            12429704   8984079   -27.72%      383278086  410900286    +7.21%
cpu_TruncatedNormal/1M          2513508   1504161   -40.16%       59181937   66096798   +11.68%
cpu_TruncatedNormal/2M          4012258   2890855   -27.95%      122164300  129760843    +6.22%
cpu_TruncatedNormal/8M         17628696  11159204   -36.70%      465946492  513345503   +10.17%

TESTED:
- passed opensource_build
  http://ci.tensorflow.org/view/Internal/job/tensorflow-cl-presubmit-multijob/281/

Change: 121971279
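The "recursive division in halves" scheme works roughly as follows: pick a block size large enough that one block's worth of work amortizes the scheduling overhead, then split the index range in halves until each piece fits in one block. The sketch below is a simplified illustration of that idea, not the code added by this commit; the real block-size heuristic and scheduling live in Eigen's non-blocking thread pool (see the threadpool.cc diff below), and kMinBlockCycles plus the use of std::async are assumptions made only to keep the example self-contained.

// Sketch only (not the code added by this commit): recursive halving over
// [0, total) with a block size derived from cost_per_unit. std::async stands
// in for a thread pool, and kMinBlockCycles is an assumed tuning constant.
#include <algorithm>
#include <cstdint>
#include <functional>
#include <future>

void ParallelForSketch(int64_t total, int64_t cost_per_unit,
                       const std::function<void(int64_t, int64_t)>& fn) {
  // Pick a block size big enough to amortize scheduling overhead: roughly
  // kMinBlockCycles worth of work per block, but at least one unit.
  const int64_t kMinBlockCycles = 100 * 1000;
  const int64_t block_size = std::max<int64_t>(
      1, kMinBlockCycles / std::max<int64_t>(1, cost_per_unit));

  // Split [first, last) in halves: run one half concurrently, the other
  // inline, and join before returning so no references dangle.
  std::function<void(int64_t, int64_t)> run = [&](int64_t first, int64_t last) {
    if (last - first <= block_size) {
      if (first < last) fn(first, last);
      return;
    }
    const int64_t mid = first + (last - first) / 2;
    auto right = std::async(std::launch::async,
                            [&run, mid, last] { run(mid, last); });
    run(first, mid);
    right.get();
  };
  run(0, total);
}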
-rw-r--r--  tensorflow/core/lib/core/threadpool.cc     | 28
-rw-r--r--  tensorflow/core/lib/core/threadpool.h      |  7
-rw-r--r--  tensorflow/core/util/work_sharder.cc       |  4
-rw-r--r--  tensorflow/core/util/work_sharder.h        |  3
-rw-r--r--  tensorflow/core/util/work_sharder_test.cc  | 19
5 files changed, 47 insertions, 14 deletions
diff --git a/tensorflow/core/lib/core/threadpool.cc b/tensorflow/core/lib/core/threadpool.cc
index f88579418e..52550d8ae0 100644
--- a/tensorflow/core/lib/core/threadpool.cc
+++ b/tensorflow/core/lib/core/threadpool.cc
@@ -83,7 +83,24 @@ struct ThreadPool::Impl : Eigen::ThreadPoolTempl<EigenEnvironment> {
Impl(Env* env, const ThreadOptions& thread_options, const string& name,
int num_threads)
: Eigen::ThreadPoolTempl<EigenEnvironment>(
- num_threads, EigenEnvironment(env, thread_options, name)) {}
+ num_threads, EigenEnvironment(env, thread_options, name)),
+ num_threads_(num_threads) {}
+
+ void ParallelFor(int64 total, int64 cost_per_unit,
+ std::function<void(int64, int64)> fn) {
+#ifdef EIGEN_USE_NONBLOCKING_THREAD_POOL
+ CHECK_GE(total, 0);
+ CHECK_EQ(total, (int64)(Eigen::Index)total);
+ Eigen::ThreadPoolDevice device(this, num_threads_);
+ device.parallelFor(
+ total, Eigen::TensorOpCost(0, 0, cost_per_unit),
+ [&fn](Eigen::Index first, Eigen::Index last) { fn(first, last); });
+#else
+ CHECK(0); // should not be used with the old thread pool
+#endif
+ }
+
+ const int num_threads_;
};
#else
@@ -93,6 +110,10 @@ struct ThreadPool::Impl {
int num_threads);
~Impl();
void Schedule(std::function<void()> fn);
+ void ParallelFor(int64 total, int64 cost_per_unit,
+ std::function<void(int64, int64)> fn) {
+ CHECK(0); // should not be used with the old thread pool
+ }
private:
struct Waiter {
@@ -216,5 +237,10 @@ void ThreadPool::Schedule(std::function<void()> fn) {
impl_->Schedule(std::move(fn));
}
+void ThreadPool::ParallelFor(int64 total, int64 cost_per_unit,
+ std::function<void(int64, int64)> fn) {
+ impl_->ParallelFor(total, cost_per_unit, std::move(fn));
+}
+
} // namespace thread
} // namespace tensorflow
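For reference, the Impl::ParallelFor added above boils down to the following standalone pattern: wrap the pool in an Eigen::ThreadPoolDevice and let Eigen derive the block size from a TensorOpCost whose compute term is cost_per_unit. This sketch is not part of the commit; it assumes a reasonably recent Eigen where Eigen::ThreadPool is the non-blocking pool and the unsupported CXX11 Tensor headers are available, and the fixed thread count is illustrative.

// Standalone sketch of the pattern used in Impl::ParallelFor above.
// Assumes Eigen's unsupported CXX11 Tensor headers; the thread count below
// is illustrative (the diff above passes num_threads_ instead).
#define EIGEN_USE_THREADS
#include <cstdint>
#include <functional>
#include <unsupported/Eigen/CXX11/Tensor>

void ParallelForViaEigen(int64_t total, int64_t cost_per_unit,
                         const std::function<void(int64_t, int64_t)>& fn) {
  const int num_threads = 4;
  Eigen::ThreadPool pool(num_threads);
  Eigen::ThreadPoolDevice device(&pool, num_threads);
  // TensorOpCost(bytes_loaded, bytes_stored, compute_cycles): only the
  // compute term is modeled here, matching the diff above.
  device.parallelFor(total, Eigen::TensorOpCost(0, 0, cost_per_unit),
                     [&fn](Eigen::Index first, Eigen::Index last) {
                       fn(first, last);
                     });
}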
diff --git a/tensorflow/core/lib/core/threadpool.h b/tensorflow/core/lib/core/threadpool.h
index ae709e0824..ae902f39cb 100644
--- a/tensorflow/core/lib/core/threadpool.h
+++ b/tensorflow/core/lib/core/threadpool.h
@@ -47,6 +47,13 @@ class ThreadPool {
// Schedule fn() for execution in the pool of threads.
void Schedule(std::function<void()> fn);
+ // ParallelFor shards the "total" units of work, assuming each unit of work
+ // has roughly "cost_per_unit" cost, in cycles. Each unit of work is
+ // indexed 0, 1, ..., total - 1. Each shard contains 1 or more units of work,
+ // and the total cost of each shard is roughly the same.
+ void ParallelFor(int64 total, int64 cost_per_unit,
+ std::function<void(int64, int64)> fn);
+
struct Impl;
private:
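As a usage note for the contract documented above: fn receives half-open [start, limit) ranges that together cover 0 .. total - 1, and cost_per_unit is a per-element cycle estimate that steers the shard size. A minimal hedged sketch follows; the pool size and cost estimate are assumptions for illustration and do not come from this commit.

// Usage sketch for ThreadPool::ParallelFor; pool size and the per-element
// cost estimate are illustrative assumptions.
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/types.h"

using tensorflow::int64;

void FillRamp(float* data, int64 n) {
  tensorflow::thread::ThreadPool pool(tensorflow::Env::Default(), "ramp", 4);
  const int64 cost_per_unit = 20;  // assumed ~20 cycles of work per element
  pool.ParallelFor(n, cost_per_unit, [data](int64 start, int64 limit) {
    for (int64 i = start; i < limit; ++i) data[i] = static_cast<float>(i);
  });
}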
diff --git a/tensorflow/core/util/work_sharder.cc b/tensorflow/core/util/work_sharder.cc
index 046d69a939..38346d1716 100644
--- a/tensorflow/core/util/work_sharder.cc
+++ b/tensorflow/core/util/work_sharder.cc
@@ -22,6 +22,9 @@ namespace tensorflow {
void Shard(int num_workers, thread::ThreadPool* workers, int64 total,
int64 cost_per_unit, std::function<void(int64, int64)> work) {
+#ifdef EIGEN_USE_NONBLOCKING_THREAD_POOL
+ workers->ParallelFor(total, cost_per_unit, work);
+#else
CHECK_GE(total, 0);
if (total == 0) {
return;
@@ -68,6 +71,7 @@ void Shard(int num_workers, thread::ThreadPool* workers, int64 total,
// Inline execute the 1st shard.
work(0, std::min(block_size, total));
counter.Wait();
+#endif
}
} // end namespace tensorflow
diff --git a/tensorflow/core/util/work_sharder.h b/tensorflow/core/util/work_sharder.h
index ad21100b00..59c4ac22a0 100644
--- a/tensorflow/core/util/work_sharder.h
+++ b/tensorflow/core/util/work_sharder.h
@@ -26,8 +26,7 @@ namespace tensorflow {
// Shards the "total" unit of work assuming each unit of work having
// roughly "cost_per_unit". Each unit of work is indexed 0, 1, ...,
// total - 1. Each shard contains 1 or more units of work and the
-// total cost of each shard is roughly the same. The total number of
-// shards is no more than num_workers. The calling thread and the
+// total cost of each shard is roughly the same. The calling thread and the
// "workers" are used to compute each shard (calling work(start,
// limit). A common configuration is that "workers" is a thread pool
// with "num_workers" threads.
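One consequence of the sentence removed above: with the non-blocking pool, callers can no longer rely on getting no more than num_workers shards, so per-shard bookkeeping should be thread-safe and shard-count-agnostic, which is what the test change below switches to. A hedged sketch of a Shard caller in that style; the pool size, num_workers, and cost figure are illustrative only.

// Sketch of a Shard caller that does not assume any particular shard count.
// Pool size, num_workers, and cost_per_unit are illustrative values.
#include <atomic>
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/util/work_sharder.h"

using tensorflow::int64;

int64 CountNonZero(const int* values, int64 n) {
  tensorflow::thread::ThreadPool pool(tensorflow::Env::Default(), "count", 4);
  std::atomic<int64> non_zero(0);
  tensorflow::Shard(4 /* num_workers */, &pool, n, 5 /* cost_per_unit */,
                    [values, &non_zero](int64 start, int64 limit) {
                      int64 local = 0;
                      for (int64 i = start; i < limit; ++i) {
                        if (values[i] != 0) ++local;
                      }
                      non_zero += local;  // atomic accumulation across shards
                    });
  return non_zero.load();
}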
diff --git a/tensorflow/core/util/work_sharder_test.cc b/tensorflow/core/util/work_sharder_test.cc
index c0d7267da9..c11db2904f 100644
--- a/tensorflow/core/util/work_sharder_test.cc
+++ b/tensorflow/core/util/work_sharder_test.cc
@@ -15,6 +15,7 @@ limitations under the License.
#include "tensorflow/core/util/work_sharder.h"
+#include <atomic>
#include <vector>
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/logging.h"
@@ -33,8 +34,10 @@ void RunSharding(int64 num_workers, int64 total, int64 cost_per_unit) {
int64 num_done_work = 0;
std::vector<bool> work(total, false);
Shard(num_workers, &threads, total, cost_per_unit,
- [&mu, &num_shards, &num_done_work, &work](int start, int limit) {
+ [=, &mu, &num_shards, &num_done_work, &work](int64 start, int64 limit) {
VLOG(1) << "Shard [" << start << "," << limit << ")";
+ EXPECT_GE(start, 0);
+ EXPECT_LE(limit, total);
mutex_lock l(mu);
++num_shards;
for (; start < limit; ++start) {
@@ -43,7 +46,6 @@ void RunSharding(int64 num_workers, int64 total, int64 cost_per_unit) {
work[start] = true;
}
});
- EXPECT_LE(num_shards, num_workers + 1);
EXPECT_EQ(num_done_work, total);
LOG(INFO) << num_workers << " " << total << " " << cost_per_unit << " "
<< num_shards;
@@ -61,20 +63,15 @@ TEST(Shard, Basic) {
TEST(Shard, OverflowTest) {
thread::ThreadPool threads(Env::Default(), "test", 3);
- mutex mu;
for (auto workers : {1, 2, 3}) {
const int64 total_elements = 1LL << 32;
- const int64 cost_per_unit = 10000;
- int num_shards = 0;
- int64 num_elements = 0;
+ const int64 cost_per_unit = 10;
+ std::atomic<int64> num_elements(0);
Shard(workers, &threads, total_elements, cost_per_unit,
- [&mu, &num_shards, &num_elements](int64 start, int64 limit) {
- mutex_lock l(mu);
- ++num_shards;
+ [&num_elements](int64 start, int64 limit) {
num_elements += limit - start;
});
- EXPECT_EQ(num_shards, workers);
- EXPECT_EQ(num_elements, total_elements);
+ EXPECT_EQ(num_elements.load(), total_elements);
}
}