path: root/tensorflow/python/training/sync_replicas_optimizer.py
author    Jianmin Chen <jmchen@google.com> 2016-10-07 12:53:06 -0800
committer TensorFlower Gardener <gardener@tensorflow.org> 2016-10-07 14:03:39 -0700
commit    ecdee38a534133ecd7ba18e58527cc4120277190 (patch)
tree      5b76e2e8a3038cb3b11539121360c062c2719154 /tensorflow/python/training/sync_replicas_optimizer.py
parent    2c8d270735176df1a59b5a80885b2e14b4f06953 (diff)
Switch to the new accumulators in the sync_rep optimizer (currently called V2). Please note that the gradients from replicas are now averaged instead of summed (as in the old sync_replicas_optimizer), so you need to increase the learning rate according to the number of replicas. This change is introduced to be consistent with how gradients are aggregated (averaged) within a batch in a replica.
As shown in the code change, the switch results in:
1. Much cleaner and simpler code.
2. A much more efficient and reliable staleness check. It is now 100% strict, with no extra contention on the PS servers.
3. No need for a clean_up op, so we can get rid of the abort_op, which could confuse users.
4. The number of replicas can be changed without complaints from the checkpoint, as local_step is now just a local variable instead of a global vector variable.
This has been tried with manual restarts of workers (chief or non-chief) and PS tasks and seems to be quite robust.
Change: 135513399
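A minimal sketch of the learning-rate adjustment noted above (the values and the `tf.train.SyncReplicasOptimizerV2` export path are illustrative assumptions, not part of this change): because V2 averages gradients across replicas where the old optimizer summed them, a job tuned for the old behavior would scale its learning rate up by the number of aggregated replicas.

```python
import tensorflow as tf

replicas_to_aggregate = 50   # illustrative value
base_learning_rate = 0.1     # rate tuned for the old, summing optimizer

# The old optimizer summed 50 gradients; V2 averages them, so each update
# is 50x smaller. Scale the learning rate to keep the effective step size.
learning_rate = base_learning_rate * replicas_to_aggregate

opt = tf.train.GradientDescentOptimizer(learning_rate)
opt = tf.train.SyncReplicasOptimizerV2(
    opt, replicas_to_aggregate=replicas_to_aggregate)
```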
Diffstat (limited to 'tensorflow/python/training/sync_replicas_optimizer.py')
-rw-r--r--  tensorflow/python/training/sync_replicas_optimizer.py | 414
1 file changed, 414 insertions, 0 deletions
diff --git a/tensorflow/python/training/sync_replicas_optimizer.py b/tensorflow/python/training/sync_replicas_optimizer.py
index ba07cd5908..24d8177f7d 100644
--- a/tensorflow/python/training/sync_replicas_optimizer.py
+++ b/tensorflow/python/training/sync_replicas_optimizer.py
@@ -31,6 +31,416 @@ from tensorflow.python.training import optimizer
from tensorflow.python.training import queue_runner
+# Please note that the gradients from replicas are averaged instead of summed
+# (as in the old sync_replicas_optimizer), so you need to increase the learning
+# rate according to the number of replicas. This change is introduced to be
+# consistent with how gradients are aggregated (averaged) within a batch in a
+# replica.
+class SyncReplicasOptimizerV2(optimizer.Optimizer):
+ """Class to synchronize, aggregate gradients and pass them to the optimizer.
+
+ In a typical asynchronous training environment, it's common to have some
+ stale gradients. For example, with N-replica asynchronous training,
+ gradients will be applied to the variables N times independently. Depending
+ on each replica's training speed, some gradients might be calculated from
+ copies of the variable from several steps back (N-1 steps on average). This
+ optimizer avoids stale gradients by collecting gradients from all replicas,
+ averaging them, then applying them to the variables in one shot, after
+ which replicas can fetch the new variables and continue.
+
+ The following accumulators/queue are created:
+
+ * N `gradient accumulators`, one per variable to train. Gradients are
+ pushed to them, and the chief worker waits until enough gradients have
+ been collected and then averages them before applying them to the
+ variables. The accumulator drops all stale gradients (more details in
+ the accumulator op).
+ * 1 `token` queue where the optimizer pushes the new global_step value after
+ all variables are updated.
+
+ The following local variable is created:
+ * `sync_rep_local_step`, one per replica. It is compared against the
+ global_step recorded in each accumulator to check the staleness of the
+ gradients (see the sketch below).
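+
+ The interplay between the accumulators and `sync_rep_local_step` can be
+ summarized with a small sketch (it mirrors the internal
+ `data_flow_ops.ConditionalAccumulator` calls in the implementation below;
+ `grad`, `local_step` and `replicas_to_aggregate` are placeholders, and
+ this is not public API):
+
+ ```python
+ accum = data_flow_ops.ConditionalAccumulator(
+     dtypes.float32, shape=(), shared_name="my_var/grad_accum")
+ # Each replica pushes its gradient, stamped with its local step.
+ apply_op = accum.apply_grad(grad, local_step=local_step)
+ # Gradients stamped with a stale local_step are silently dropped.
+ # take_grad blocks until replicas_to_aggregate fresh gradients have
+ # arrived, then returns their average.
+ avg_grad = accum.take_grad(replicas_to_aggregate)
+ ```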
+
+ The optimizer adds nodes to the graph to collect gradients and pause the
+ trainers until variables are updated.
+ For the Parameter Server job:
+
+ 1. An accumulator is created for each variable, and each replica pushes the
+ gradients into the accumulators instead of directly applying them to the
+ variables.
+ 2. Each accumulator averages the gradients once enough of them
+ (replicas_to_aggregate) have been accumulated.
+ 3. The averaged gradients are applied to the variables.
+ 4. Only after all variables have been updated is the global step
+ incremented.
+ 5. Only after step 4 is `global_step` pushed into the `token_queue`, once
+ for each worker replica. The workers can then fetch the global step, use
+ it to update their local_step variables, and start the next batch.
+
+ For the replicas:
+
+ 1. Start a step: fetch variables and compute gradients.
+ 2. Once the gradients have been computed, push them into the gradient
+ accumulators. Each accumulator checks the staleness and drops stale
+ gradients.
+ 3. After pushing all the gradients, dequeue an updated value of
+ global_step from the token queue and record that step in the replica's
+ local_step variable. Note that this is effectively a barrier (a minimal
+ sketch follows this list).
+ 4. Start the next batch.
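+
+ A stripped-down sketch of the barrier in steps 2 and 3 (it mirrors the
+ wiring in `apply_gradients` below; `train_ops` stands for the list of
+ accumulator pushes and `sync_token_queue` for the shared token queue):
+
+ ```python
+ # Block until the chief publishes a fresh global_step token, then
+ # record it as this replica's local step.
+ with ops.control_dependencies(train_ops):
+   token = sync_token_queue.dequeue()
+ train_op = state_ops.assign(local_step, token)
+ ```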
+
+ ### Usage
+
+ ```python
+ # Create any optimizer to update the variables, say a simple SGD:
+ opt = GradientDescentOptimizer(learning_rate=0.1)
+
+ # Wrap the optimizer with sync_replicas_optimizer with 50 replicas: at each
+ # step the optimizer collects 50 gradients before applying to variables.
+ # Note that if you want to have 2 backup replicas, you can change
+ # total_num_replicas=52 and make sure this number matches how many physical
+ # replicas you started in your job.
+ opt = tf.train.SyncReplicasOptimizerV2(opt, replicas_to_aggregate=50,
+                                        total_num_replicas=50)
+
+ # Some models have startup_delays to help stabilize the model, but when
+ # using sync_replicas training, set the startup delay to 0.
+
+ # Now you can call `minimize()` or `compute_gradients()` and
+ # `apply_gradients()` as usual.
+ training_op = opt.minimize(total_loss, global_step=self.global_step)
+
+
+ # You can now call get_init_tokens_op() and get_chief_queue_runner().
+ # Note that get_init_tokens_op() must be called before creating the
+ # session because it modifies the graph by adding new nodes.
+ init_token_op = opt.get_init_tokens_op()
+ chief_queue_runner = opt.get_chief_queue_runner()
+ ```
+
+ In the training program, every worker runs the train_op as if it were
+ not synchronized, but one worker (usually the chief) also needs to start
+ the chief_queue_runner and run the op returned by get_init_tokens_op().
+
+ ```python
+ # When you create the supervisor, you need to add the local_init_op and
+ # ready_for_local_init_op to make sure the local_step is initialized to the
+ # global_step. Here is an example:
+ sv = tf.train.Supervisor(graph=g,
+                          is_chief=is_chief,
+                          # This initializes the local step.
+                          local_init_op=local_init_op,
+                          # This makes sure the global step is initialized
+                          # before use.
+                          ready_for_local_init_op=ready_for_local_init_op,
+ saver=model.saver)
+
+ # After the session is created by the Supervisor and before the main while
+ # loop:
+ if is_chief and FLAGS.sync_replicas:
+ sv.start_queue_runners(sess, [chief_queue_runner])
+ # Insert initial tokens to the queue.
+ sess.run(init_token_op)
+ ```
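+
+ A non-chief worker simply runs the returned training op in its loop;
+ dequeuing a token inside that op provides the synchronization barrier.
+ A minimal sketch (assuming a `master` target for the Supervisor-managed
+ session):
+
+ ```python
+ with sv.managed_session(master) as sess:
+   while not sv.should_stop():
+     sess.run(training_op)
+ ```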
+
+ @@__init__
+ @@compute_gradients
+ @@apply_gradients
+ @@get_chief_queue_runner
+ @@get_init_tokens_op
+ """
+
+ def __init__(self,
+ opt,
+ replicas_to_aggregate,
+ total_num_replicas=None,
+ variable_averages=None,
+ variables_to_average=None,
+ use_locking=False,
+ name="sync_replicas"):
+ """Construct a sync_replicas optimizer.
+
+ Args:
+ opt: The actual optimizer that will be used to compute and apply the
+ gradients. Must be one of the Optimizer classes.
+ replicas_to_aggregate: number of replicas to aggregate for each variable
+ update.
+ total_num_replicas: Total number of tasks/workers/replicas, which can
+ differ from replicas_to_aggregate.
+ If total_num_replicas > replicas_to_aggregate: it is the sum of
+ replicas_to_aggregate and the number of backup replicas.
+ If total_num_replicas < replicas_to_aggregate: replicas compute
+ multiple batches per update to variables.
+ variable_averages: Optional `ExponentialMovingAverage` object, used to
+ maintain moving averages for the variables passed in
+ `variables_to_average`.
+ variables_to_average: a list of variables that need to be averaged. Only
+ needed if variable_averages is passed in.
+ use_locking: If True use locks for update operation.
+ name: string. Optional name of the returned operation.
+ """
+ if total_num_replicas is None:
+ total_num_replicas = replicas_to_aggregate
+
+ super(SyncReplicasOptimizerV2, self).__init__(use_locking, name)
+ logging.info(
+ "SyncReplicasV2: replicas_to_aggregate=%s; total_num_replicas=%s",
+ replicas_to_aggregate, total_num_replicas)
+ self._opt = opt
+ self._replicas_to_aggregate = replicas_to_aggregate
+ self._gradients_applied = False
+ self._variable_averages = variable_averages
+ self._variables_to_average = variables_to_average
+ self._total_num_replicas = total_num_replicas
+ self._tokens_per_step = max(total_num_replicas, replicas_to_aggregate)
+ self._global_step = None
+ self._sync_token_queue = None
+
+ # The synchronization op will be executed in a queue runner which should
+ # only be executed by one of the replicas (usually the chief).
+ self._chief_queue_runner = None
+
+ # Remember which accumulator is on which device to set the initial step in
+ # the accumulator to be global step. This list contains list of the
+ # following format: (accumulator, device).
+ self._accumulator_list = []
+
+ def compute_gradients(self, *args, **kwargs):
+ """Compute gradients of "loss" for the variables in "var_list".
+
+ This simply wraps the compute_gradients() from the real optimizer. The
+ gradients will be aggregated in apply_gradients() so that the user can
+ modify the gradients, e.g. clip them with a per-replica global norm, if
+ needed. Computing the global norm over already-aggregated gradients can
+ be problematic, as one replica's huge gradients can overwhelm the
+ gradients from the other replicas.
+
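+ For example, a per-replica clip before handing the gradients back to
+ apply_gradients() might look like the following sketch (it assumes none
+ of the returned gradients is None):
+
+ ```python
+ grads_and_vars = opt.compute_gradients(total_loss)
+ grads, tvars = zip(*grads_and_vars)
+ clipped_grads, _ = tf.clip_by_global_norm(grads, clip_norm=5.0)
+ training_op = opt.apply_gradients(zip(clipped_grads, tvars),
+                                   global_step=global_step)
+ ```
+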
+ Args:
+ *args: Arguments for compute_gradients().
+ **kwargs: Keyword arguments for compute_gradients().
+
+ Returns:
+ A list of (gradient, variable) pairs.
+ """
+ return self._opt.compute_gradients(*args, **kwargs)
+
+ def apply_gradients(self, grads_and_vars, global_step=None, name=None):
+ """Apply gradients to variables.
+
+ This contains most of the synchronization implementation and also wraps the
+ apply_gradients() from the real optimizer.
+
+ Args:
+ grads_and_vars: List of (gradient, variable) pairs as returned by
+ compute_gradients().
+ global_step: Optional Variable to increment by one after the
+ variables have been updated.
+ name: Optional name for the returned operation. Defaults to the
+ name passed to the Optimizer constructor.
+
+ Returns:
+ train_op: The op to dequeue a token so the replicas can exit this batch
+ and start the next one. This is executed by each replica.
+
+ Raises:
+ ValueError: If grads_and_vars is empty.
+ ValueError: If global_step is not provided, since the staleness of the
+ gradients cannot be checked without it.
+ """
+ if not grads_and_vars:
+ raise ValueError("Must supply at least one variable")
+
+ if global_step is None:
+ raise ValueError("Global step is required to check staleness")
+
+ self._global_step = global_step
+ train_ops = []
+ aggregated_grad = []
+ var_list = []
+
+ self._local_step = variables.Variable(
+ initial_value=0,
+ trainable=False,
+ collections=[ops.GraphKeys.LOCAL_VARIABLES],
+ name="sync_rep_local_step")
+ self.local_step_init_op = state_ops.assign(self._local_step, global_step)
+ chief_init_ops = [self.local_step_init_op]
+ self.ready_for_local_init_op = variables.report_uninitialized_variables(
+ variables.all_variables())
+
+ with ops.name_scope(None, self._name):
+ for grad, var in grads_and_vars:
+ var_list.append(var)
+ with ops.device(var.device):
+ if grad is None:
+ aggregated_grad.append(None)  # pass-through.
+ continue
+ elif isinstance(grad, ops.Tensor):
+ # Dense gradients.
+ grad_accum = data_flow_ops.ConditionalAccumulator(
+ grad.dtype,
+ shape=var.get_shape(),
+ shared_name=var.name + "/grad_accum")
+ train_ops.append(grad_accum.apply_grad(
+ grad, local_step=self._local_step))
+ aggregated_grad.append(grad_accum.take_grad(
+ self._replicas_to_aggregate))
+ else:
+ if not isinstance(grad, ops.IndexedSlices):
+ raise ValueError("Unknown grad type!")
+ grad_accum = data_flow_ops.SparseConditionalAccumulator(
+ grad.dtype, shape=(), shared_name=var.name + "/grad_accum")
+ train_ops.append(grad_accum.apply_indexed_slices_grad(
+ grad, local_step=self._local_step))
+ aggregated_grad.append(grad_accum.take_indexed_slices_grad(
+ self._replicas_to_aggregate))
+
+ self._accumulator_list.append((grad_accum, var.device))
+
+ aggregated_grads_and_vars = zip(aggregated_grad, var_list)
+
+ # sync_op will be assigned to the same device as the global step.
+ with ops.device(global_step.device), ops.name_scope(""):
+ update_op = self._opt.apply_gradients(aggregated_grads_and_vars,
+ global_step)
+
+ # Create token queue.
+ with ops.device(global_step.device), ops.name_scope(""):
+ sync_token_queue = (
+ data_flow_ops.FIFOQueue(-1,
+ global_step.dtype.base_dtype,
+ shapes=(),
+ shared_name="sync_token_q"))
+ self._sync_token_queue = sync_token_queue
+
+ # dummy_queue is passed to the queue runner. Don't use the real queues
+ # because the queue runner doesn't automatically reopen them once it
+ # has closed queues on the PS devices.
+ dummy_queue = (
+ data_flow_ops.FIFOQueue(1,
+ types_pb2.DT_INT32,
+ shapes=(),
+ shared_name="dummy_queue"))
+
+ with ops.device(global_step.device), ops.name_scope(""):
+ # Replicas have to wait until they can get a token from the token queue.
+ with ops.control_dependencies(train_ops):
+ token = sync_token_queue.dequeue()
+ train_op = state_ops.assign(self._local_step, token)
+
+ with ops.control_dependencies([update_op]):
+ # sync_op needs to insert tokens into the token queue at the end of
+ # the step so the replicas can fetch them to start the next step.
+ tokens = array_ops.fill([self._tokens_per_step], global_step.ref())
+ sync_op = sync_token_queue.enqueue_many((tokens,))
+
+ if self._variable_averages is not None:
+ with ops.control_dependencies([sync_op]), ops.name_scope(""):
+ sync_op = self._variable_averages.apply(
+ self._variables_to_average)
+
+ self._chief_queue_runner = queue_runner.QueueRunner(dummy_queue,
+ [sync_op])
+ for accum, dev in self._accumulator_list:
+ with ops.device(dev):
+ chief_init_ops.append(
+ accum.set_global_step(
+ global_step, name="SetGlobalStep"))
+ self.chief_init_op = control_flow_ops.group(*(chief_init_ops))
+ self._gradients_applied = True
+ return train_op
+
+ def get_chief_queue_runner(self):
+ """Returns the QueueRunner for the chief to execute.
+
+ This includes the operations to synchronize replicas: aggregate gradients,
+ apply to variables, increment global step, insert tokens to token queue.
+
+ Note that this can only be called after calling apply_gradients(),
+ which actually creates this `QueueRunner`.
+
+ Returns:
+ A `QueueRunner` for chief to execute.
+
+ Raises:
+ ValueError: If this is called before apply_gradients().
+ """
+ if not self._gradients_applied:
+ raise ValueError("Should be called after apply_gradients().")
+
+ return self._chief_queue_runner
+
+ def get_slot(self, *args, **kwargs):
+ """Return a slot named "name" created for "var" by the Optimizer.
+
+ This simply wraps the get_slot() from the actual optimizer.
+
+ Args:
+ *args: Arguments for get_slot().
+ **kwargs: Keyword arguments for get_slot().
+
+ Returns:
+ The `Variable` for the slot if it was created, `None` otherwise.
+ """
+ return self._opt.get_slot(*args, **kwargs)
+
+ def get_slot_names(self, *args, **kwargs):
+ """Return a list of the names of slots created by the `Optimizer`.
+
+ This simply wraps the get_slot_names() from the actual optimizer.
+
+ Args:
+ *args: Arguments for get_slot_names().
+ **kwargs: Keyword arguments for get_slot_names().
+
+ Returns:
+ A list of strings.
+ """
+ return self._opt.get_slot_names(*args, **kwargs)
+
+ def get_init_tokens_op(self, num_tokens=-1):
+ """Returns the op to fill the sync_token_queue with the tokens.
+
+ This is supposed to be executed at the beginning of the chief/sync
+ thread so that even if the total_num_replicas is less than
+ replicas_to_aggregate, the model can still proceed as the replicas can
+ compute multiple steps per variable update. Make sure:
+ `num_tokens >= replicas_to_aggregate - total_num_replicas`.
+
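+ For example, with replicas_to_aggregate=3 and total_num_replicas=2, the
+ two replicas must together contribute three gradients per update, so at
+ least 3 - 2 = 1 token must be preloaded into the queue for the first
+ step to be able to complete.
+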
+ Args:
+ num_tokens: Number of tokens to add to the queue.
+
+ Returns:
+ An op for the chief/sync replica to fill the token queue.
+
+ Raises:
+ ValueError: If this is called before apply_gradients().
+ ValueError: If num_tokens is smaller than replicas_to_aggregate -
+ total_num_replicas.
+ """
+ if not self._gradients_applied:
+ raise ValueError(
+ "get_init_tokens_op() should be called after apply_gradients().")
+
+ tokens_needed = self._replicas_to_aggregate - self._total_num_replicas
+ if num_tokens == -1:
+ num_tokens = self._replicas_to_aggregate
+ elif num_tokens < tokens_needed:
+ raise ValueError(
+ "Too few tokens to finish the first step: %d (given) vs %d (needed)" %
+ (num_tokens, tokens_needed))
+
+ if num_tokens > 0:
+ with ops.device(self._global_step.device), ops.name_scope(""):
+ tokens = array_ops.fill([num_tokens],
+ self._global_step.ref())
+ init_tokens = self._sync_token_queue.enqueue_many((tokens,))
+ else:
+ init_tokens = control_flow_ops.no_op(name="no_init_tokens")
+
+ return init_tokens
+
+
+# Please switch to V2 if you are still using the old sync optimizer. V2
+# is much more efficient and stable. It also drops 100% of the stale
+# gradients, which is not possible in this implementation without
+# significant overhead. This class is kept here just for backward
+# compatibility and will be DEPRECATED later.
class SyncReplicasOptimizer(optimizer.Optimizer):
"""Class to synchronize, aggregate gradients and pass them to the optimizer.
@@ -170,6 +580,10 @@ class SyncReplicasOptimizer(optimizer.Optimizer):
total_num_replicas = replicas_to_aggregate
super(SyncReplicasOptimizer, self).__init__(use_locking, name)
+ logging.info("""TO BE DEPRECATED!!!
+ This version will be deprecated. Please switch to V2 at your
+ earliest convenience.""")
+
logging.info(
"SyncReplicas enabled: replicas_to_aggregate=%s; total_num_replicas=%s",
replicas_to_aggregate, total_num_replicas)