diff options
author | 2016-10-07 12:53:06 -0800 | |
---|---|---|
committer | 2016-10-07 14:03:39 -0700 | |
commit | ecdee38a534133ecd7ba18e58527cc4120277190 (patch) | |
tree | 5b76e2e8a3038cb3b11539121360c062c2719154 /tensorflow/python/training/sync_replicas_optimizer.py | |
parent | 2c8d270735176df1a59b5a80885b2e14b4f06953 (diff) |
Switch to the new accumulators in the sync_rep optimizer (currently called V2). Please note that the gradients from replicas are now averaged instead of summed (as in the old sync_replicas_optimizer) so you need to increase the learning rate according to the number of replicas. This change is introduced to be consistent with how gradients are aggregated (averaged) within a batch in a replica.
As shown in the code change, the switch results in:
1. much cleaner and simpler code.
2. much more efficient and reliable staleness check. It is now 100% strict with
no extra contention to PS servers.
3. no need for clean_up op so we can get rid of the abort_op which can confuse users.
4. number of replicas can be changed without complaints from checkpoint as the
local_step is now just a local variable instead of a global vector variable.
This has been tried with manual restarts of workers (chief or non chief) and
ps and seems to be quite robust.
Change: 135513399
Diffstat (limited to 'tensorflow/python/training/sync_replicas_optimizer.py')
-rw-r--r-- | tensorflow/python/training/sync_replicas_optimizer.py | 414 |
1 files changed, 414 insertions, 0 deletions
diff --git a/tensorflow/python/training/sync_replicas_optimizer.py b/tensorflow/python/training/sync_replicas_optimizer.py index ba07cd5908..24d8177f7d 100644 --- a/tensorflow/python/training/sync_replicas_optimizer.py +++ b/tensorflow/python/training/sync_replicas_optimizer.py @@ -31,6 +31,416 @@ from tensorflow.python.training import optimizer from tensorflow.python.training import queue_runner +# Please note that the gradients from replicas are averaged instead of summed +# (as in the old sync_replicas_optimizer) so you need to increase the learning +# rate according to the number of replicas. This change is introduced to be +# consistent with how gradients are aggregated (averaged) within a batch in a +# replica. +class SyncReplicasOptimizerV2(optimizer.Optimizer): + """Class to synchronize, aggregate gradients and pass them to the optimizer. + + In a typical asynchronous training environment, it's common to have some + stale gradients. For example, with a N-replica asynchronous training, + gradients will be applied to the variables N times independently. Depending + on each replica's training speed, some gradients might be calculated from + copies of the variable from several steps back (N-1 steps on average). This + optimizer avoids stale gradients by collecting gradients from all replicas, + averaging them, then applying them to the variables in one shot, after + which replicas can fetch the new variables and continue. + + The following accumulators/queue are created: + <empty line> + * N `gradient accumulators`, one per variable to train. Gradients are pushed + to them and the chief worker will wait until enough gradients are collected + and then average them before applying to variables. The accumulator will + drop all stale gradients (more details in the accumulator op). + * 1 `token` queue where the optimizer pushes the new global_step value after + all variables are updated. + + The following local variable is created: + * `sync_rep_local_step`, one per replica. Compared against the global_step in + each accumulator to check for staleness of the gradients. + + The optimizer adds nodes to the graph to collect gradients and pause the + trainers until variables are updated. + For the Parameter Server job: + <empty line> + 1. An accumulator is created for each variable, and each replica pushes the + gradients into the accumulators instead of directly applying them to the + variables. + 2. Each accumulator averages once enough gradients (replicas_to_aggregate) + have been accumulated. + 3. Apply the averaged gradients to the variables. + 4. Only after all variables have been updated, increment the global step. + 5. Only after step 4, pushes `global_step` in the `token_queue`, once for + each worker replica. The workers can now fetch the global step, use it to + update its local_step variable and start the next batch. + + For the replicas: + <empty line> + 1. Start a step: fetch variables and compute gradients. + 2. Once the gradients have been computed, push them into gradient + accumulators. Each accumulator will check the staleness and drop the stale. + 3. After pushing all the gradients, dequeue an updated value of global_step + from the token queue and record that step to its local_step variable. Note + that this is effectively a barrier. + 4. Start the next batch. + + ### Usage + + ```python + # Create any optimizer to update the variables, say a simple SGD: + opt = GradientDescentOptimizer(learning_rate=0.1) + + # Wrap the optimizer with sync_replicas_optimizer with 50 replicas: at each + # step the optimizer collects 50 gradients before applying to variables. + # Note that if you want to have 2 backup replicas, you can change + # total_num_replicas=52 and make sure this number matches how many physical + # replicas you started in your job. + opt = tf.SyncReplicasOptimizerV2(opt, replicas_to_aggregate=50, + total_num_replicas=50) + + # Some models have startup_delays to help stabilize the model but when using + # sync_replicas training, set it to 0. + + # Now you can call `minimize()` or `compute_gradients()` and + # `apply_gradients()` normally + grads = opt.minimize(total_loss, global_step=self.global_step) + + + # You can now call get_init_tokens_op() and get_chief_queue_runner(). + # Note that get_init_tokens_op() must be called before creating session + # because it modifies the graph by adding new nodes. + init_token_op = opt.get_init_tokens_op() + chief_queue_runner = opt.get_chief_queue_runner() + ``` + + In the training program, every worker will run the train_op as if not + synchronized. But one worker (usually the chief) will need to execute the + chief_queue_runner and get_init_tokens_op from this optimizer. + + ```python + # When you create the supervisor, you need to add the local_init_op and + # ready_for_local_init_op to make sure the local_step is initialized to the + # global_step. Here is an example: + sv = tf.Supervisor(graph=g, + is_chief=is_chief, + # This initialize local step. + local_init_op=local_init_op, + # This makes sure global step is initialized before using. + ready_for_local_init_op=ready_for_local_init_op, + saver=model.saver) + + # After the session is created by the Supervisor and before the main while + # loop: + if is_chief and FLAGS.sync_replicas: + sv.start_queue_runners(sess, [chief_queue_runner]) + # Insert initial tokens to the queue. + sess.run(init_token_op) + ``` + + @@__init__ + @@compute_gradients + @@apply_gradients + @@get_chief_queue_runner + @@get_init_tokens_op + """ + + def __init__(self, + opt, + replicas_to_aggregate, + total_num_replicas=None, + variable_averages=None, + variables_to_average=None, + use_locking=False, + name="sync_replicas"): + """Construct a sync_replicas optimizer. + + Args: + opt: The actual optimizer that will be used to compute and apply the + gradients. Must be one of the Optimizer classes. + replicas_to_aggregate: number of replicas to aggregate for each variable + update. + total_num_replicas: Total number of tasks/workers/replicas, could be + different from replicas_to_aggregate. + If total_num_replicas > replicas_to_aggregate: it is backup_replicas + + replicas_to_aggregate. + If total_num_replicas < replicas_to_aggregate: Replicas compute + multiple batches per update to variables. + variable_averages: Optional `ExponentialMovingAverage` object, used to + maintain moving averages for the variables passed in + `variables_to_average`. + variables_to_average: a list of variables that need to be averaged. Only + needed if variable_averages is passed in. + use_locking: If True use locks for update operation. + name: string. Optional name of the returned operation. + """ + if total_num_replicas is None: + total_num_replicas = replicas_to_aggregate + + super(SyncReplicasOptimizerV2, self).__init__(use_locking, name) + logging.info( + "SyncReplicasV2: replicas_to_aggregate=%s; total_num_replicas=%s", + replicas_to_aggregate, total_num_replicas) + self._opt = opt + self._replicas_to_aggregate = replicas_to_aggregate + self._gradients_applied = False + self._variable_averages = variable_averages + self._variables_to_average = variables_to_average + self._total_num_replicas = total_num_replicas + self._tokens_per_step = max(total_num_replicas, replicas_to_aggregate) + self._global_step = None + self._sync_token_queue = None + + # The synchronization op will be executed in a queue runner which should + # only be executed by one of the replicas (usually the chief). + self._chief_queue_runner = None + + # Remember which accumulator is on which device to set the initial step in + # the accumulator to be global step. This list contains list of the + # following format: (accumulator, device). + self._accumulator_list = [] + + def compute_gradients(self, *args, **kwargs): + """Compute gradients of "loss" for the variables in "var_list". + + This simply wraps the compute_gradients() from the real optimizer. The + gradients will be aggregated in the apply_gradients() so that user can + modify the gradients like clipping with per replica global norm if needed. + The global norm with aggregated gradients can be bad as one replica's huge + gradients can hurt the gradients from other replicas. + + Args: + *args: Arguments for compute_gradients(). + **kwargs: Keyword arguments for compute_gradients(). + + Returns: + A list of (gradient, variable) pairs. + """ + return self._opt.compute_gradients(*args, **kwargs) + + def apply_gradients(self, grads_and_vars, global_step=None, name=None): + """Apply gradients to variables. + + This contains most of the synchronization implementation and also wraps the + apply_gradients() from the real optimizer. + + Args: + grads_and_vars: List of (gradient, variable) pairs as returned by + compute_gradients(). + global_step: Optional Variable to increment by one after the + variables have been updated. + name: Optional name for the returned operation. Default to the + name passed to the Optimizer constructor. + + Returns: + train_op: The op to dequeue a token so the replicas can exit this batch + and start the next one. This is executed by each replica. + + Raises: + ValueError: If the grads_and_vars is empty. + ValueError: If global step is not provided, the staleness cannot be + checked. + """ + if not grads_and_vars: + raise ValueError("Must supply at least one variable") + + if global_step is None: + raise ValueError("Global step is required to check staleness") + + self._global_step = global_step + train_ops = [] + aggregated_grad = [] + var_list = [] + + self._local_step = variables.Variable( + initial_value=0, + trainable=False, + collections=[ops.GraphKeys.LOCAL_VARIABLES], + name="sync_rep_local_step") + self.local_step_init_op = state_ops.assign(self._local_step, global_step) + chief_init_ops = [self.local_step_init_op] + self.ready_for_local_init_op = variables.report_uninitialized_variables( + variables.all_variables()) + + with ops.name_scope(None, self._name): + for grad, var in grads_and_vars: + var_list.append(var) + with ops.device(var.device): + # Dense gradients. + if grad is None: + aggregated_grad.append(None) # pass-through. + continue + elif isinstance(grad, ops.Tensor): + grad_accum = data_flow_ops.ConditionalAccumulator( + grad.dtype, + shape=var.get_shape(), + shared_name=var.name + "/grad_accum") + train_ops.append(grad_accum.apply_grad( + grad, local_step=self._local_step)) + aggregated_grad.append(grad_accum.take_grad( + self._replicas_to_aggregate)) + else: + if not isinstance(grad, ops.IndexedSlices): + raise ValueError("Unknown grad type!") + grad_accum = data_flow_ops.SparseConditionalAccumulator( + grad.dtype, shape=(), shared_name=var.name + "/grad_accum") + train_ops.append(grad_accum.apply_indexed_slices_grad( + grad, local_step=self._local_step)) + aggregated_grad.append(grad_accum.take_indexed_slices_grad( + self._replicas_to_aggregate)) + + self._accumulator_list.append((grad_accum, var.device)) + + aggregated_grads_and_vars = zip(aggregated_grad, var_list) + + # sync_op will be assigned to the same device as the global step. + with ops.device(global_step.device), ops.name_scope(""): + update_op = self._opt.apply_gradients(aggregated_grads_and_vars, + global_step) + + # Create token queue. + with ops.device(global_step.device), ops.name_scope(""): + sync_token_queue = ( + data_flow_ops.FIFOQueue(-1, + global_step.dtype.base_dtype, + shapes=(), + shared_name="sync_token_q")) + self._sync_token_queue = sync_token_queue + + # dummy_queue is passed to the queue runner. Don't use the real queues + # because the queue runner doesn't automatically reopen it once it + # closed queues in PS devices. + dummy_queue = ( + data_flow_ops.FIFOQueue(1, + types_pb2.DT_INT32, + shapes=(), + shared_name="dummy_queue")) + + with ops.device(global_step.device), ops.name_scope(""): + # Replicas have to wait until they can get a token from the token queue. + with ops.control_dependencies(train_ops): + token = sync_token_queue.dequeue() + train_op = state_ops.assign(self._local_step, token) + + with ops.control_dependencies([update_op]): + # Sync_op needs to insert tokens to the token queue at the end of the + # step so the replicas can fetch them to start the next step. + tokens = array_ops.fill([self._tokens_per_step], global_step.ref()) + sync_op = sync_token_queue.enqueue_many((tokens,)) + + if self._variable_averages is not None: + with ops.control_dependencies([sync_op]), ops.name_scope(""): + sync_op = self._variable_averages.apply( + self._variables_to_average) + + self._chief_queue_runner = queue_runner.QueueRunner(dummy_queue, + [sync_op]) + for accum, dev in self._accumulator_list: + with ops.device(dev): + chief_init_ops.append( + accum.set_global_step( + global_step, name="SetGlobalStep")) + self.chief_init_op = control_flow_ops.group(*(chief_init_ops)) + self._gradients_applied = True + return train_op + + def get_chief_queue_runner(self): + """Returns the QueueRunner for the chief to execute. + + This includes the operations to synchronize replicas: aggregate gradients, + apply to variables, increment global step, insert tokens to token queue. + + Note that this can only be called after calling apply_gradients() which + actually generates this queuerunner. + + Returns: + A `QueueRunner` for chief to execute. + + Raises: + ValueError: If this is called before apply_gradients(). + """ + if self._gradients_applied is False: + raise ValueError("Should be called after apply_gradients().") + + return self._chief_queue_runner + + def get_slot(self, *args, **kwargs): + """Return a slot named "name" created for "var" by the Optimizer. + + This simply wraps the get_slot() from the actual optimizer. + + Args: + *args: Arguments for get_slot(). + **kwargs: Keyword arguments for get_slot(). + + Returns: + The `Variable` for the slot if it was created, `None` otherwise. + """ + return self._opt.get_slot(*args, **kwargs) + + def get_slot_names(self, *args, **kwargs): + """Return a list of the names of slots created by the `Optimizer`. + + This simply wraps the get_slot_names() from the actual optimizer. + + Args: + *args: Arguments for get_slot(). + **kwargs: Keyword arguments for get_slot(). + + Returns: + A list of strings. + """ + return self._opt.get_slot_names(*args, **kwargs) + + def get_init_tokens_op(self, num_tokens=-1): + """Returns the op to fill the sync_token_queue with the tokens. + + This is supposed to be executed in the beginning of the chief/sync thread + so that even if the total_num_replicas is less than replicas_to_aggregate, + the model can still proceed as the replicas can compute multiple steps per + variable update. Make sure: + `num_tokens >= replicas_to_aggregate - total_num_replicas`. + + Args: + num_tokens: Number of tokens to add to the queue. + + Returns: + An op for the chief/sync replica to fill the token queue. + + Raises: + ValueError: If this is called before apply_gradients(). + ValueError: If num_tokens are smaller than replicas_to_aggregate - + total_num_replicas. + """ + if self._gradients_applied is False: + raise ValueError( + "get_init_tokens_op() should be called after apply_gradients().") + + tokens_needed = self._replicas_to_aggregate - self._total_num_replicas + if num_tokens == -1: + num_tokens = self._replicas_to_aggregate + elif num_tokens < tokens_needed: + raise ValueError( + "Too few tokens to finish the first step: %d (given) vs %d (needed)" % + (num_tokens, tokens_needed)) + + if num_tokens > 0: + with ops.device(self._global_step.device), ops.name_scope(""): + tokens = array_ops.fill([num_tokens], + self._global_step.ref()) + init_tokens = self._sync_token_queue.enqueue_many((tokens,)) + else: + init_tokens = control_flow_ops.no_op(name="no_init_tokens") + + return init_tokens + + +# Please switch to v2 if you are still using the old sync optimizer. V2 +# is much more efficient and stable. It also removed 100% of the stale +# gradients which is not possible in this implementation without significant +# overhead. This is kept here just for backward compatibility and will be +# DEPRECATED later. class SyncReplicasOptimizer(optimizer.Optimizer): """Class to synchronize, aggregate gradients and pass them to the optimizer. @@ -170,6 +580,10 @@ class SyncReplicasOptimizer(optimizer.Optimizer): total_num_replicas = replicas_to_aggregate super(SyncReplicasOptimizer, self).__init__(use_locking, name) + logging.info("""TO BE DEPRECATED!!! + This version will be deprecated. Please switch to V2 at your + earliest convenience.""") + logging.info( "SyncReplicas enabled: replicas_to_aggregate=%s; total_num_replicas=%s", replicas_to_aggregate, total_num_replicas) |