diff options
Diffstat (limited to 'tensorflow/contrib/mpi_collectives/__init__.py')
-rw-r--r-- | tensorflow/contrib/mpi_collectives/__init__.py | 273 |
1 files changed, 273 insertions, 0 deletions
diff --git a/tensorflow/contrib/mpi_collectives/__init__.py b/tensorflow/contrib/mpi_collectives/__init__.py new file mode 100644 index 0000000000..b94f7b0a35 --- /dev/null +++ b/tensorflow/contrib/mpi_collectives/__init__.py @@ -0,0 +1,273 @@ +# Copyright 2017 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +# pylint: disable=g-short-docstring-punctuation +"""## Communicating Between Processes with MPI + +TensorFlow natively provides inter-device communication through send and +receive ops and inter-node communication through Distributed TensorFlow, based +on the same send and receive abstractions. On HPC clusters where Infiniband or +other high-speed node interconnects are available, these can end up being +insufficient for synchronous data-parallel training (without asynchronous +gradient descent). This module implements a variety of MPI ops which can take +advantage of hardware-specific MPI libraries for efficient communication. + +In order to use this module, TensorFlow must be built with an MPI library, +which can be provided to the `./configure` script at build time. As a user of +TensorFlow, you will need to build TensorFlow yourself to select the MPI +library to use; to do so, follow the [instructions for building TensorFlow from +source](https://www.tensorflow.org/get_started/os_setup#installing_from_sources). + +### Utility Ops + +In addition to reductions and gathers, this module provides utility operations +for detecting the running MPI configuration. + +Example: + +```python +from tensorflow.contrib import mpi + +# Use `mpi.Session` instead of `tf.Session` +with mpi.Session() as session: + rank = session.run(mpi.rank()) + print("My MPI Rank:", rank) + + if rank == 0: + print("MPI Size:", session.run(mpi.size())) +``` + +@@rank +@@size + +### Ring Allreduce and Allgather + +When summing or averaging tensors across many processes, communication can +easily become a bottleneck. A naive implementation will send all the tensor +values to the same process, perform the reduction, and then broadcast the +values back to all other processes, effectively creating a synchronous +parameter server in one process. However, the process responsible for +performing the reduction will have to receive and send a massive amount of data +which scales with the number of processes *and* the number of parameters in the +model. + +Instead of centralizing the reduction and having one primary reducer, we can +implement a distributed allreduce or allgather. A bandwidth-optimal allreduce +will end up sending 2(N - 1) values for every value in the input tensor, +and can be implemented with a ring allreduce [1]. (Intuitively, a linear reduce +requires at least (N - 1) sends between the different nodes, and a broadcast of +the result also requires (N - 1) sends, for a total of 2 (N - 1); these two +steps cannot be combined in a clever way to reduce the number of required +sends.) This module implements bandwidth-optimal ring allreduce and ring +allgather operations using MPI; by choosing a hardware-appropriate MPI +implementation (such as OpenMPI with CUDA-IPC support), you can train large +models with synchronous gradient descent with minimal communication overhead. + +In addition to the `allreduce` and `allgather` functions, a convenience +`DistributedOptimizer` wrapper is provided to simplify using these functions +for reducing model gradients. + +Example: + +```python +import tensorflow as tf +from tensorflow.contrib import mpi_collectives as mpi + +# Construct a simple linear regression model to optimize +W = tf.get_variable("W", shape=[20, 1], dtype=tf.float32) +B = tf.get_variable("B", shape=[1, 1], dtype=tf.float32) +inputs = tf.placeholder("Inputs", shape=[None, 20]) +outputs = tf.placeholder("Outputs", shape=[None, 1]) +loss = tf.nn.l2_loss(tf.matmul(inputs, W) + B - outputs) + +# Training using MPI allreduce with DistributedOptimizer +optimizer = mpi.DistributedOptimizer(tf.train.AdamOptimizer()) +train = optimizer.minimize(loss) + +# Average loss over all ranks, for printing. +# Do not pass this to an optimizer! +avg_loss = mpi.allreduce(loss) + +# On different ranks, feed different input data. +with mpi.Session() as session: + rank = session.run(mpi.rank()) + batch_inputs, batch_outputs = construct_batch_for_rank(rank) + feed_dict = {inputs: batch_inputs, outputs: batch_outputs} + _, l = session.run([train, avg_loss], feed_dict=feed_dict) + print("Average Loss:", l) +``` + +[1] Patarasuk, Pitch and Yuan, Xin. "Bandwidth Optimal All-reduce Algorithms +for Clusters of Workstations". + +@@Session +@@DistributedOptimizer +@@allreduce +@@allgather +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import tensorflow as tf + +from tensorflow.contrib.mpi_collectives.mpi_ops import size +from tensorflow.contrib.mpi_collectives.mpi_ops import rank +from tensorflow.contrib.mpi_collectives.mpi_ops import local_rank +from tensorflow.contrib.mpi_collectives.mpi_ops import allgather +from tensorflow.contrib.mpi_collectives.mpi_ops import _allreduce +from tensorflow.contrib.mpi_collectives.mpi_ops import init + + +def allreduce(tensor, average=True): + """Perform an MPI allreduce on a tf.Tensor or tf.IndexedSlices. + + Arguments: + tensor: tf.Tensor, tf.Variable, or tf.IndexedSlices to reduce. + The shape of the input must be identical across all ranks. + average: If True, computes the average over all ranks. + Otherwise, computes the sum over all ranks. + + This function performs a bandwidth-optimal ring allreduce on the input + tensor. If the input is an tf.IndexedSlices, the function instead does an + allgather on the values and the indices, effectively doing an allreduce on + the represented tensor. + """ + if isinstance(tensor, tf.IndexedSlices): + # For IndexedSlices, do two allgathers intead of an allreduce. + mpi_size = tf.cast(size(), tensor.values.dtype) + values = allgather(tensor.values) + indices = allgather(tensor.indices) + + # To make this operation into an average, divide all gathered values by + # the MPI size. + new_values = tf.div(values, mpi_size) if average else values + return tf.IndexedSlices(new_values, indices, + dense_shape=tensor.dense_shape) + else: + mpi_size = tf.cast(size(), tensor.dtype) + summed_tensor = _allreduce(tensor) + new_tensor = (tf.div(summed_tensor, mpi_size) + if average else summed_tensor) + return new_tensor + + +class DistributedOptimizer(tf.train.Optimizer): + """An optimizer that wraps another tf.Optimizer, using an MPI allreduce to + average gradient values before applying gradients to model weights.""" + + def __init__(self, optimizer, name=None, use_locking=False): + """Construct a new DistributedOptimizer, which uses another optimizer + under the hood for computing single-process gradient values and + applying gradient updates after the gradient values have been averaged + across all the MPI ranks. + + Args: + optimizer: Optimizer to use for computing gradients and applying updates. + name: Optional name prefix for the operations created when applying + gradients. Defaults to "Distributed" followed by the provided + optimizer type. + use_locking: Whether to use locking when updating variables. See + Optimizer.__init__ for more info. + """ + if name is None: + name = "Distributed{}".format(type(optimizer).__name__) + + self._optimizer = optimizer + super(DistributedOptimizer, self).__init__( + name=name, use_locking=use_locking) + + def compute_gradients(self, *args, **kwargs): + """Compute gradients of all trainable variables. + + See Optimizer.compute_gradients() for more info. + + In DistributedOptimizer, compute_gradients() is overriden to also + allreduce the gradients before returning them. + """ + gradients = (super(DistributedOptimizer, self) + .compute_gradients(*args, **kwargs)) + return [(allreduce(gradient), var) for (gradient, var) in gradients] + + def _apply_dense(self, *args, **kwargs): + """Calls this same method on the underlying optimizer.""" + return self._optimizer._apply_dense(*args, **kwargs) + + def _apply_sparse(self, *args, **kwargs): + """Calls this same method on the underlying optimizer.""" + return self._optimizer._apply_sparse(*args, **kwargs) + + def _apply_sparse_duplicate_indices(self, *args, **kwargs): + """Calls this same method on the underlying optimizer.""" + return self._optimizer._apply_sparse_duplicate_indices(*args, + **kwargs) + + def _prepare(self, *args, **kwargs): + """Calls this same method on the underlying optimizer.""" + return self._optimizer._prepare(*args, **kwargs) + + def _create_slots(self, *args, **kwargs): + """Calls this same method on the underlying optimizer.""" + return self._optimizer._create_slots(*args, **kwargs) + + def _valid_dtypes(self, *args, **kwargs): + """Calls this same method on the underlying optimizer.""" + return self._optimizer._valid_dtypes(*args, **kwargs) + + def _finish(self, *args, **kwargs): + """Calls this same method on the underlying optimizer.""" + return self._optimizer._finish(*args, **kwargs) + + +class Session(tf.Session): + """A class for running TensorFlow operations, with copies of the same graph + running distributed across different MPI nodes. + + The primary difference between `tf.Session` and + `tf.contrib.mpi_collectives.Session` is that the MPI `Session` ensures that + the `Session` options are correct for use with `tf.contrib.mpi`, and + initializes MPI immediately upon the start of the session. + """ + + def __init__(self, target='', graph=None, config=None): + """Creates a new TensorFlow MPI session. + + Unlike a normal `tf.Session`, an MPI Session may only use a single GPU, + which must be specified in advance before the session is initialized. + In addition, it only uses a single graph evaluation thread, and + initializes MPI immediately upon starting. + + If no `graph` argument is specified when constructing the session, + the default graph will be launched in the session. If you are + using more than one graph (created with `tf.Graph()` in the same + process, you will have to use different sessions for each graph, + but each graph can be used in multiple sessions. In this case, it + is often clearer to pass the graph to be launched explicitly to + the session constructor. + + Args: + target: (Optional.) The execution engine to connect to. + graph: (Optional.) The `Graph` to be launched (described above). + config: (Optional.) A `ConfigProto` protocol buffer with configuration + options for the session. + """ + super(Session, self).__init__(target, graph, config=config) + + # Initialize MPI on the relevant device. + # TODO: Move this to library load and eliminate mpi.Session() + if graph is None: + graph = tf.get_default_graph() + with graph.as_default(): + self.run(init()) |