aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/contrib/mpi_collectives/__init__.py
blob: 9ed16a6f078a506b60fd14f4356ff65a0a692203 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
# pylint: disable=g-short-docstring-punctuation
"""## Communicating Between Processes with MPI

TensorFlow natively provides inter-device communication through send and
receive ops and inter-node communication through Distributed TensorFlow, based
on the same send and receive abstractions. On HPC clusters where Infiniband or
other high-speed node interconnects are available, these can end up being
insufficient for synchronous data-parallel training (without asynchronous
gradient descent). This module implements a variety of MPI ops which can take
advantage of hardware-specific MPI libraries for efficient communication.

In order to use this module, TensorFlow must be built with an MPI library,
which can be provided to the `./configure` script at build time. As a user of
TensorFlow, you will need to build TensorFlow yourself to select the MPI
library to use; to do so, follow the [instructions for building TensorFlow from
source](https://www.tensorflow.org/get_started/os_setup#installing_from_sources).

### Utility Ops

In addition to reductions and gathers, this module provides utility operations
for detecting the running MPI configuration.

Example:

```python
from tensorflow.contrib import mpi

# Use `mpi.Session` instead of `tf.Session`
with mpi.Session() as session:
    rank = session.run(mpi.rank())
    print("My MPI Rank:", rank)

    if rank == 0:
        print("MPI Size:", session.run(mpi.size()))
```

@@rank
@@size

### Ring Allreduce and Allgather

When summing or averaging tensors across many processes, communication can
easily become a bottleneck. A naive implementation will send all the tensor
values to the same process, perform the reduction, and then broadcast the
values back to all other processes, effectively creating a synchronous
parameter server in one process. However, the process responsible for
performing the reduction will have to receive and send a massive amount of data
which scales with the number of processes *and* the number of parameters in the
model.

Instead of centralizing the reduction and having one primary reducer, we can
implement a distributed allreduce or allgather. A bandwidth-optimal allreduce
will end up sending 2(N - 1) values for every value in the input tensor,
and can be implemented with a ring allreduce [1]. (Intuitively, a linear reduce
requires at least (N - 1) sends between the different nodes, and a broadcast of
the result also requires (N - 1) sends, for a total of 2 (N - 1); these two
steps cannot be combined in a clever way to reduce the number of required
sends.) This module implements bandwidth-optimal ring allreduce and ring
allgather operations using MPI; by choosing a hardware-appropriate MPI
implementation (such as OpenMPI with CUDA-IPC support), you can train large
models with synchronous gradient descent with minimal communication overhead.

In addition to the `allreduce` and `allgather` functions, a convenience
`DistributedOptimizer` wrapper is provided to simplify using these functions
for reducing model gradients.

Example:

```python
import tensorflow as tf
from tensorflow.contrib import mpi_collectives as mpi

# Construct a simple linear regression model to optimize
W = tf.get_variable("W", shape=[20, 1], dtype=tf.float32)
B = tf.get_variable("B", shape=[1, 1], dtype=tf.float32)
inputs = tf.placeholder("Inputs", shape=[None, 20])
outputs = tf.placeholder("Outputs", shape=[None, 1])
loss = tf.nn.l2_loss(tf.matmul(inputs, W) + B - outputs)

# Training using MPI allreduce with DistributedOptimizer
optimizer = mpi.DistributedOptimizer(tf.train.AdamOptimizer())
train = optimizer.minimize(loss)

# Average loss over all ranks, for printing.
# Do not pass this to an optimizer!
avg_loss = mpi.allreduce(loss)

# On different ranks, feed different input data.
with mpi.Session() as session:
    rank = session.run(mpi.rank())
    batch_inputs, batch_outputs = construct_batch_for_rank(rank)
    feed_dict = {inputs: batch_inputs, outputs: batch_outputs}
    _, l = session.run([train, avg_loss], feed_dict=feed_dict)
    print("Average Loss:", l)
```

[1] Patarasuk, Pitch and Yuan, Xin. "Bandwidth Optimal All-reduce Algorithms
for Clusters of Workstations".

@@Session
@@DistributedOptimizer
@@allreduce
@@allgather
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf

from tensorflow.contrib.mpi_collectives.mpi_ops import size
from tensorflow.contrib.mpi_collectives.mpi_ops import rank
from tensorflow.contrib.mpi_collectives.mpi_ops import local_rank
from tensorflow.contrib.mpi_collectives.mpi_ops import allgather
from tensorflow.contrib.mpi_collectives.mpi_ops import _allreduce
from tensorflow.contrib.mpi_collectives.mpi_ops import init


def allreduce(tensor, average=True):
  """Perform an MPI allreduce on a tf.Tensor or tf.IndexedSlices.

  Arguments:
  tensor: tf.Tensor, tf.Variable, or tf.IndexedSlices to reduce.
          The shape of the input must be identical across all ranks.
  average: If True, computes the average over all ranks.
           Otherwise, computes the sum over all ranks.

  This function performs a bandwidth-optimal ring allreduce on the input
  tensor. If the input is an tf.IndexedSlices, the function instead does an
  allgather on the values and the indices, effectively doing an allreduce on
  the represented tensor.
  """
  if isinstance(tensor, tf.IndexedSlices):
    # For IndexedSlices, do two allgathers intead of an allreduce.
    mpi_size = tf.cast(size(), tensor.values.dtype)
    values = allgather(tensor.values)
    indices = allgather(tensor.indices)

    # To make this operation into an average, divide all gathered values by
    # the MPI size.
    new_values = tf.div(values, mpi_size) if average else values
    return tf.IndexedSlices(new_values, indices,
                            dense_shape=tensor.dense_shape)
  else:
    mpi_size = tf.cast(size(), tensor.dtype)
    summed_tensor = _allreduce(tensor)
    new_tensor = (tf.div(summed_tensor, mpi_size)
                  if average else summed_tensor)
    return new_tensor


class DistributedOptimizer(tf.train.Optimizer):
  """An optimizer that wraps another tf.Optimizer, using an MPI allreduce to
  average gradient values before applying gradients to model weights."""

  def __init__(self, optimizer, name=None, use_locking=False):
    """Construct a new DistributedOptimizer, which uses another optimizer
    under the hood for computing single-process gradient values and
    applying gradient updates after the gradient values have been averaged
    across all the MPI ranks.

    Args:
    optimizer: Optimizer to use for computing gradients and applying updates.
    name: Optional name prefix for the operations created when applying
          gradients. Defaults to "Distributed" followed by the provided
          optimizer type.
    use_locking: Whether to use locking when updating variables. See
                 Optimizer.__init__ for more info.
    """
    if name is None:
      name = "Distributed{}".format(type(optimizer).__name__)

    self._optimizer = optimizer
    super(DistributedOptimizer, self).__init__(
        name=name, use_locking=use_locking)

  def compute_gradients(self, *args, **kwargs):
    """Compute gradients of all trainable variables.

    See Optimizer.compute_gradients() for more info.

    In DistributedOptimizer, compute_gradients() is overridden to also
    allreduce the gradients before returning them.
    """
    gradients = (super(DistributedOptimizer, self)
                 .compute_gradients(*args, **kwargs))
    return [(allreduce(gradient), var) for (gradient, var) in gradients]

  def _apply_dense(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._apply_dense(*args, **kwargs)

  def _apply_sparse(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._apply_sparse(*args, **kwargs)

  def _apply_sparse_duplicate_indices(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._apply_sparse_duplicate_indices(*args,
                                                           **kwargs)

  def _prepare(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._prepare(*args, **kwargs)

  def _create_slots(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._create_slots(*args, **kwargs)

  def _valid_dtypes(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._valid_dtypes(*args, **kwargs)

  def _finish(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._finish(*args, **kwargs)


class Session(tf.Session):
  """A class for running TensorFlow operations, with copies of the same graph
  running distributed across different MPI nodes.

  The primary difference between `tf.Session` and
  `tf.contrib.mpi_collectives.Session` is that the MPI `Session` ensures that
  the `Session` options are correct for use with `tf.contrib.mpi`, and
  initializes MPI immediately upon the start of the session.
  """

  def __init__(self, target='', graph=None, config=None):
    """Creates a new TensorFlow MPI session.

    Unlike a normal `tf.Session`, an MPI Session may only use a single GPU,
    which must be specified in advance before the session is initialized.
    In addition, it only uses a single graph evaluation thread, and
    initializes MPI immediately upon starting.

    If no `graph` argument is specified when constructing the session,
    the default graph will be launched in the session. If you are
    using more than one graph (created with `tf.Graph()` in the same
    process, you will have to use different sessions for each graph,
    but each graph can be used in multiple sessions. In this case, it
    is often clearer to pass the graph to be launched explicitly to
    the session constructor.

    Args:
    target: (Optional.) The execution engine to connect to.
    graph: (Optional.) The `Graph` to be launched (described above).
    config: (Optional.) A `ConfigProto` protocol buffer with configuration
    options for the session.
    """
    super(Session, self).__init__(target, graph, config=config)

    # Initialize MPI on the relevant device.
    # TODO: Move this to library load and eliminate mpi.Session()
    if graph is None:
      graph = tf.get_default_graph()
    with graph.as_default():
      self.run(init())