| author | Yuefeng Zhou <yuefengz@google.com> | 2018-08-22 16:15:37 -0700 |
| --- | --- | --- |
| committer | TensorFlower Gardener <gardener@tensorflow.org> | 2018-08-22 16:34:16 -0700 |
| commit | 8c6623137c928e20cd2b54471b06582fa118ad9a (patch) | |
| tree | b97806d292aea9be9b8651005cdc231f7bc92ef0 /tensorflow/contrib/distribute/python/mirrored_strategy.py | |
| parent | 05dfd7079d8e3d4487effc6848e4f323eca9ba37 (diff) | |
Implemented the `configure` method and the properties needed by the distribute coordinator in MirroredStrategy.
PiperOrigin-RevId: 209848375
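
A rough usage sketch of the new code path (not part of the commit; the cluster addresses and GPU count below are hypothetical): the strategy is now constructed for local training only, and a higher-level API such as the distribute coordinator is expected to call `configure` with a `cluster_spec` to switch it into the multi-worker, in-graph mode.

```python
# Hypothetical sketch only; TF 1.10-era contrib API, cluster addresses invented.
from tensorflow.contrib.distribute.python import mirrored_strategy

# __init__ no longer accepts cluster_spec; it only does local initialization.
strategy = mirrored_strategy.MirroredStrategy(num_gpus=2)

# Normally a coordinator, not user code, calls configure(); passing a
# cluster_spec triggers _initialize_multi_worker() for in-graph replication.
strategy.configure(cluster_spec={
    "chief": ["host0:2222"],
    "worker": ["host1:2222", "host2:2222"],
})

print(strategy.between_graph)      # False: one client graph drives all workers
print(strategy.parameter_devices)  # per-worker GPU devices after configure
```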
Diffstat (limited to 'tensorflow/contrib/distribute/python/mirrored_strategy.py')
-rw-r--r-- | tensorflow/contrib/distribute/python/mirrored_strategy.py | 147 |
1 file changed, 87 insertions, 60 deletions
diff --git a/tensorflow/contrib/distribute/python/mirrored_strategy.py b/tensorflow/contrib/distribute/python/mirrored_strategy.py
index 6981449a4c..ecaf60f350 100644
--- a/tensorflow/contrib/distribute/python/mirrored_strategy.py
+++ b/tensorflow/contrib/distribute/python/mirrored_strategy.py
@@ -25,8 +25,8 @@ import threading
 from tensorflow.contrib.distribute.python import cross_tower_ops as cross_tower_ops_lib
 from tensorflow.contrib.distribute.python import shared_variable_creator
 from tensorflow.contrib.distribute.python import values
-from tensorflow.core.protobuf import cluster_pb2
 from tensorflow.python import pywrap_tensorflow
+from tensorflow.python.distribute import multi_worker_util
 from tensorflow.python.eager import context
 from tensorflow.python.eager import tape
 from tensorflow.python.framework import constant_op
@@ -39,7 +39,6 @@ from tensorflow.python.ops import variables as variables_lib
 from tensorflow.python.training import coordinator
 from tensorflow.python.training import device_util
 from tensorflow.python.training import distribute as distribute_lib
-from tensorflow.python.training import server_lib
 from tensorflow.python.util import nest
 
 
@@ -299,8 +298,10 @@ class MirroredStrategy(distribute_lib.DistributionStrategy):
   This strategy uses one tower per device and sync replication for its multi-GPU
   version.
 
-  When `cluster_spec` is given, it turns into the mulit-worker version that
-  works on multiple workers with in-graph replication.
+  When `cluster_spec` is given by the `configure` method., it turns into the
+  mulit-worker version that works on multiple workers with in-graph replication.
+  Note: `configure` will be called by higher-level APIs if running in
+  distributed environment.
 
   There are several important concepts for distributed TensorFlow, e.g.
   `client`, `job`, 'task', `cluster`, `in-graph replication` and
@@ -330,8 +331,6 @@ class MirroredStrategy(distribute_lib.DistributionStrategy):
     num_gpus: number of GPUs. For local training, either specify `devices` or
       `num_gpus`. In distributed training, this must be specified as number of
       GPUs on each worker.
-    cluster_spec: if this is set, it turns into the multi-worker version and
-      `devices` must not be set but `num_gpus` must be set.
     cross_tower_ops: optional, a descedant of `CrossTowerOps`. If this is not
       set, the `configure` method will try to find the best one.
     prefetch_on_device: optional boolean to specify whether to prefetch input
@@ -341,65 +340,76 @@ class MirroredStrategy(distribute_lib.DistributionStrategy):
   def __init__(self,
                devices=None,
                num_gpus=None,
-               cluster_spec=None,
               cross_tower_ops=None,
                prefetch_on_device=None):
     super(MirroredStrategy, self).__init__()
-    if cluster_spec:
-      if devices is not None:
-        raise ValueError("Specifying devices when `cluster_spec` is also given "
-                         "is not supported in MirroredStrategy.")
-
-      # TODO(yuefengz): use the utility method to normalize cluster_spec.
-      if isinstance(cluster_spec, (dict, cluster_pb2.ClusterDef)):
-        cluster_spec = server_lib.ClusterSpec(cluster_spec)
-      elif not isinstance(cluster_spec, server_lib.ClusterSpec):
-        raise ValueError(
-            "`cluster_spec' should be dict or a `tf.train.ClusterSpec` or a "
-            "`tf.train.ClusterDef` object")
-      self._cluster_spec = cluster_spec
-
-      self._workers = []
-      for job in sorted(cluster_spec.jobs):
-        for task in range(cluster_spec.num_tasks(job)):
-          self._workers.append("/job:%s/task:%d" % (job, task))
+    self._cross_tower_ops = cross_tower_ops
+    self._prefetch_on_device = prefetch_on_device
+    # Rememeber num GPUs which might be needed by `configure` method.
+    self._num_gpus = num_gpus
+
+    self._initialize_local(num_gpus, devices)
 
+  def _initialize_local(self, num_gpus, devices):
+    """Initializes the object for local training."""
+    self._cluster_spec = None
+    # Convert `num_gpus` into `devices`, shouldn't specify both.
+    if devices is None:
       if num_gpus is None:
-        raise ValueError("`num_gpus` is required if `cluster_spec` is given.")
-      self._num_gpus = num_gpus
-      if num_gpus > 0:
-        self._worker_device_map = {
-            worker: [
-                device_util.canonicalize(worker + "/device:GPU:%d" % gpu)
-                for gpu in range(num_gpus)
-            ] for worker in self._workers
-        }
+        num_gpus = context.num_gpus()
+      if num_gpus == 0:
+        devices = ["/device:CPU:0"]
       else:
-        self._worker_device_map = {
-            worker: [device_util.canonicalize(worker, "/device:CPU:0")]
-            for worker in self._workers
-        }
-      devices = nest.flatten(self._worker_device_map)
-
-      # Setting `_default_device` will add a device scope in the
-      # distribution.scope. We set the default device to the first worker. When
-      # users specify device under distribution.scope by
-      #   with tf.device("/cpu:0"):
-      #     ...
-      # their ops will end up on the cpu device of its first worker, e.g.
-      # "/job:worker/task:0/device:CPU:0". Note this is not used in tower mode.
-      self._default_device = self._workers[0]
-    else:
-      self._cluster_spec = None
-      # Convert `num_gpus` into `devices`, shouldn't specify both.
-      if devices is None:
-        if num_gpus is None:
-          num_gpus = context.num_gpus()
         devices = ["/device:GPU:%d" % d for d in range(num_gpus)]
-      elif num_gpus is not None:
-        raise ValueError("Must only specify one of `devices` and `num_gpus`.")
-      # TODO(yuefengz): consider setting the default device.
+    elif num_gpus is not None:
+      raise ValueError("Must only specify one of `devices` and `num_gpus`.")
+    self._num_gpus = num_gpus
+    # TODO(yuefengz): consider setting the default device.
+
+    assert devices, "Must specify at least one device."
+    assert len(set(devices)) == len(devices), (
+        "No duplicates allowed in `devices` argument.")
+    # TODO(josh11b): Require at least 2 devices?
+    self._devices = [device_util.resolve(d) for d in devices]
+    self._canonical_device_set = set(self._devices)
+    self._device_index = values.PerDevice({d: i for i, d in enumerate(devices)})
+
+  def _initialize_multi_worker(self, num_gpus, cluster_spec):
+    """Initializes the object for multi-worker training."""
+    cluster_spec = multi_worker_util.normalize_cluster_spec(cluster_spec)
+    self._cluster_spec = cluster_spec
+
+    self._workers = []
+    for job in ["chief", "worker"]:
+      for task in range(len(cluster_spec.as_dict().get(job, []))):
+        self._workers.append("/job:%s/task:%d" % (job, task))
+
+    if num_gpus is None:
+      raise ValueError("`num_gpus` is required if `cluster_spec` is given.")
+    if num_gpus > 0:
+      self._worker_device_map = {
+          worker: [
+              device_util.canonicalize(worker + "/device:GPU:%d" % gpu)
+              for gpu in range(num_gpus)
+          ] for worker in self._workers
+      }
+    else:
+      self._worker_device_map = {
+          worker: [device_util.canonicalize(worker, "/device:CPU:0")]
+          for worker in self._workers
+      }
+
+    devices = nest.flatten(self._worker_device_map)
+
+    # Setting `_default_device` will add a device scope in the
+    # distribution.scope. We set the default device to the first worker. When
+    # users specify device under distribution.scope by
+    #   with tf.device("/cpu:0"):
+    #     ...
+    # their ops will end up on the cpu device of its first worker, e.g.
+    # "/job:worker/task:0/device:CPU:0". Note this is not used in tower mode.
+    self._default_device = self._workers[0]
 
     assert devices, "Must specify at least one device."
     assert len(set(devices)) == len(devices), (
@@ -409,8 +419,6 @@ class MirroredStrategy(distribute_lib.DistributionStrategy):
     self._canonical_device_set = set(self._devices)
     self._device_index = values.PerDevice(
         {d: i for i, d in enumerate(devices)})
-    self._cross_tower_ops = cross_tower_ops
-    self._prefetch_on_device = prefetch_on_device
 
   def _create_variable(self, next_creator, *args, **kwargs):
     """Create a mirrored variable. See `DistributionStrategy.scope`."""
@@ -544,7 +552,10 @@ class MirroredStrategy(distribute_lib.DistributionStrategy):
                 cluster_spec=None,
                 task_type=None,
                 task_id=None):
-    del cluster_spec, task_type, task_id
+    del task_type, task_id
+    if cluster_spec:
+      self._initialize_multi_worker(self._num_gpus, cluster_spec)
+
     if self._cross_tower_ops is None:
       if self._cluster_spec:
         self._cross_tower_ops = cross_tower_ops_lib.MultiWorkerAllReduce(
@@ -636,6 +647,22 @@ class MirroredStrategy(distribute_lib.DistributionStrategy):
   def parameter_devices(self):
     return list(self._devices)
 
+  @property
+  def between_graph(self):
+    return False
+
+  @property
+  def should_init(self):
+    return True
+
+  @property
+  def should_checkpoint(self):
+    return True
+
+  @property
+  def should_save_summary(self):
+    return True
+
   def non_slot_devices(self, var_list):
     del var_list
     return list(self._devices)
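
For reference, here is a standalone sketch (plain Python, hypothetical two-worker cluster and GPU count) of what the new `_initialize_multi_worker` logic computes: an ordered worker list built from the "chief" and "worker" jobs, and a per-worker list of GPU tower devices, with the first worker also serving as the default device. Device canonicalization via `device_util` is omitted.

```python
# Standalone illustration of the device-map logic added above; the cluster
# dict and num_gpus are invented for the example.
cluster = {
    "chief": ["host0:2222"],
    "worker": ["host1:2222", "host2:2222"],
}
num_gpus = 2

workers = []
for job in ["chief", "worker"]:  # fixed job order, as in the diff
    for task in range(len(cluster.get(job, []))):
        workers.append("/job:%s/task:%d" % (job, task))
# -> ["/job:chief/task:0", "/job:worker/task:0", "/job:worker/task:1"]

worker_device_map = {
    worker: ["%s/device:GPU:%d" % (worker, gpu) for gpu in range(num_gpus)]
    for worker in workers
}
# Every worker contributes num_gpus towers to one in-graph replicated graph;
# the first entry ("/job:chief/task:0") is used as the default device.
default_device = workers[0]
print(worker_device_map[default_device])
# ['/job:chief/task:0/device:GPU:0', '/job:chief/task:0/device:GPU:1']
```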