author     Ayush Dubey <ayushd@google.com>                    2018-09-27 12:20:33 -0700
committer  TensorFlower Gardener <gardener@tensorflow.org>    2018-09-27 12:28:44 -0700
commit     561a3c4331ebfaac3e61c524911bf6fe85f4ebc9 (patch)
tree       6f563c5ca0f579653082d1ae2cc548da10244076 /tensorflow/core/kernels
parent     9a0a768d4416d157664d864d992a62782beea4a4 (diff)
Dynamic subdivisions in collective ring reduce.
Before this change, a CollectiveOp user was required to specify subdiv_offsets
for the RingReduce algorithm. During ring reduction, we created chunks of the
tensor to exchange between devices. If the chunks were too large, or if the
hardware supported multiple data exchanges in parallel, the user could further
subdivide the chunks by specifying more than one subdiv offset. Each subdiv
offset corresponded to another subdivision of each chunk, so the total number
of tensor chunks was effectively the number of devices * the number of subdivs.
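For concreteness, here is a small worked example of that chunk arithmetic. The
numbers (64 MB tensor, 4 devices, 2 subdivs) are illustrative only and do not
come from this commit:

#include <cstdint>
#include <iostream>

int main() {
  // Illustrative: a 64 MB tensor ring-reduced across 4 devices, with the
  // user supplying 2 subdiv offsets.
  const int64_t tensor_bytes = 64LL * 1024 * 1024;
  const int num_devices = 4;
  const int num_subdivs = 2;
  // Total chunks = number of devices * number of subdivs.
  const int num_chunks = num_devices * num_subdivs;       // 8 chunks
  const int64_t chunk_bytes = tensor_bytes / num_chunks;  // 8 MB per chunk
  std::cout << num_chunks << " chunks of " << chunk_bytes << " bytes each\n";
  return 0;
}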
After this change, we can dynamically infer the number of subdivisions based on
a target chunk size. In ring_reducer.cc, we start with 1 subdiv and keep
increasing the subdiv count until the chunk size is less than MAX_CHUNK_SIZE.
Currently, MAX_CHUNK_SIZE is set at 4 MB, although it may make sense to tune
this for specific hardware.
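A minimal sketch of that inference loop follows. The names kMaxChunkSize and
NumSubdivs are hypothetical, chosen only to mirror the description above; the
actual implementation lives in ring_reducer.cc:

#include <cstdint>

// Hypothetical name for the 4 MB target described in the commit message.
constexpr int64_t kMaxChunkSize = 4LL * 1024 * 1024;

// Start with 1 subdiv and keep increasing until each chunk,
// tensor_bytes / (num_devices * num_subdivs), is below the target.
int64_t NumSubdivs(int64_t tensor_bytes, int64_t num_devices) {
  int64_t num_subdivs = 1;
  while (tensor_bytes / (num_devices * num_subdivs) >= kMaxChunkSize) {
    ++num_subdivs;
  }
  return num_subdivs;
}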
As part of this change, a user can now provide an empty subdiv_offsets list.
If it is empty, we dynamically add subdivisions based on the above algorithm.
If it is non-empty, we take the user-specified subdivisions.
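A sketch of that empty/non-empty branch is below. EffectiveSubdivOffsets is a
made-up helper, and the zero-filled offsets are placeholders: the commit does
not show how the inferred offset values are actually generated.

#include <cstdint>
#include <vector>

int64_t NumSubdivs(int64_t tensor_bytes, int64_t num_devices);  // sketch above

std::vector<int32_t> EffectiveSubdivOffsets(
    const std::vector<int32_t>& user_offsets, int64_t tensor_bytes,
    int64_t num_devices) {
  if (!user_offsets.empty()) {
    // Non-empty: take the user-specified subdivisions verbatim.
    return user_offsets;
  }
  // Empty: infer the subdiv count dynamically and synthesize one offset per
  // subdivision (values here are placeholders, not the real scheme).
  const int64_t n = NumSubdivs(tensor_bytes, num_devices);
  return std::vector<int32_t>(static_cast<size_t>(n), 0);
}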
PiperOrigin-RevId: 214815959
Diffstat (limited to 'tensorflow/core/kernels')
-rw-r--r--  tensorflow/core/kernels/collective_ops.cc  21
1 file changed, 10 insertions(+), 11 deletions(-)
diff --git a/tensorflow/core/kernels/collective_ops.cc b/tensorflow/core/kernels/collective_ops.cc
index e0da91125b..fa959b5a0e 100644
--- a/tensorflow/core/kernels/collective_ops.cc
+++ b/tensorflow/core/kernels/collective_ops.cc
@@ -132,6 +132,7 @@ class CollectiveReduceOpKernel : public CollectiveOpKernel {
             "Failed to get CollectiveExecutor from OpKernelContext for Op ",
             col_params_.name),
         done);
+    col_params_.instance.shape = c->input(0).shape();
     // Allocate output on the first pass through this function. This must be
     // done immediately, while we're still in the executor thread. Otherwise
     // the memory is not guaranteed to be unused by any concurrently executing
@@ -171,7 +172,7 @@ class CollectiveBcastSendOpKernel : public CollectiveOpKernel {
     OP_REQUIRES_OK(
         c, c->GetAttr("instance_key", &col_params_.instance.instance_key));
     OP_REQUIRES_OK(c, c->GetAttr("T", &col_params_.instance.data_type));
-    OP_REQUIRES_OK(c, c->GetAttr("shape", &shape_));
+    OP_REQUIRES_OK(c, c->GetAttr("shape", &col_params_.instance.shape));
     col_params_.is_source = true;
     col_params_.instance.impl_details.subdiv_offsets = {0};

@@ -195,13 +196,14 @@ class CollectiveBcastSendOpKernel : public CollectiveOpKernel {
     if (c->mutable_output(0) == nullptr) {
       // Allocate the output tensor, trying to reuse the input.
       Tensor* output = nullptr;
-      OP_REQUIRES_OK_ASYNC(
-          c, c->forward_input_or_allocate_output({0}, 0, shape_, &output),
-          done);
+      OP_REQUIRES_OK_ASYNC(c,
+                           c->forward_input_or_allocate_output(
+                               {0}, 0, col_params_.instance.shape, &output),
+                           done);
     }
     if (!CanProceedWithCompute(c, col_exec, done)) return;
     OP_REQUIRES_ASYNC(
-        c, shape_.IsSameSize(c->input(0).shape()),
+        c, col_params_.instance.shape.IsSameSize(c->input(0).shape()),
         errors::Internal("Declared shape of op ", col_params_.name,
                          " does not match shape of input"),
         done);
@@ -214,8 +216,6 @@ class CollectiveBcastSendOpKernel : public CollectiveOpKernel {
   }

  private:
-  TensorShape shape_;
-
   TF_DISALLOW_COPY_AND_ASSIGN(CollectiveBcastSendOpKernel);
 };

@@ -234,7 +234,7 @@ class CollectiveBcastRecvOpKernel : public CollectiveOpKernel {
     OP_REQUIRES_OK(
         c, c->GetAttr("instance_key", &col_params_.instance.instance_key));
     OP_REQUIRES_OK(c, c->GetAttr("T", &col_params_.instance.data_type));
-    OP_REQUIRES_OK(c, c->GetAttr("shape", &shape_));
+    OP_REQUIRES_OK(c, c->GetAttr("shape", &col_params_.instance.shape));
     col_params_.is_source = false;
     col_params_.instance.impl_details.subdiv_offsets = {0};

@@ -258,7 +258,8 @@ class CollectiveBcastRecvOpKernel : public CollectiveOpKernel {
     if (c->mutable_output(0) == nullptr) {
       // No input, so must allocate output.
       Tensor* output = nullptr;
-      OP_REQUIRES_OK_ASYNC(c, c->allocate_output(0, shape_, &output), done);
+      OP_REQUIRES_OK_ASYNC(
+          c, c->allocate_output(0, col_params_.instance.shape, &output), done);
     }
     if (!CanProceedWithCompute(c, col_exec, done)) return;

@@ -270,8 +271,6 @@ class CollectiveBcastRecvOpKernel : public CollectiveOpKernel {
   }

  private:
-  TensorShape shape_;
-
   TF_DISALLOW_COPY_AND_ASSIGN(CollectiveBcastRecvOpKernel);
 };