aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/contrib/nccl
diff options
context:
space:
mode:
authorGravatar A. Unique TensorFlower <gardener@tensorflow.org>2017-10-24 13:55:04 -0700
committerGravatar TensorFlower Gardener <gardener@tensorflow.org>2017-10-24 13:58:59 -0700
commitbf1fad214febef6af5c101d8f953d0109c46dfbb (patch)
treea754614b40ba0632ed20041febb443016deba215 /tensorflow/contrib/nccl
parent73f8b044ea7333b25ef5c9841c1e072e45ad5890 (diff)
Fix NCCL rewrite bug when rerunning sessions (assigned device id is not stable).
Fix colocate_gradients_with_ops for initial losses. Remove the NcclBroadcast gradient test for now: the generated AddN that accumulates the broadcast outputs before passing them to the gradient function is CPU-only and cannot be colocated with NcclBroadcast on the GPU. PiperOrigin-RevId: 173306409
Diffstat (limited to 'tensorflow/contrib/nccl')
-rw-r--r--tensorflow/contrib/nccl/kernels/nccl_rewrite.cc9
-rw-r--r--tensorflow/contrib/nccl/python/ops/nccl_ops_test.py10
2 files changed, 11 insertions, 8 deletions
diff --git a/tensorflow/contrib/nccl/kernels/nccl_rewrite.cc b/tensorflow/contrib/nccl/kernels/nccl_rewrite.cc
index 94a77c59da..a4de46a93f 100644
--- a/tensorflow/contrib/nccl/kernels/nccl_rewrite.cc
+++ b/tensorflow/contrib/nccl/kernels/nccl_rewrite.cc
@@ -117,6 +117,7 @@ Status ReplaceBroadcast(Graph* graph, Node* node) {
TF_RETURN_IF_ERROR(GetNodeAttr(node->attrs(), "T", &dtype));
int send_dev = node->assigned_device_name_index();
int num_devices = 0; // Number of distinct devices, incremented below.
+ std::vector<int> recv_index_map; // Map device name index to stable index.
// Map device name index to nodes that take the broadcast as input.
std::vector<std::forward_list<NodeBuilder::NodeOut>> out_nodes_map;
@@ -126,9 +127,11 @@ Status ReplaceBroadcast(Graph* graph, Node* node) {
: edge->dst()->assigned_device_name_index();
if (out_nodes_map.size() <= dst_dev) {
out_nodes_map.resize(dst_dev + 1);
+ recv_index_map.resize(dst_dev + 1);
}
auto it = out_nodes_map.begin() + dst_dev;
if (it->empty()) {
+ recv_index_map[dst_dev] = num_devices;
++num_devices;
}
it->emplace_front(NodeBuilder::NodeOut(edge->dst(), edge->dst_input()));
@@ -211,16 +214,18 @@ Status ReplaceBroadcast(Graph* graph, Node* node) {
if (out_nodes_map[recv_dev].empty()) {
continue;
}
+ int recv_index = recv_index_map[recv_dev];
if (is_fully_defined) {
// If the shape is fully defined, define one const node per device.
- NodeBuilder shape_builder(strings::StrCat(shape_name, recv_dev), "Const");
+ NodeBuilder shape_builder(strings::StrCat(shape_name, recv_index),
+ "Const");
shape_builder.Attr("value", tensor_proto).Attr("dtype", DT_INT32);
TF_RETURN_IF_ERROR(shape_builder.Finalize(graph, &shape_node));
shape_node->set_assigned_device_name_index(recv_dev);
}
Node* recv_node;
TF_RETURN_IF_ERROR(
- make_builder("_NcclBroadcastRecv", strings::StrCat("Recv_", recv_dev))
+ make_builder("_NcclBroadcastRecv", strings::StrCat("Recv_", recv_index))
.Input(shape_node)
.Finalize(graph, &recv_node));
recv_node->set_assigned_device_name_index(recv_dev);
diff --git a/tensorflow/contrib/nccl/python/ops/nccl_ops_test.py b/tensorflow/contrib/nccl/python/ops/nccl_ops_test.py
index 255409303a..0b13e3595e 100644
--- a/tensorflow/contrib/nccl/python/ops/nccl_ops_test.py
+++ b/tensorflow/contrib/nccl/python/ops/nccl_ops_test.py
@@ -117,7 +117,8 @@ class NcclTestCase(test.TestCase):
inputs = [array_ops.placeholder(t.dtype, t.shape) for t in tensors]
reduce_tensors = nccl_reduce(inputs, devices)
losses = _DeviceTensors(tensors, [t.device for t in reduce_tensors])
- grads = gradients.gradients(reduce_tensors, inputs, losses)
+ grads = gradients.gradients(
+ reduce_tensors, inputs, losses, colocate_gradients_with_ops=True)
return [g for g in grads if g is not None]
self._Test(_Gradient, numpy_fn)
@@ -159,7 +160,7 @@ class BroadcastTest(NcclTestCase):
def testBroadcastSingleDevice(self):
# Broadcasts on a single device are removed completely during rewrite.
self._Test(_NcclBroadcast, lambda x, y: x,
- (['/device:GPU:0', '/device:GPU:0']))
+ (['/device:GPU:0', '/device:GPU:0'],))
def testBroadcastToCpuError(self):
# Broadcasts to CPU is not supported.
@@ -167,10 +168,7 @@ class BroadcastTest(NcclTestCase):
errors.NotFoundError,
"No registered '_NcclBroadcastRecv' OpKernel for CPU devices"):
self._Test(_NcclBroadcast, lambda x, y: x,
- (['/device:GPU:0', '/device:CPU:0']))
-
- def testBroadcastGrad(self):
- self._TestGradient(_NcclBroadcast, lambda x, y: x + y)
+ (['/device:GPU:0', '/device:CPU:0'],))
class CombinedTest(NcclTestCase):