# Distributed TensorFlow

This document shows how to create a cluster of TensorFlow servers, and how to
distribute a computation graph across that cluster. We assume that you are
familiar with the @{$get_started/get_started$basic concepts} of
writing TensorFlow programs.

## Hello distributed TensorFlow!

To see a simple TensorFlow cluster in action, execute the following:

```shell
# Start a TensorFlow server as a single-process "cluster".
$ python
>>> import tensorflow as tf
>>> c = tf.constant("Hello, distributed TensorFlow!")
>>> server = tf.train.Server.create_local_server()
>>> sess = tf.Session(server.target)  # Create a session on the server.
>>> sess.run(c)
'Hello, distributed TensorFlow!'
```

The
@{tf.train.Server.create_local_server}
method creates a single-process cluster, with an in-process server.

## Create a cluster

<div class="video-wrapper">
  <iframe class="devsite-embedded-youtube-video" data-video-id="la_M6bCV91M"
          data-autohide="1" data-showinfo="0" frameborder="0" allowfullscreen>
  </iframe>
</div>

A TensorFlow "cluster" is a set of "tasks" that participate in the distributed
execution of a TensorFlow graph. Each task is associated with a TensorFlow
"server", which contains a "master" that can be used to create sessions, and a
"worker" that executes operations in the graph.  A cluster can also be divided
into one or more "jobs", where each job contains one or more tasks.

To create a cluster, you start one TensorFlow server per task in the cluster.
Each task typically runs on a different machine, but you can run multiple tasks
on the same machine (e.g. to control different GPU devices). In each task, do
the following:

1.  **Create a `tf.train.ClusterSpec`** that describes all of the tasks
    in the cluster. This should be the same for each task.

2.  **Create a `tf.train.Server`**, passing the `tf.train.ClusterSpec` to
    the constructor, and identifying the local task with a job name
    and task index.


### Create a `tf.train.ClusterSpec` to describe the cluster

The cluster specification dictionary maps job names to lists of network
addresses. Pass this dictionary to
the @{tf.train.ClusterSpec}
constructor.  For example:

<table>
  <tr><th><code>tf.train.ClusterSpec</code> construction</th><th>Available tasks</th></tr>
  <tr>
    <td><pre>
tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
</pre></td>
<td><code>/job:local/task:0<br/>/job:local/task:1</code></td>
  </tr>
  <tr>
    <td><pre>
tf.train.ClusterSpec({
    "worker": [
        "worker0.example.com:2222",
        "worker1.example.com:2222",
        "worker2.example.com:2222"
    ],
    "ps": [
        "ps0.example.com:2222",
        "ps1.example.com:2222"
    ]})
</pre></td><td><code>/job:worker/task:0</code><br/><code>/job:worker/task:1</code><br/><code>/job:worker/task:2</code><br/><code>/job:ps/task:0</code><br/><code>/job:ps/task:1</code></td>
  </tr>
</table>
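
The mapping from a cluster dictionary to the task names in the table can be
reproduced in plain Python. The following is a minimal sketch that does not
call TensorFlow; `task_names` is a hypothetical helper introduced here for
illustration only.

```python
def task_names(cluster_dict):
  """Return the /job:<name>/task:<index> name of every task in the dict."""
  names = []
  for job_name, addresses in cluster_dict.items():
    # A task's index is its position in the job's address list.
    for task_index, _ in enumerate(addresses):
      names.append("/job:%s/task:%d" % (job_name, task_index))
  return names


cluster_dict = {
    "worker": ["worker0.example.com:2222", "worker1.example.com:2222"],
    "ps": ["ps0.example.com:2222"],
}
for name in task_names(cluster_dict):
  print(name)
```

Because the index comes from list position, reordering the addresses in a job
changes which machine is `task:0`, which is one reason every task should be
given the same dictionary.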

### Create a `tf.train.Server` instance in each task

A @{tf.train.Server} object contains a
set of local devices, a set of connections to other tasks in its
`tf.train.ClusterSpec`, and a
@{tf.Session} that can use these
to perform a distributed computation. Each server is a member of a specific
named job and has a task index within that job.  A server can communicate with
any other server in the cluster.

For example, to launch a cluster with two servers running on `localhost:2222`
and `localhost:2223`, run the following snippets in two different processes on
the local machine:

```python
# In task 0:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=0)
```
```python
# In task 1:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=1)
```

**Note:** Manually specifying these cluster specifications can be tedious,
especially for large clusters. We are working on tools for launching tasks
programmatically, e.g. using a cluster manager like
[Kubernetes](http://kubernetes.io). If there are particular cluster managers for
which you'd like to see support, please raise a
[GitHub issue](https://github.com/tensorflow/tensorflow/issues).
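
One way to reduce the tedium is to generate the dictionary from a naming
pattern. This is a minimal sketch, assuming hosts follow the
`<job><index>.example.com` pattern used in this document. `make_cluster_dict`
and its parameters are hypothetical and not part of TensorFlow.

```python
def make_cluster_dict(num_workers, num_ps, domain="example.com", port=2222):
  """Build a cluster dictionary for hosts named <job><index>.<domain>."""
  def hosts(job_name, count):
    return ["%s%d.%s:%d" % (job_name, i, domain, port) for i in range(count)]

  return {"worker": hosts("worker", num_workers), "ps": hosts("ps", num_ps)}


cluster_dict = make_cluster_dict(num_workers=3, num_ps=2)
# Every task would pass this same dictionary to tf.train.ClusterSpec.
print(cluster_dict["worker"])
print(cluster_dict["ps"])
```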

## Specifying distributed devices in your model

To place operations on a particular process, you can use the same
@{tf.device}
function that is used to specify whether ops run on the CPU or GPU. For example:

```python
with tf.device("/job:ps/task:0"):
  weights_1 = tf.Variable(...)
  biases_1 = tf.Variable(...)

with tf.device("/job:ps/task:1"):
  weights_2 = tf.Variable(...)
  biases_2 = tf.Variable(...)

with tf.device("/job:worker/task:7"):
  input, labels = ...
  layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
  logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
  # ...
  train_op = ...

with tf.Session("grpc://worker7.example.com:2222") as sess:
  for _ in range(10000):
    sess.run(train_op)
```

In the above example, the variables are created on two tasks in the `ps` job,
and the compute-intensive part of the model is created in the `worker`
job. TensorFlow will insert the appropriate data transfers between the jobs
(from `ps` to `worker` for the forward pass, and from `worker` to `ps` for
applying gradients).

## Replicated training

A common training configuration, called "data parallelism," involves multiple
tasks in a `worker` job training the same model on different mini-batches of
data, updating shared parameters hosted in one or more tasks in a `ps`
job. All tasks typically run on different machines. There are many ways to
specify this structure in TensorFlow, and we are building libraries that will
simplify the work of specifying a replicated model. Possible approaches include:

* **In-graph replication.** In this approach, the client builds a single
  `tf.Graph` that contains one set of parameters (in `tf.Variable` nodes pinned
  to `/job:ps`); and multiple copies of the compute-intensive part of the model,
  each pinned to a different task in `/job:worker`.

* **Between-graph replication.** In this approach, there is a separate client
  for each `/job:worker` task, typically in the same process as the worker
  task. Each client builds a similar graph containing the parameters (pinned to
  `/job:ps` as before using
  @{tf.train.replica_device_setter}
  to map them deterministically to the same tasks); and a single copy of the
  compute-intensive part of the model, pinned to the local task in
  `/job:worker`.

* **Asynchronous training.** In this approach, each replica of the graph has an
  independent training loop that executes without coordination. It is compatible
  with both forms of replication above.

* **Synchronous training.** In this approach, all of the replicas read the same
  values for the current parameters, compute gradients in parallel, and then
  apply them together. It is compatible with in-graph replication (e.g. using
  gradient averaging as in the
  [CIFAR-10 multi-GPU trainer](https://github.com/tensorflow/models/tree/master/tutorials/image/cifar10/cifar10_multi_gpu_train.py)),
  and between-graph replication (e.g. using the
  @{tf.train.SyncReplicasOptimizer}).
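
The difference between asynchronous and synchronous updates can be seen on a
toy problem. The sketch below is plain Python rather than TensorFlow: it takes
one gradient step on a hypothetical one-parameter loss `(w - target)**2` with
two simulated replicas. All names (`grad`, `sync_step`, `async_step`) are
invented for illustration, and the asynchronous version shows only one possible
interleaving, in which the replicas happen to apply their updates one after
another.

```python
def grad(w, target):
  """Gradient of the toy loss (w - target)**2 with respect to w."""
  return 2.0 * (w - target)


def sync_step(w, targets, lr):
  """All replicas read the same w; gradients are averaged, then applied once."""
  grads = [grad(w, t) for t in targets]
  return w - lr * sum(grads) / len(grads)


def async_step(w, targets, lr):
  """Each replica applies its own gradient without waiting for the others."""
  for t in targets:
    w = w - lr * grad(w, t)  # Later replicas see earlier updates.
  return w


targets = [1.0, 3.0]  # One mini-batch "target" per replica.
print(sync_step(0.0, targets, lr=0.25))
print(async_step(0.0, targets, lr=0.25))
```

The two schemes reach different parameter values after one round because the
asynchronous replicas do not share a common starting point for their gradients.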

### Putting it all together: example trainer program

The following code shows the skeleton of a distributed trainer program,
implementing **between-graph replication** and **asynchronous training**. It
includes the code for the parameter server and worker tasks.

```python
import argparse
import sys

import tensorflow as tf

FLAGS = None


def main(_):
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")

  # Create a cluster from the parameter server and worker hosts.
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

  # Create and start a server for the local task.
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":

    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):

      # Build model...
      loss = ...
      global_step = tf.contrib.framework.get_or_create_global_step()

      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)

    # The StopAtStepHook handles stopping after running given steps.
    hooks = [tf.train.StopAtStepHook(last_step=1000000)]

    # The MonitoredTrainingSession takes care of session initialization,
    # restoring from a checkpoint, saving to a checkpoint, and closing when done
    # or an error occurs.
    with tf.train.MonitoredTrainingSession(master=server.target,
                                           is_chief=(FLAGS.task_index == 0),
                                           checkpoint_dir="/tmp/train_logs",
                                           hooks=hooks) as mon_sess:
      while not mon_sess.should_stop():
        # Run a training step asynchronously.
        # See `tf.train.SyncReplicasOptimizer` for additional details on how to
        # perform *synchronous* training.
        # mon_sess.run handles AbortedError in case of preempted PS.
        mon_sess.run(train_op)


if __name__ == "__main__":
  parser = argparse.ArgumentParser()
  parser.register("type", "bool", lambda v: v.lower() == "true")
  # Flags for defining the tf.train.ClusterSpec
  parser.add_argument(
      "--ps_hosts",
      type=str,
      default="",
      help="Comma-separated list of hostname:port pairs"
  )
  parser.add_argument(
      "--worker_hosts",
      type=str,
      default="",
      help="Comma-separated list of hostname:port pairs"
  )
  parser.add_argument(
      "--job_name",
      type=str,
      default="",
      help="One of 'ps', 'worker'"
  )
  # Flags for defining the tf.train.Server
  parser.add_argument(
      "--task_index",
      type=int,
      default=0,
      help="Index of task within the job"
  )
  FLAGS, unparsed = parser.parse_known_args()
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
```
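
In the trainer above, `tf.train.replica_device_setter` decides which `ps` task
hosts each variable. By default it assigns variables to `ps` tasks in
round-robin order as they are created. The following plain-Python sketch
imitates that policy without calling TensorFlow; `round_robin_placement` and
the variable names are hypothetical and serve only to illustrate the idea.

```python
def round_robin_placement(variable_names, num_ps_tasks):
  """Map each variable, in creation order, to a /job:ps task device name."""
  placement = {}
  for i, name in enumerate(variable_names):
    placement[name] = "/job:ps/task:%d" % (i % num_ps_tasks)
  return placement


variables = ["weights_1", "biases_1", "weights_2", "biases_2"]
for name, device in round_robin_placement(variables, num_ps_tasks=2).items():
  print(name, "->", device)
```

Because every worker builds its variables in the same order, each client
arrives at the same placement, which is what allows between-graph replicas to
share one set of parameters.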

To start the trainer with two parameter servers and two workers, use the
following command line (assuming the script is called `trainer.py`):

```shell
# On ps0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=ps --task_index=0
# On ps1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=ps --task_index=1
# On worker0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=worker --task_index=0
# On worker1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=worker --task_index=1
```
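
The four commands differ only in `--job_name` and `--task_index`, so the list
can be generated. This is a minimal sketch, assuming the same `trainer.py`
flags as above; `launch_commands` is a hypothetical helper, and it only builds
the command strings rather than running them.

```python
def launch_commands(ps_hosts, worker_hosts, script="trainer.py"):
  """Return (host, command) pairs, one per task in the cluster."""
  common = "python %s --ps_hosts=%s --worker_hosts=%s" % (
      script, ",".join(ps_hosts), ",".join(worker_hosts))
  commands = []
  for job_name, hosts in (("ps", ps_hosts), ("worker", worker_hosts)):
    for task_index, address in enumerate(hosts):
      host = address.split(":")[0]  # Drop the port to get the machine name.
      commands.append((host, "%s --job_name=%s --task_index=%d" % (
          common, job_name, task_index)))
  return commands


ps_hosts = ["ps0.example.com:2222", "ps1.example.com:2222"]
worker_hosts = ["worker0.example.com:2222", "worker1.example.com:2222"]
for host, command in launch_commands(ps_hosts, worker_hosts):
  print("# On %s:\n$ %s" % (host, command))
```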

## Glossary

**Client**

A client is typically a program that builds a TensorFlow graph and constructs a
`tensorflow::Session` to interact with a cluster. Clients are typically written
in Python or C++. A single client process can directly interact with multiple
TensorFlow servers (see "Replicated training" above), and a single server can
serve multiple clients.

**Cluster**

A TensorFlow cluster comprises one or more "jobs", each divided into lists of
one or more "tasks". A cluster is typically dedicated to a particular high-level
objective, such as training a neural network, using many machines in parallel. A
cluster is defined by
a @{tf.train.ClusterSpec} object.

**Job**

A job comprises a list of "tasks", which typically serve a common purpose.
For example, a job named `ps` (for "parameter server") typically hosts nodes
that store and update variables, while a job named `worker` typically hosts
stateless nodes that perform compute-intensive tasks. The tasks in a job
typically run on different machines. The set of job roles is flexible:
for example, a `worker` may maintain some state.

**Master service**

An RPC service that provides remote access to a set of distributed devices,
and acts as a session target. The master service implements the
`tensorflow::Session` interface, and is responsible for coordinating work across
one or more "worker services". All TensorFlow servers implement the master
service.

**Task**

A task corresponds to a specific TensorFlow server, and typically corresponds
to a single process. A task belongs to a particular "job" and is identified by
its index within that job's list of tasks.

**TensorFlow server**

A process running a @{tf.train.Server} instance, which is a member of a
cluster, and exports a "master service" and "worker service".

**Worker service**

An RPC service that executes parts of a TensorFlow graph using its local devices.
A worker service implements [worker_service.proto](https://www.tensorflow.org/code/tensorflow/core/protobuf/worker_service.proto).
All TensorFlow servers implement the worker service.