aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/core/common_runtime
diff options
context:
space:
mode:
authorGravatar Ayush Dubey <ayushd@google.com>2018-09-27 12:20:33 -0700
committerGravatar TensorFlower Gardener <gardener@tensorflow.org>2018-09-27 12:28:44 -0700
commit561a3c4331ebfaac3e61c524911bf6fe85f4ebc9 (patch)
tree6f563c5ca0f579653082d1ae2cc548da10244076 /tensorflow/core/common_runtime
parent9a0a768d4416d157664d864d992a62782beea4a4 (diff)
Dynamic subdivisions in collective ring reduce.
Before this change, a CollectiveOp user was required to specify subdiv_offsets for the RingReduce algorithm. During ring reduction, we created chunks of the tensor to exchange between devices. If the chunks were too large, or if the hardware supported multiple data exchanges in parallel, the user could further subdivide the chunk by specifying more than 1 subdiv offset. Each subdiv offset corresponded to another subdivision of the chunk, so effectively the total number of tensor chunks is number of devices * number of subdivs. After this change, we can dynamically infer the number of subdivisions based on a target chunk size. In ring_reducer.cc, we start with 1 subdiv, and keep increasing until chunk size is less than MAX_CHUNK_SIZE. Currently, MAX_CHUNK_SIZE is set at 4 MB, although it may make sense to change this based on specific hardware. As a part of this change, a user can now provide an empty subdiv_offset list. If empty, we dynamically add subdivisions based on the above algorithm. If non-empty, we take the user-specified subdivions. PiperOrigin-RevId: 214815959
Diffstat (limited to 'tensorflow/core/common_runtime')
-rw-r--r--tensorflow/core/common_runtime/ring_reducer.cc75
-rw-r--r--tensorflow/core/common_runtime/ring_reducer_test.cc83
2 files changed, 137 insertions, 21 deletions
diff --git a/tensorflow/core/common_runtime/ring_reducer.cc b/tensorflow/core/common_runtime/ring_reducer.cc
index a81f8650bf..b1fe928ba7 100644
--- a/tensorflow/core/common_runtime/ring_reducer.cc
+++ b/tensorflow/core/common_runtime/ring_reducer.cc
@@ -41,6 +41,16 @@ limitations under the License.
// Set true for greater intelligibility of debug mode log messages.
#define READABLE_KEYS false
+// RingReduce algorithm exchanges chunks of tensor between devices. The chunk
+// size depends on the number of subdivisions specified in the algorithm. If
+// the user does not specify the number of subdivisions, we infer the number
+// dynamically so that the resulting chunk size does not exceed
+// kMaxChunkSizeBytes, empirically set at 4 MiB.
+constexpr size_t kMaxChunkSizeBytes = (4 * 1024 * 1024);
+// kMaxSubdivsPerDev is used to give an upper bound on the number of
+// subdivisions dynamically generated. A reasonable value would be a small
+// multiple of the number of NICs adjacent to each device.
+constexpr int kMaxSubdivsPerDevice = 2;
namespace tensorflow {
namespace {
@@ -92,7 +102,62 @@ RingReducer::RingReducer()
RingReducer::~RingReducer() { group_size_tensor_ready_.WaitForNotification(); }
+Status GenerateSubdivsInCollectiveParams(CollectiveParams* col_params) {
+ if (col_params->instance.shape.num_elements() == 0) {
+ return errors::Internal("shape in CollectiveParams should be non-empty");
+ }
+ const int kAvgDevPerTask =
+ col_params->group.group_size / col_params->group.num_tasks;
+ const int kMaxNumSubdivs = kMaxSubdivsPerDevice * kAvgDevPerTask;
+ if (kMaxNumSubdivs <= 0) {
+ return errors::Internal("Unexpected kMaxNumSubdivs ", kMaxNumSubdivs,
+ " in RingReducer");
+ }
+ // NOTE(ayushd): If no subdiv_offsets have been specified, dynamically add
+ // as many offsets as needed so that the size of tensor chunks <=
+ // kMaxChunkSizeBytes. Empirically, chunks that are too small or too large
+ // lead to worse performance.
+ int num_subdivs = 0;
+ const size_t tensor_size = col_params->instance.shape.num_elements() *
+ DataTypeSize(col_params->instance.data_type);
+ size_t chunk_size;
+ do {
+ ++num_subdivs;
+ int num_chunks = col_params->group.group_size * num_subdivs;
+ chunk_size = tensor_size / num_chunks;
+ VLOG(2) << "num_subdivs " << num_subdivs << " num_chunks " << num_chunks
+ << " chunk_size " << chunk_size;
+ } while (chunk_size > kMaxChunkSizeBytes && num_subdivs < kMaxNumSubdivs);
+ if (num_subdivs <= 0) {
+ return errors::Internal("Unexpected num_subdivs ", num_subdivs,
+ " in RingReducer");
+ }
+
+ int subdiv_stride = kAvgDevPerTask / num_subdivs;
+ if (subdiv_stride == 0) subdiv_stride = 1;
+ col_params->instance.impl_details.subdiv_offsets.reserve(num_subdivs);
+ for (int sdi = 0; sdi < num_subdivs; ++sdi) {
+ int subdiv_offset = subdiv_stride * sdi;
+ if (sdi % 2 == 1) subdiv_offset *= -1;
+ col_params->instance.impl_details.subdiv_offsets.push_back(subdiv_offset);
+ }
+
+ if (VLOG_IS_ON(2)) {
+ string subdiv_buf;
+ for (const int subdiv_offset :
+ col_params->instance.impl_details.subdiv_offsets) {
+ strings::StrAppend(&subdiv_buf, " ", subdiv_offset);
+ }
+ VLOG(2) << "Dynamically generated " << num_subdivs
+ << " subdiv_offsets:" << subdiv_buf << " tensor_size "
+ << tensor_size << " chunk_size " << chunk_size;
+ }
+
+ return Status::OK();
+}
+
Status RingReducer::InitializeCollectiveParams(CollectiveParams* col_params) {
+ // TODO(b/113171733): change CHECKs to return errors.
CHECK_EQ(col_params->instance.type, REDUCTION_COLLECTIVE);
CHECK_EQ(col_params->instance.impl_details.collective_name, "RingReduce");
const string& device_name =
@@ -123,12 +188,11 @@ Status RingReducer::InitializeCollectiveParams(CollectiveParams* col_params) {
dev_per_task.push_back(dev_count);
CHECK_EQ(col_params->group.num_tasks, dev_per_task.size());
- // Generate a ring permutation for each requested offset.
if (col_params->instance.impl_details.subdiv_offsets.empty()) {
- return errors::Internal(
- "Subdiv offsets should be non-empty for ring reducer, size=",
- col_params->instance.impl_details.subdiv_offsets.size());
+ TF_RETURN_IF_ERROR(GenerateSubdivsInCollectiveParams(col_params));
}
+
+ // Generate a ring permutation for requested offset.
VLOG(2) << "Setting up perms for col_params " << col_params
<< " subdiv_permutations "
<< &col_params->instance.impl_details.subdiv_permutations;
@@ -646,7 +710,8 @@ bool RingReducer::RunAsyncParts() {
case RF_SEND:
--send_pending_count;
break;
- default: {} // Ignore any other actions
+ default: {
+ } // Ignore any other actions
}
}
}
diff --git a/tensorflow/core/common_runtime/ring_reducer_test.cc b/tensorflow/core/common_runtime/ring_reducer_test.cc
index 28df85399e..75aba43572 100644
--- a/tensorflow/core/common_runtime/ring_reducer_test.cc
+++ b/tensorflow/core/common_runtime/ring_reducer_test.cc
@@ -549,37 +549,38 @@ class RingReducerTest : public ::testing::Test {
int32 reduce_counter_ GUARDED_BY(mu_) = 0;
};
-TEST_F(RingReducerTest, InitializeParams) {
- static const int kNumDevsPerTask = 8;
- static const int kNumTasks = 3;
- static const int kNumDevs = kNumDevsPerTask * kNumTasks;
+CollectiveParams SetUpCollectiveParams(const int num_devs_per_task,
+ const int num_tasks) {
CollectiveParams cp;
- std::vector<string> device_names;
- std::vector<string> task_names;
+ const int kNumDevs = num_devs_per_task * num_tasks;
cp.group.group_key = 1;
cp.group.group_size = kNumDevs;
cp.group.device_type = DeviceType("GPU");
- cp.group.num_tasks = kNumTasks;
+ cp.group.num_tasks = num_tasks;
cp.instance.instance_key = 3;
cp.instance.type = REDUCTION_COLLECTIVE;
cp.instance.data_type = DataType(DT_FLOAT);
- cp.instance.shape = TensorShape({5});
+ cp.instance.shape = TensorShape({kNumDevs});
cp.instance.impl_details.collective_name = "RingReduce";
cp.instance.impl_details.subdiv_offsets.push_back(0);
cp.is_source = false;
for (int i = 0; i < kNumDevs; ++i) {
- int task_id = i / kNumDevsPerTask;
- int dev_id = i % kNumDevsPerTask;
+ int task_id = i / num_devs_per_task;
+ int dev_id = i % num_devs_per_task;
string task_name = strings::StrCat("/job:worker/replica:0/task:", task_id);
- task_names.push_back(task_name);
string device_name = strings::StrCat(task_name, "/device:GPU:", dev_id);
- device_names.push_back(device_name);
cp.instance.task_names.push_back(task_name);
cp.instance.device_names.push_back(device_name);
}
+ return cp;
+}
- int test_rank = 0;
- cp.default_rank = test_rank;
+TEST_F(RingReducerTest, InitializeParams) {
+ const int kNumDevsPerTask = 8;
+ const int kNumTasks = 3;
+ CollectiveParams cp = SetUpCollectiveParams(kNumDevsPerTask, kNumTasks);
+
+ cp.default_rank = 0;
cp.instance.impl_details.subdiv_offsets = {0, 4};
RunSubdivPermsTest(&cp,
{{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
@@ -588,8 +589,15 @@ TEST_F(RingReducerTest, InitializeParams) {
8, 9, 10, 11, 20, 21, 22, 23, 16, 17, 18, 19}},
{0, 4});
- test_rank = 3;
- cp.default_rank = test_rank;
+ cp.instance.impl_details.subdiv_offsets = {0, -4};
+ RunSubdivPermsTest(&cp,
+ {{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
+ 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23},
+ {3, 2, 1, 0, 7, 6, 5, 4, 11, 10, 9, 8,
+ 15, 14, 13, 12, 19, 18, 17, 16, 23, 22, 21, 20}},
+ {0, 3});
+
+ cp.default_rank = 3;
cp.instance.impl_details.subdiv_offsets = {3, -3};
RunSubdivPermsTest(&cp,
{{3, 4, 5, 6, 7, 0, 1, 2, 11, 12, 13, 14,
@@ -599,6 +607,49 @@ TEST_F(RingReducerTest, InitializeParams) {
{0, 1});
}
+TEST_F(RingReducerTest, AutomaticSubdivs) {
+ const int kNumDevsPerTask = 8;
+ const int kNumTasks = 3;
+ const int kNumDevs = kNumDevsPerTask * kNumTasks;
+ CollectiveParams cp = SetUpCollectiveParams(kNumDevsPerTask, kNumTasks);
+
+ // Test automatic generation of subdiv offsets.
+ cp.default_rank = 0;
+ cp.instance.impl_details.subdiv_offsets.clear();
+ RunSubdivPermsTest(&cp, {{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
+ 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23}},
+ {0});
+
+ // Set shape so that with 2 subdivs chunk_size is 3 MiB. This should cause 2
+ // offsets, {0, -4}, to be generated.
+ {
+ int num_subdivs = 2;
+ int num_chunks = kNumDevs * num_subdivs;
+ size_t chunk_size = 3 * 1048576; // 3 MB
+ size_t tensor_size = chunk_size * num_chunks;
+ cp.instance.shape =
+ TensorShape({static_cast<int64>(tensor_size / DataTypeSize(DT_FLOAT))});
+ }
+ cp.instance.impl_details.subdiv_offsets.clear();
+ RunSubdivPermsTest(&cp,
+ {{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
+ 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23},
+ {3, 2, 1, 0, 7, 6, 5, 4, 11, 10, 9, 8,
+ 15, 14, 13, 12, 19, 18, 17, 16, 23, 22, 21, 20}},
+ {0, 3});
+}
+
+TEST_F(RingReducerTest, AutomaticSubdivUpperBound) {
+ const int kNumDevsPerTask = 1;
+ const int kNumTasks = 4;
+ CollectiveParams cp = SetUpCollectiveParams(kNumDevsPerTask, kNumTasks);
+
+ cp.default_rank = 0;
+ cp.instance.impl_details.subdiv_offsets.clear();
+ cp.instance.shape = TensorShape({104857600 / DataTypeSize(DT_FLOAT)});
+ RunSubdivPermsTest(&cp, {{0, 1, 2, 3}, {0, 1, 2, 3}}, {0, 0});
+}
+
// TODO(b/113171733): change to use TEST_P.
#define DEF_TEST(B, T, W, D, S, L, A) \
TEST_F(RingReducerTest, \