aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/core/kernels
diff options
context:
space:
mode:
authorGravatar Ayush Dubey <ayushd@google.com>2018-09-27 12:20:33 -0700
committerGravatar TensorFlower Gardener <gardener@tensorflow.org>2018-09-27 12:28:44 -0700
commit561a3c4331ebfaac3e61c524911bf6fe85f4ebc9 (patch)
tree6f563c5ca0f579653082d1ae2cc548da10244076 /tensorflow/core/kernels
parent9a0a768d4416d157664d864d992a62782beea4a4 (diff)
Dynamic subdivisions in collective ring reduce.
Before this change, a CollectiveOp user was required to specify subdiv_offsets for the RingReduce algorithm. During ring reduction, we created chunks of the tensor to exchange between devices. If the chunks were too large, or if the hardware supported multiple data exchanges in parallel, the user could further subdivide the chunk by specifying more than 1 subdiv offset. Each subdiv offset corresponded to another subdivision of the chunk, so effectively the total number of tensor chunks is number of devices * number of subdivs. After this change, we can dynamically infer the number of subdivisions based on a target chunk size. In ring_reducer.cc, we start with 1 subdiv, and keep increasing until chunk size is less than MAX_CHUNK_SIZE. Currently, MAX_CHUNK_SIZE is set at 4 MB, although it may make sense to change this based on specific hardware. As a part of this change, a user can now provide an empty subdiv_offset list. If empty, we dynamically add subdivisions based on the above algorithm. If non-empty, we take the user-specified subdivions. PiperOrigin-RevId: 214815959
Diffstat (limited to 'tensorflow/core/kernels')
-rw-r--r--tensorflow/core/kernels/collective_ops.cc21
1 files changed, 10 insertions, 11 deletions
diff --git a/tensorflow/core/kernels/collective_ops.cc b/tensorflow/core/kernels/collective_ops.cc
index e0da91125b..fa959b5a0e 100644
--- a/tensorflow/core/kernels/collective_ops.cc
+++ b/tensorflow/core/kernels/collective_ops.cc
@@ -132,6 +132,7 @@ class CollectiveReduceOpKernel : public CollectiveOpKernel {
"Failed to get CollectiveExecutor from OpKernelContext for Op ",
col_params_.name),
done);
+ col_params_.instance.shape = c->input(0).shape();
// Allocate output on the first pass through this function. This must be
// done immediately, while we're still in the executor thread. Otherwise
// the memory is not guaranteed to be unused by any concurrently executing
@@ -171,7 +172,7 @@ class CollectiveBcastSendOpKernel : public CollectiveOpKernel {
OP_REQUIRES_OK(
c, c->GetAttr("instance_key", &col_params_.instance.instance_key));
OP_REQUIRES_OK(c, c->GetAttr("T", &col_params_.instance.data_type));
- OP_REQUIRES_OK(c, c->GetAttr("shape", &shape_));
+ OP_REQUIRES_OK(c, c->GetAttr("shape", &col_params_.instance.shape));
col_params_.is_source = true;
col_params_.instance.impl_details.subdiv_offsets = {0};
@@ -195,13 +196,14 @@ class CollectiveBcastSendOpKernel : public CollectiveOpKernel {
if (c->mutable_output(0) == nullptr) {
// Allocate the output tensor, trying to reuse the input.
Tensor* output = nullptr;
- OP_REQUIRES_OK_ASYNC(
- c, c->forward_input_or_allocate_output({0}, 0, shape_, &output),
- done);
+ OP_REQUIRES_OK_ASYNC(c,
+ c->forward_input_or_allocate_output(
+ {0}, 0, col_params_.instance.shape, &output),
+ done);
}
if (!CanProceedWithCompute(c, col_exec, done)) return;
OP_REQUIRES_ASYNC(
- c, shape_.IsSameSize(c->input(0).shape()),
+ c, col_params_.instance.shape.IsSameSize(c->input(0).shape()),
errors::Internal("Declared shape of op ", col_params_.name,
" does not match shape of input"),
done);
@@ -214,8 +216,6 @@ class CollectiveBcastSendOpKernel : public CollectiveOpKernel {
}
private:
- TensorShape shape_;
-
TF_DISALLOW_COPY_AND_ASSIGN(CollectiveBcastSendOpKernel);
};
@@ -234,7 +234,7 @@ class CollectiveBcastRecvOpKernel : public CollectiveOpKernel {
OP_REQUIRES_OK(
c, c->GetAttr("instance_key", &col_params_.instance.instance_key));
OP_REQUIRES_OK(c, c->GetAttr("T", &col_params_.instance.data_type));
- OP_REQUIRES_OK(c, c->GetAttr("shape", &shape_));
+ OP_REQUIRES_OK(c, c->GetAttr("shape", &col_params_.instance.shape));
col_params_.is_source = false;
col_params_.instance.impl_details.subdiv_offsets = {0};
@@ -258,7 +258,8 @@ class CollectiveBcastRecvOpKernel : public CollectiveOpKernel {
if (c->mutable_output(0) == nullptr) {
// No input, so must allocate output.
Tensor* output = nullptr;
- OP_REQUIRES_OK_ASYNC(c, c->allocate_output(0, shape_, &output), done);
+ OP_REQUIRES_OK_ASYNC(
+ c, c->allocate_output(0, col_params_.instance.shape, &output), done);
}
if (!CanProceedWithCompute(c, col_exec, done)) return;
@@ -270,8 +271,6 @@ class CollectiveBcastRecvOpKernel : public CollectiveOpKernel {
}
private:
- TensorShape shape_;
-
TF_DISALLOW_COPY_AND_ASSIGN(CollectiveBcastRecvOpKernel);
};