# Distributed TensorFlow

This document shows how to create a cluster of TensorFlow servers, and how to distribute a computation graph across that cluster. We assume that you are familiar with the @{$guide/low_level_intro$basic concepts} of writing low level TensorFlow programs.

## Hello distributed TensorFlow!

To see a simple TensorFlow cluster in action, execute the following:

```shell
# Start a TensorFlow server as a single-process "cluster".
$ python
>>> import tensorflow as tf
>>> c = tf.constant("Hello, distributed TensorFlow!")
>>> server = tf.train.Server.create_local_server()
>>> sess = tf.Session(server.target)  # Create a session on the server.
>>> sess.run(c)
'Hello, distributed TensorFlow!'
```

The `tf.train.Server.create_local_server` method creates a single-process cluster, with an in-process server.

## Create a cluster
A TensorFlow "cluster" is a set of "tasks" that participate in the distributed execution of a TensorFlow graph. Each task is associated with a TensorFlow "server", which contains a "master" that can be used to create sessions, and a "worker" that executes operations in the graph. A cluster can also be divided into one or more "jobs", where each job contains one or more tasks. To create a cluster, you start one TensorFlow server per task in the cluster. Each task typically runs on a different machine, but you can run multiple tasks on the same machine (e.g. to control different GPU devices). In each task, do the following: 1. **Create a `tf.train.ClusterSpec`** that describes all of the tasks in the cluster. This should be the same for each task. 2. **Create a `tf.train.Server`**, passing the `tf.train.ClusterSpec` to the constructor, and identifying the local task with a job name and task index. ### Create a `tf.train.ClusterSpec` to describe the cluster The cluster specification dictionary maps job names to lists of network addresses. Pass this dictionary to the `tf.train.ClusterSpec` constructor. For example:
<table>
  <tr><th><code>tf.train.ClusterSpec</code> construction</th><th>Available tasks</th></tr>
  <tr>
    <td><pre>
tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
</pre></td>
    <td><code>/job:local/task:0</code><br/><code>/job:local/task:1</code></td>
  </tr>
  <tr>
    <td><pre>
tf.train.ClusterSpec({
    "worker": [
        "worker0.example.com:2222",
        "worker1.example.com:2222",
        "worker2.example.com:2222"
    ],
    "ps": [
        "ps0.example.com:2222",
        "ps1.example.com:2222"
    ]})
</pre></td>
    <td><code>/job:worker/task:0</code><br/><code>/job:worker/task:1</code><br/><code>/job:worker/task:2</code><br/><code>/job:ps/task:0</code><br/><code>/job:ps/task:1</code></td>
  </tr>
</table>
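Each specification determines a set of task names of the form `/job:<name>/task:<index>`. As a minimal sketch (using the `jobs`, `num_tasks`, and `task_address` accessors of `tf.train.ClusterSpec`), the second specification above can be enumerated programmatically:

```python
import tensorflow as tf

# The second cluster specification from the table above.
cluster = tf.train.ClusterSpec({
    "worker": [
        "worker0.example.com:2222",
        "worker1.example.com:2222",
        "worker2.example.com:2222"
    ],
    "ps": [
        "ps0.example.com:2222",
        "ps1.example.com:2222"
    ]})

# Enumerate the task names that this specification makes available.
for job in sorted(cluster.jobs):                  # ["ps", "worker"]
  for task_index in range(cluster.num_tasks(job)):
    print("/job:%s/task:%d -> %s"
          % (job, task_index, cluster.task_address(job, task_index)))
```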
### Create a `tf.train.Server` instance in each task

A `tf.train.Server` object contains a set of local devices, a set of connections to other tasks in its `tf.train.ClusterSpec`, and a `tf.Session` that can use these to perform a distributed computation. Each server is a member of a specific named job and has a task index within that job. A server can communicate with any other server in the cluster.

For example, to launch a cluster with two servers running on `localhost:2222` and `localhost:2223`, run the following snippets in two different processes on the local machine:

```python
# In task 0:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=0)
```

```python
# In task 1:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=1)
```

**Note:** Manually specifying these cluster specifications can be tedious, especially for large clusters. We are working on tools for launching tasks programmatically, e.g. using a cluster manager like [Kubernetes](http://kubernetes.io). If there are particular cluster managers for which you'd like to see support, please raise a [GitHub issue](https://github.com/tensorflow/tensorflow/issues).

## Specifying distributed devices in your model

To place operations on a particular process, you can use the same `tf.device` function that is used to specify whether ops run on the CPU or GPU. For example:

```python
with tf.device("/job:ps/task:0"):
  weights_1 = tf.Variable(...)
  biases_1 = tf.Variable(...)

with tf.device("/job:ps/task:1"):
  weights_2 = tf.Variable(...)
  biases_2 = tf.Variable(...)

with tf.device("/job:worker/task:7"):
  input, labels = ...
  layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
  logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
  # ...
  train_op = ...

with tf.Session("grpc://worker7.example.com:2222") as sess:
  for _ in range(10000):
    sess.run(train_op)
```

In the above example, the variables are created on two tasks in the `ps` job, and the compute-intensive part of the model is created in the `worker` job. TensorFlow will insert the appropriate data transfers between the jobs (from `ps` to `worker` for the forward pass, and from `worker` to `ps` for applying gradients).
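Pinning every variable to an explicit `/job:ps/task:N` device, as above, becomes tedious when a model has many parameters. `tf.train.replica_device_setter` (used again in the example trainer below) chooses `ps` devices for variables automatically while leaving the remaining ops on the worker. A minimal sketch, reusing the cluster from the earlier table; the toy model shapes are purely illustrative:

```python
import tensorflow as tf

# A sketch only: the cluster addresses match the earlier example, and the
# tiny model below is illustrative.
cluster = tf.train.ClusterSpec({
    "ps": ["ps0.example.com:2222", "ps1.example.com:2222"],
    "worker": ["worker0.example.com:2222",
               "worker1.example.com:2222",
               "worker2.example.com:2222"]})

with tf.device(tf.train.replica_device_setter(
    worker_device="/job:worker/task:0", cluster=cluster)):
  # Variables are assigned round-robin to /job:ps/task:0 and /job:ps/task:1;
  # all other ops stay on /job:worker/task:0.
  weights = tf.Variable(tf.zeros([784, 10]))
  biases = tf.Variable(tf.zeros([10]))
  inputs = tf.placeholder(tf.float32, shape=[None, 784])
  logits = tf.matmul(inputs, weights) + biases
```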
## Replicated training

A common training configuration, called "data parallelism," involves multiple tasks in a `worker` job training the same model on different mini-batches of data, updating shared parameters hosted in one or more tasks in a `ps` job. All tasks typically run on different machines. There are many ways to specify this structure in TensorFlow, and we are building libraries that will simplify the work of specifying a replicated model. Possible approaches include:

* **In-graph replication.** In this approach, the client builds a single `tf.Graph` that contains one set of parameters (in `tf.Variable` nodes pinned to `/job:ps`); and multiple copies of the compute-intensive part of the model, each pinned to a different task in `/job:worker`.

* **Between-graph replication.** In this approach, there is a separate client for each `/job:worker` task, typically in the same process as the worker task. Each client builds a similar graph containing the parameters (pinned to `/job:ps` as before, using `tf.train.replica_device_setter` to map them deterministically to the same tasks); and a single copy of the compute-intensive part of the model, pinned to the local task in `/job:worker`.

* **Asynchronous training.** In this approach, each replica of the graph has an independent training loop that executes without coordination. It is compatible with both forms of replication above.

* **Synchronous training.** In this approach, all of the replicas read the same values for the current parameters, compute gradients in parallel, and then apply them together. It is compatible with in-graph replication (e.g. using gradient averaging as in the [CIFAR-10 multi-GPU trainer](https://github.com/tensorflow/models/tree/master/tutorials/image/cifar10/cifar10_multi_gpu_train.py)), and between-graph replication (e.g. using the `tf.train.SyncReplicasOptimizer`).

### Putting it all together: example trainer program

The following code shows the skeleton of a distributed trainer program, implementing **between-graph replication** and **asynchronous training**. It includes the code for the parameter server and worker tasks.

```python
import argparse
import sys

import tensorflow as tf

FLAGS = None


def main(_):
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")

  # Create a cluster from the parameter server and worker hosts.
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

  # Create and start a server for the local task.
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":

    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):

      # Build model...
      loss = ...
      global_step = tf.contrib.framework.get_or_create_global_step()

      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)

    # The StopAtStepHook handles stopping after running given steps.
    hooks = [tf.train.StopAtStepHook(last_step=1000000)]

    # The MonitoredTrainingSession takes care of session initialization,
    # restoring from a checkpoint, saving to a checkpoint, and closing when
    # done or an error occurs.
    with tf.train.MonitoredTrainingSession(master=server.target,
                                           is_chief=(FLAGS.task_index == 0),
                                           checkpoint_dir="/tmp/train_logs",
                                           hooks=hooks) as mon_sess:
      while not mon_sess.should_stop():
        # Run a training step asynchronously.
        # See `tf.train.SyncReplicasOptimizer` for additional details on how to
        # perform *synchronous* training.
        # mon_sess.run handles AbortedError in case of preempted PS.
        mon_sess.run(train_op)


if __name__ == "__main__":
  parser = argparse.ArgumentParser()
  parser.register("type", "bool", lambda v: v.lower() == "true")
  # Flags for defining the tf.train.ClusterSpec
  parser.add_argument(
      "--ps_hosts",
      type=str,
      default="",
      help="Comma-separated list of hostname:port pairs"
  )
  parser.add_argument(
      "--worker_hosts",
      type=str,
      default="",
      help="Comma-separated list of hostname:port pairs"
  )
  parser.add_argument(
      "--job_name",
      type=str,
      default="",
      help="One of 'ps', 'worker'"
  )
  # Flags for defining the tf.train.Server
  parser.add_argument(
      "--task_index",
      type=int,
      default=0,
      help="Index of task within the job"
  )
  FLAGS, unparsed = parser.parse_known_args()
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
```

To start the trainer with two parameter servers and two workers, use the following command lines (assuming the script is called `trainer.py`):

```shell
# On ps0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=ps --task_index=0
# On ps1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=ps --task_index=1
# On worker0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=worker --task_index=0
# On worker1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=worker --task_index=1
```

## Glossary

**Client**

A client is typically a program that builds a TensorFlow graph and constructs a `tensorflow::Session` to interact with a cluster. Clients are typically written in Python or C++. A single client process can directly interact with multiple TensorFlow servers (see "Replicated training" above), and a single server can serve multiple clients.

**Cluster**

A TensorFlow cluster comprises one or more "jobs", each divided into lists of one or more "tasks". A cluster is typically dedicated to a particular high-level objective, such as training a neural network, using many machines in parallel. A cluster is defined by a `tf.train.ClusterSpec` object.

**Job**

A job comprises a list of "tasks", which typically serve a common purpose. For example, a job named `ps` (for "parameter server") typically hosts nodes that store and update variables; while a job named `worker` typically hosts stateless nodes that perform compute-intensive tasks. The tasks in a job typically run on different machines. The set of job roles is flexible: for example, a `worker` may maintain some state.

**Master service**

An RPC service that provides remote access to a set of distributed devices, and acts as a session target. The master service implements the `tensorflow::Session` interface, and is responsible for coordinating work across one or more "worker services". All TensorFlow servers implement the master service.

**Task**

A task corresponds to a specific TensorFlow server, and typically corresponds to a single process. A task belongs to a particular "job" and is identified by its index within that job's list of tasks.

**TensorFlow server**

A process running a `tf.train.Server` instance, which is a member of a cluster, and exports a "master service" and "worker service".
**Worker service**

An RPC service that executes parts of a TensorFlow graph using its local devices. A worker service implements [worker_service.proto](https://www.tensorflow.org/code/tensorflow/core/protobuf/worker_service.proto). All TensorFlow servers implement the worker service.