Diffstat (limited to 'tensorflow/contrib/mpi_collectives/__init__.py'):
 -rw-r--r--  tensorflow/contrib/mpi_collectives/__init__.py  273
 1 file changed, 273 insertions(+), 0 deletions(-)
diff --git a/tensorflow/contrib/mpi_collectives/__init__.py b/tensorflow/contrib/mpi_collectives/__init__.py
new file mode 100644
index 0000000000..b94f7b0a35
--- /dev/null
+++ b/tensorflow/contrib/mpi_collectives/__init__.py
@@ -0,0 +1,273 @@
+# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+# pylint: disable=g-short-docstring-punctuation
+"""## Communicating Between Processes with MPI
+
+TensorFlow natively provides inter-device communication through send and
+receive ops and inter-node communication through Distributed TensorFlow, based
+on the same send and receive abstractions. On HPC clusters where InfiniBand or
+other high-speed node interconnects are available, these built-in communication
+mechanisms can be insufficient for synchronous data-parallel training (that is,
+training without resorting to asynchronous gradient descent). This module
+implements a variety of MPI ops which can take advantage of hardware-specific
+MPI libraries for efficient communication.
+
+To use this module, TensorFlow must be built with an MPI library, which can be
+provided to the `./configure` script at build time. You will need to build
+TensorFlow from source in order to select the MPI library; to do so, follow
+the [instructions for building TensorFlow from
+source](https://www.tensorflow.org/get_started/os_setup#installing_from_sources).
+
+### Utility Ops
+
+In addition to reductions and gathers, this module provides utility operations
+for detecting the running MPI configuration.
+
+Example:
+
+```python
+from tensorflow.contrib import mpi_collectives as mpi
+
+# Use `mpi.Session` instead of `tf.Session`
+with mpi.Session() as session:
+  rank = session.run(mpi.rank())
+  print("My MPI Rank:", rank)
+
+  if rank == 0:
+    print("MPI Size:", session.run(mpi.size()))
+```
+
+@@rank
+@@size
+
+### Ring Allreduce and Allgather
+
+When summing or averaging tensors across many processes, communication can
+easily become a bottleneck. A naive implementation will send all the tensor
+values to the same process, perform the reduction, and then broadcast the
+values back to all other processes, effectively creating a synchronous
+parameter server in one process. However, the process responsible for
+performing the reduction will have to receive and send a massive amount of data
+which scales with the number of processes *and* the number of parameters in the
+model.
+
+Instead of centralizing the reduction and having one primary reducer, we can
+implement a distributed allreduce or allgather. A bandwidth-optimal allreduce
+will end up sending 2(N - 1) values for every value in the input tensor,
+and can be implemented with a ring allreduce [1]. (Intuitively, a linear reduce
+requires at least (N - 1) sends between the different nodes, and a broadcast of
+the result also requires (N - 1) sends, for a total of 2(N - 1); these two
+steps cannot be combined in a clever way to reduce the number of required
+sends.) This module implements bandwidth-optimal ring allreduce and ring
+allgather operations using MPI; by choosing a hardware-appropriate MPI
+implementation (such as OpenMPI with CUDA-IPC support), you can train large
+models with synchronous gradient descent with minimal communication overhead.
+
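+For intuition, here is a minimal, MPI-free Python sketch (not part of this
+module) that simulates the ring allreduce data movement among `N` in-process
+"ranks" using NumPy. Each simulated rank sends one chunk (1/N of its buffer)
+per step, for a total of 2(N - 1) steps:
+
+```python
+import numpy as np
+
+def ring_allreduce_sim(rank_data):
+  # Simulate a ring allreduce over a list of equal-length 1-D arrays,
+  # one array per simulated rank.
+  n = len(rank_data)
+  # Each rank splits its buffer into n contiguous chunks.
+  chunks = [np.array_split(x.astype(np.float64), n) for x in rank_data]
+
+  # Phase 1 (reduce-scatter): after n - 1 steps, rank r holds the fully
+  # reduced chunk with index (r + 1) % n.
+  for step in range(n - 1):
+    # Snapshot outgoing chunks so every send in a step uses pre-step values.
+    msgs = [(r, (r - step) % n, chunks[r][(r - step) % n].copy())
+            for r in range(n)]
+    for r, idx, payload in msgs:
+      chunks[(r + 1) % n][idx] += payload
+
+  # Phase 2 (allgather): circulate the fully reduced chunks around the ring.
+  for step in range(n - 1):
+    msgs = [(r, (r + 1 - step) % n, chunks[r][(r + 1 - step) % n].copy())
+            for r in range(n)]
+    for r, idx, payload in msgs:
+      chunks[(r + 1) % n][idx] = payload
+
+  return [np.concatenate(c) for c in chunks]
+
+# Every simulated rank ends up with the elementwise sum of all inputs.
+data = [np.arange(8.0) + 10 * r for r in range(4)]
+assert all(np.allclose(out, sum(data)) for out in ring_allreduce_sim(data))
+```
+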
+In addition to the `allreduce` and `allgather` functions, a convenience
+`DistributedOptimizer` wrapper is provided to simplify using these functions
+for reducing model gradients.
+
+Example:
+
+```python
+import tensorflow as tf
+from tensorflow.contrib import mpi_collectives as mpi
+
+# Construct a simple linear regression model to optimize
+W = tf.get_variable("W", shape=[20, 1], dtype=tf.float32)
+B = tf.get_variable("B", shape=[1, 1], dtype=tf.float32)
+inputs = tf.placeholder(tf.float32, shape=[None, 20], name="Inputs")
+outputs = tf.placeholder(tf.float32, shape=[None, 1], name="Outputs")
+loss = tf.nn.l2_loss(tf.matmul(inputs, W) + B - outputs)
+
+# Training using MPI allreduce with DistributedOptimizer
+optimizer = mpi.DistributedOptimizer(tf.train.AdamOptimizer())
+train = optimizer.minimize(loss)
+
+# Average loss over all ranks, for printing.
+# Do not pass this to an optimizer!
+avg_loss = mpi.allreduce(loss)
+
+# On different ranks, feed different input data.
+with mpi.Session() as session:
+  rank = session.run(mpi.rank())
+  batch_inputs, batch_outputs = construct_batch_for_rank(rank)
+  feed_dict = {inputs: batch_inputs, outputs: batch_outputs}
+  _, l = session.run([train, avg_loss], feed_dict=feed_dict)
+  print("Average Loss:", l)
+```
+
+[1] Patarasuk, Pitch and Yuan, Xin. "Bandwidth Optimal All-reduce Algorithms
+for Clusters of Workstations". Journal of Parallel and Distributed Computing,
+69(2), 2009.
+
+@@Session
+@@DistributedOptimizer
+@@allreduce
+@@allgather
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import tensorflow as tf
+
+from tensorflow.contrib.mpi_collectives.mpi_ops import size
+from tensorflow.contrib.mpi_collectives.mpi_ops import rank
+from tensorflow.contrib.mpi_collectives.mpi_ops import local_rank
+from tensorflow.contrib.mpi_collectives.mpi_ops import allgather
+from tensorflow.contrib.mpi_collectives.mpi_ops import _allreduce
+from tensorflow.contrib.mpi_collectives.mpi_ops import init
+
+
+def allreduce(tensor, average=True):
+  """Perform an MPI allreduce on a tf.Tensor or tf.IndexedSlices.
+
+  This function performs a bandwidth-optimal ring allreduce on the input
+  tensor. If the input is a tf.IndexedSlices, the function instead does an
+  allgather on the values and the indices, effectively doing an allreduce on
+  the represented tensor.
+
+  Args:
+    tensor: tf.Tensor, tf.Variable, or tf.IndexedSlices to reduce.
+      The shape of the input must be identical across all ranks.
+    average: If True, computes the average over all ranks.
+      Otherwise, computes the sum over all ranks.
+  """
+  if isinstance(tensor, tf.IndexedSlices):
+    # For IndexedSlices, do two allgathers instead of an allreduce.
+    mpi_size = tf.cast(size(), tensor.values.dtype)
+    values = allgather(tensor.values)
+    indices = allgather(tensor.indices)
+
+    # To make this operation into an average, divide all gathered values by
+    # the MPI size.
+    new_values = tf.div(values, mpi_size) if average else values
+    return tf.IndexedSlices(new_values, indices,
+                            dense_shape=tensor.dense_shape)
+  else:
+    mpi_size = tf.cast(size(), tensor.dtype)
+    summed_tensor = _allreduce(tensor)
+    new_tensor = (tf.div(summed_tensor, mpi_size)
+                  if average else summed_tensor)
+    return new_tensor
+
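+# Illustrative usage sketch (comments only; the names below are hypothetical):
+# gradients of embedding lookups arrive as tf.IndexedSlices, which allreduce()
+# handles via two allgathers rather than a ring allreduce:
+#
+#   embedding = tf.get_variable("embedding", shape=[vocab_size, dim])
+#   vectors = tf.nn.embedding_lookup(embedding, ids)
+#   loss = some_loss(vectors)
+#   grad = tf.gradients(loss, [embedding])[0]   # a tf.IndexedSlices
+#   averaged_grad = allreduce(grad)             # remains a tf.IndexedSlices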
+
+class DistributedOptimizer(tf.train.Optimizer):
+  """An optimizer that wraps another tf.Optimizer, using an MPI allreduce to
+  average gradient values before applying gradients to model weights."""
+
+  def __init__(self, optimizer, name=None, use_locking=False):
+    """Construct a new DistributedOptimizer, which uses another optimizer
+    under the hood for computing single-process gradient values and
+    applying gradient updates after the gradient values have been averaged
+    across all the MPI ranks.
+
+    Args:
+      optimizer: Optimizer to use for computing gradients and applying updates.
+      name: Optional name prefix for the operations created when applying
+        gradients. Defaults to "Distributed" followed by the provided
+        optimizer type.
+      use_locking: Whether to use locking when updating variables. See
+        Optimizer.__init__ for more info.
+    """
+    if name is None:
+      name = "Distributed{}".format(type(optimizer).__name__)
+
+    self._optimizer = optimizer
+    super(DistributedOptimizer, self).__init__(
+        name=name, use_locking=use_locking)
+
+  def compute_gradients(self, *args, **kwargs):
+    """Compute gradients of all trainable variables.
+
+    See Optimizer.compute_gradients() for more info.
+
+    In DistributedOptimizer, compute_gradients() is overridden to also
+    allreduce the gradients before returning them.
+    """
+    gradients = (super(DistributedOptimizer, self)
+                 .compute_gradients(*args, **kwargs))
+    # Allreduce (average) each gradient across all MPI ranks before it is
+    # applied.
+    return [(allreduce(gradient), var) for (gradient, var) in gradients]
+
+  def _apply_dense(self, *args, **kwargs):
+    """Calls this same method on the underlying optimizer."""
+    return self._optimizer._apply_dense(*args, **kwargs)
+
+  def _apply_sparse(self, *args, **kwargs):
+    """Calls this same method on the underlying optimizer."""
+    return self._optimizer._apply_sparse(*args, **kwargs)
+
+  def _apply_sparse_duplicate_indices(self, *args, **kwargs):
+    """Calls this same method on the underlying optimizer."""
+    return self._optimizer._apply_sparse_duplicate_indices(*args,
+                                                           **kwargs)
+
+  def _prepare(self, *args, **kwargs):
+    """Calls this same method on the underlying optimizer."""
+    return self._optimizer._prepare(*args, **kwargs)
+
+  def _create_slots(self, *args, **kwargs):
+    """Calls this same method on the underlying optimizer."""
+    return self._optimizer._create_slots(*args, **kwargs)
+
+  def _valid_dtypes(self, *args, **kwargs):
+    """Calls this same method on the underlying optimizer."""
+    return self._optimizer._valid_dtypes(*args, **kwargs)
+
+  def _finish(self, *args, **kwargs):
+    """Calls this same method on the underlying optimizer."""
+    return self._optimizer._finish(*args, **kwargs)
+
+
+class Session(tf.Session):
+  """A class for running TensorFlow operations, with copies of the same graph
+  running distributed across different MPI nodes.
+
+  The primary difference between `tf.Session` and
+  `tf.contrib.mpi_collectives.Session` is that the MPI `Session` ensures that
+  the `Session` options are correct for use with `tf.contrib.mpi_collectives`,
+  and initializes MPI immediately upon the start of the session.
+  """
+
+  def __init__(self, target='', graph=None, config=None):
+    """Creates a new TensorFlow MPI session.
+
+    Unlike a normal `tf.Session`, an MPI Session may only use a single GPU,
+    which must be specified in advance before the session is initialized.
+    In addition, it only uses a single graph evaluation thread, and
+    initializes MPI immediately upon starting.
+
+    If no `graph` argument is specified when constructing the session,
+    the default graph will be launched in the session. If you are
+    using more than one graph (created with `tf.Graph()`) in the same
+    process, you will have to use different sessions for each graph,
+    but each graph can be used in multiple sessions. In this case, it
+    is often clearer to pass the graph to be launched explicitly to
+    the session constructor.
+
+    Args:
+      target: (Optional.) The execution engine to connect to.
+      graph: (Optional.) The `Graph` to be launched (described above).
+      config: (Optional.) A `ConfigProto` protocol buffer with configuration
+        options for the session.
+    """
+    super(Session, self).__init__(target, graph, config=config)
+
+    # Initialize MPI on the relevant device.
+    # TODO: Move this to library load and eliminate mpi.Session()
+    if graph is None:
+      graph = tf.get_default_graph()
+    with graph.as_default():
+      self.run(init())
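+
+# Illustrative sketch (comments only; not executed): the MPI Session requires
+# each process to use a single GPU. One way to select it is to pin the process
+# to the GPU matching its local rank, obtained here from the Open MPI launcher
+# environment (other launchers expose similar variables):
+#
+#   import os
+#   local_rank_value = int(os.environ.get("OMPI_COMM_WORLD_LOCAL_RANK", "0"))
+#   config = tf.ConfigProto()
+#   config.gpu_options.visible_device_list = str(local_rank_value)
+#   with Session(config=config) as session:
+#     ...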