path: root/tensorflow/contrib/opt/python/training/delay_compensated_gradient_descent.py
Diffstat (limited to 'tensorflow/contrib/opt/python/training/delay_compensated_gradient_descent.py')
-rw-r--r--  tensorflow/contrib/opt/python/training/delay_compensated_gradient_descent.py  256
1 files changed, 256 insertions, 0 deletions
diff --git a/tensorflow/contrib/opt/python/training/delay_compensated_gradient_descent.py b/tensorflow/contrib/opt/python/training/delay_compensated_gradient_descent.py
new file mode 100644
index 0000000000..5a5e67ef68
--- /dev/null
+++ b/tensorflow/contrib/opt/python/training/delay_compensated_gradient_descent.py
@@ -0,0 +1,256 @@
+# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+
+"""DelayCompensatedGradientDescentOptimizer for TensorFlow."""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+from tensorflow.python.framework import ops
+from tensorflow.python.ops import array_ops
+from tensorflow.python.ops import math_ops
+from tensorflow.python.ops import state_ops
+from tensorflow.python.ops import variables
+from tensorflow.python.training import optimizer
+from tensorflow.python.training import training_ops
+
+
+class _RefVariableAsynchronousProcessor(optimizer._RefVariableProcessor):
+ """Processor for Variable."""
+ def update_op_asynchronous(self, optimizer, g, index):
+ if isinstance(g, ops.Tensor):
+ return optimizer._apply_dense(g, self._v, index)
+ else:
+ assert isinstance(g, ops.IndexedSlices), ("Gradient ", g, " is neither a "
+ "tensor nor IndexedSlices.")
+ # pylint: disable=protected-access
+ return optimizer._apply_sparse_duplicate_indices(g, self._v, index)
+
+
+class _DenseResourceVariableAsynchronousProcessor(
+    optimizer._DenseResourceVariableProcessor):
+ """Processor for dense ResourceVariables."""
+ def update_op_asynchronous(self, optimizer, g, index):
+ # pylint: disable=protected-access
+ if isinstance(g, ops.IndexedSlices):
+ return optimizer._resource_apply_sparse_duplicate_indices(
+ g.values, self._v, g.indices, index)
+ return optimizer._resource_apply_dense(g, self._v, index)
+
+
+def _get_processor(v):
+ """The processor of v."""
+ if v.op.type == "VarHandleOp":
+ return _DenseResourceVariableAsynchronousProcessor(v)
+ if isinstance(v, variables.Variable):
+ return _RefVariableAsynchronousProcessor(v)
+ raise NotImplementedError("Trying to optimize unsupported type ", v)
+
+
+class DelayCompensatedGradientDescentOptimizer(optimizer.Optimizer):
+ """Optimizer that implements gradient descent with delay compensation.
+
+ See [Zheng, Shuxin, et al., 2016](https://arxiv.org/abs/1609.08326)
+ ([pdf](https://arxiv.org/pdf/1609.08326.pdf)).
+ """
+
+ def __init__(self, learning_rate, variance_parameter, num_workers=1,
+ use_locking=False, name="DelayCompensatedGradientDescent"):
+ """Construct a new gradient descent optimizer with delay compensation.
+
+ Args:
+ learning_rate: A Tensor or a floating point value. The learning
+ rate to use.
+      variance_parameter: A Tensor or a floating point value. The lambda
+        value that scales the delay-compensation term.
+      num_workers: An integer. The number of workers computing gradients
+        asynchronously.
+ use_locking: If True use locks for update operations.
+ name: Optional name prefix for the operations created when applying
+ gradients. Defaults to "DelayCompensatedGradientDescent".
+ """
+ if num_workers <= 0:
+ raise ValueError("num_workers must be positive: %s" % num_workers)
+ super(DelayCompensatedGradientDescentOptimizer, self).__init__(
+ use_locking, name)
+ self._learning_rate = learning_rate
+ self._lambda = variance_parameter
+ self._num_workers = num_workers
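+
+  # A hypothetical usage sketch (the names `loss`, `step`, `N`, and `i` below
+  # are illustrative, not part of this API): every asynchronous worker builds
+  # the optimizer with the same `num_workers` and passes its own worker index:
+  #
+  #   opt = DelayCompensatedGradientDescentOptimizer(
+  #       learning_rate=0.1, variance_parameter=0.04, num_workers=N)
+  #   train_op = opt.minimize(loss, global_step=step, worker_index=i)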
+
+ def minimize(self, loss, global_step=None, var_list=None,
+ gate_gradients=optimizer.Optimizer.GATE_OP, aggregation_method=None,
+ colocate_gradients_with_ops=False, name=None,
+ grad_loss=None, worker_index=None):
+ """Add operations to minimize `loss` by updating `var_list`.
+
+    This method simply combines calls to `compute_gradients()` and
+    `apply_gradients()`. If you want to process the gradients before applying
+    them, call `compute_gradients()` and `apply_gradients()` explicitly
+    instead of using this function.
+
+ Args:
+ loss: A `Tensor` containing the value to minimize.
+ global_step: Optional `Variable` to increment by one after the
+ variables have been updated.
+ var_list: Optional list or tuple of `Variable` objects to update to
+ minimize `loss`. Defaults to the list of variables collected in
+ the graph under the key `GraphKeys.TRAINABLE_VARIABLES`.
+ gate_gradients: How to gate the computation of gradients. Can be
+ `GATE_NONE`, `GATE_OP`, or `GATE_GRAPH`.
+ aggregation_method: Specifies the method used to combine gradient terms.
+ Valid values are defined in the class `AggregationMethod`.
+ colocate_gradients_with_ops: If True, try colocating gradients with
+ the corresponding op.
+ name: Optional name for the returned operation.
+ grad_loss: Optional. A `Tensor` holding the gradient computed for `loss`.
+      worker_index: Optional. The index of the worker applying this update
+        when computing asynchronously; must be in [0, num_workers).
+
+ Returns:
+ An Operation that updates the variables in `var_list`. If `global_step`
+ was not `None`, that operation also increments `global_step`.
+
+ Raises:
+ ValueError: If some of the variables are not `Variable` objects.
+ """
+    if worker_index is not None and (
+        worker_index < 0 or worker_index >= self._num_workers):
+      raise ValueError(
+          "worker_index must be in the range [0, num_workers): %s" %
+          worker_index)
+ grads_and_vars = self.compute_gradients(
+ loss, var_list=var_list, gate_gradients=gate_gradients,
+ aggregation_method=aggregation_method,
+ colocate_gradients_with_ops=colocate_gradients_with_ops,
+ grad_loss=grad_loss)
+
+ vars_with_grad = [v for g, v in grads_and_vars if g is not None]
+ if not vars_with_grad:
+ raise ValueError(
+ "No gradients provided for any variable, check your graph for ops"
+ " that do not support gradients, between variables %s and loss %s." %
+ ([str(v) for _, v in grads_and_vars], loss))
+
+ return self.apply_gradients(grads_and_vars, global_step=global_step,
+ name=name, worker_index=worker_index)
+
+ def apply_gradients(self,
+ grads_and_vars,
+ global_step=None,
+ name=None,
+ worker_index=None):
+ """Apply gradients to variables.
+
+ This is the second part of `minimize()`. It returns an `Operation` that
+ applies gradients.
+
+ Args:
+ grads_and_vars: List of (gradient, variable) pairs as returned by
+ `compute_gradients()`.
+ global_step: Optional `Variable` to increment by one after the
+ variables have been updated.
+      name: Optional name for the returned operation. Defaults to the
+        name passed to the `Optimizer` constructor.
+      worker_index: Optional. The index of the worker applying this update
+        when computing asynchronously; must be in [0, num_workers).
+
+ Returns:
+ An `Operation` that applies the specified gradients. If `global_step`
+ was not None, that operation also increments `global_step`.
+
+ Raises:
+ TypeError: If `grads_and_vars` is malformed.
+ ValueError: If none of the variables have gradients.
+ """
+ # This is a default implementation of apply_gradients() that can be shared
+ # by most optimizers. It relies on the subclass implementing the following
+ # methods: _create_slots(), _prepare(), _apply_dense(), and _apply_sparse().
+
+ grads_and_vars = tuple(grads_and_vars) # Make sure repeat iteration works.
+ if not grads_and_vars:
+ raise ValueError("No variables provided.")
+ converted_grads_and_vars = []
+ for g, v in grads_and_vars:
+ if g is not None:
+ try:
+ # Convert the grad to Tensor or IndexedSlices if necessary.
+ g = ops.convert_to_tensor_or_indexed_slices(g)
+ except TypeError:
+ raise TypeError(
+ "Gradient must be convertible to a Tensor"
+ " or IndexedSlices, or None: %s" % g)
+ if not isinstance(g, (ops.Tensor, ops.IndexedSlices)):
+ raise TypeError(
+ "Gradient must be a Tensor, IndexedSlices, or None: %s" % g)
+ p = _get_processor(v)
+ converted_grads_and_vars.append((g, v, p))
+
+ converted_grads_and_vars = tuple(converted_grads_and_vars)
+ var_list = [v for g, v, _ in converted_grads_and_vars if g is not None]
+ if not var_list:
+ raise ValueError("No gradients provided for any variable: %s." %
+                       ([str(v) for _, v, _ in converted_grads_and_vars],))
+ with ops.control_dependencies(None):
+ self._create_slots([optimizer._get_variable_for(v) for v in var_list])
+ update_ops = []
+ with ops.name_scope(name, self._name) as name:
+ self._prepare()
+ for grad, var, processor in converted_grads_and_vars:
+ if grad is None:
+ continue
+ # We colocate all ops created in _apply_dense or _apply_sparse
+ # on the same device as the variable.
+ with ops.name_scope("update_" + var.op.name), ops.colocate_with(var):
+ if worker_index is None:
+ update_ops.append(processor.update_op(self, grad))
+ else:
+ update_ops.append(processor.update_op_asynchronous(self, grad,
+ worker_index))
+ if global_step is None:
+ apply_updates = self._finish(update_ops, name)
+ else:
+ with ops.control_dependencies([self._finish(update_ops, "update")]):
+ with ops.colocate_with(global_step):
+ apply_updates = state_ops.assign_add(global_step, 1, name=name).op
+
+ train_op = ops.get_collection_ref(ops.GraphKeys.TRAIN_OP)
+ if apply_updates not in train_op:
+ train_op.append(apply_updates)
+
+ return apply_updates
+
+ def _create_slots(self, var_list):
+ """Initialize slots for all the vars of each worker to store
+ the previous values of it
+ """
+ for index in range(self._num_workers):
+ for v in var_list:
+ var2 = array_ops.identity(v.initialized_value())
+ self._get_or_make_slot(v, var2, "shadow_{0}".format(index),
+ self._name)
+
+ def _resource_apply_dense(self, grad, var, worker_index=0):
+ # Get previous value of the variable from the slot
+ shadow = self.get_slot(var, "shadow_{0}".format(worker_index))
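+    # The fused kernel below is expected to apply the delay-compensated
+    # update sketched at the top of this class and to refresh `shadow` with
+    # the new value of `var` (assumed kernel behavior, not re-derived here).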
+ return training_ops.apply_delay_compensated_gradient_descent(
+ var.handle,
+ math_ops.cast(self._learning_rate_tensor, grad.dtype.base_dtype),
+ grad,
+ math_ops.cast(self._lambda_tensor, grad.dtype.base_dtype),
+ shadow.handle,
+ use_locking=self._use_locking)
+
+ def _prepare(self):
+ self._learning_rate_tensor = ops.convert_to_tensor(self._learning_rate,
+ name="learning_rate")
+ self._lambda_tensor = ops.convert_to_tensor(self._lambda,
+ name="lambda")