path: root/tensorflow/contrib/distribute/python/mirrored_strategy.py
author    Yuefeng Zhou <yuefengz@google.com>  2018-08-22 16:15:37 -0700
committer TensorFlower Gardener <gardener@tensorflow.org>  2018-08-22 16:34:16 -0700
commit    8c6623137c928e20cd2b54471b06582fa118ad9a (patch)
tree      b97806d292aea9be9b8651005cdc231f7bc92ef0 /tensorflow/contrib/distribute/python/mirrored_strategy.py
parent    05dfd7079d8e3d4487effc6848e4f323eca9ba37 (diff)
Implemented the configure method and the properties needed by the distribute coordinator in MirroredStrategy.
PiperOrigin-RevId: 209848375
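
As a quick orientation for reviewers, a minimal sketch of the intended call pattern, assuming the TF 1.x contrib API of this era; the cluster spec and host addresses below are illustrative placeholders, and in practice higher-level APIs (e.g. the distribute coordinator) invoke `configure` rather than user code:

    import tensorflow as tf

    # Hypothetical two-worker cluster; host addresses are placeholders.
    cluster = {
        "chief": ["host0:2222"],
        "worker": ["host1:2222"],
    }

    # Before this commit, `cluster_spec` was a constructor argument. Now the
    # strategy starts out in local mode ...
    strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=2)

    # ... and switches to in-graph multi-worker replication when a cluster
    # spec arrives through `configure`.
    strategy.configure(cluster_spec=cluster)

    # The coordinator-facing properties added by this commit.
    assert not strategy.between_graph      # in-graph replication
    assert strategy.should_init
    assert strategy.should_checkpoint and strategy.should_save_summary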
Diffstat (limited to 'tensorflow/contrib/distribute/python/mirrored_strategy.py')
-rw-r--r--  tensorflow/contrib/distribute/python/mirrored_strategy.py | 147
1 file changed, 87 insertions(+), 60 deletions(-)
diff --git a/tensorflow/contrib/distribute/python/mirrored_strategy.py b/tensorflow/contrib/distribute/python/mirrored_strategy.py
index 6981449a4c..ecaf60f350 100644
--- a/tensorflow/contrib/distribute/python/mirrored_strategy.py
+++ b/tensorflow/contrib/distribute/python/mirrored_strategy.py
@@ -25,8 +25,8 @@ import threading
from tensorflow.contrib.distribute.python import cross_tower_ops as cross_tower_ops_lib
from tensorflow.contrib.distribute.python import shared_variable_creator
from tensorflow.contrib.distribute.python import values
-from tensorflow.core.protobuf import cluster_pb2
from tensorflow.python import pywrap_tensorflow
+from tensorflow.python.distribute import multi_worker_util
from tensorflow.python.eager import context
from tensorflow.python.eager import tape
from tensorflow.python.framework import constant_op
@@ -39,7 +39,6 @@ from tensorflow.python.ops import variables as variables_lib
from tensorflow.python.training import coordinator
from tensorflow.python.training import device_util
from tensorflow.python.training import distribute as distribute_lib
-from tensorflow.python.training import server_lib
from tensorflow.python.util import nest
@@ -299,8 +298,10 @@ class MirroredStrategy(distribute_lib.DistributionStrategy):
This strategy uses one tower per device and sync replication for its multi-GPU
version.
- When `cluster_spec` is given, it turns into the mulit-worker version that
- works on multiple workers with in-graph replication.
+ When `cluster_spec` is given via the `configure` method, it turns into the
+ multi-worker version that works on multiple workers with in-graph replication.
+ Note: `configure` will be called by higher-level APIs if running in a
+ distributed environment.
There are several important concepts for distributed TensorFlow, e.g.
`client`, `job`, `task`, `cluster`, `in-graph replication` and
@@ -330,8 +331,6 @@ class MirroredStrategy(distribute_lib.DistributionStrategy):
num_gpus: number of GPUs. For local training, either specify `devices` or
`num_gpus`. In distributed training, this must be specified as the number
of GPUs on each worker.
- cluster_spec: if this is set, it turns into the multi-worker version and
- `devices` must not be set but `num_gpus` must be set.
cross_tower_ops: optional, a descendant of `CrossTowerOps`. If this is not
set, the `configure` method will try to find the best one.
prefetch_on_device: optional boolean to specify whether to prefetch input
@@ -341,65 +340,76 @@ class MirroredStrategy(distribute_lib.DistributionStrategy):
def __init__(self,
devices=None,
num_gpus=None,
- cluster_spec=None,
cross_tower_ops=None,
prefetch_on_device=None):
super(MirroredStrategy, self).__init__()
- if cluster_spec:
- if devices is not None:
- raise ValueError("Specifying devices when `cluster_spec` is also given "
- "is not supported in MirroredStrategy.")
-
- # TODO(yuefengz): use the utility method to normalize cluster_spec.
- if isinstance(cluster_spec, (dict, cluster_pb2.ClusterDef)):
- cluster_spec = server_lib.ClusterSpec(cluster_spec)
- elif not isinstance(cluster_spec, server_lib.ClusterSpec):
- raise ValueError(
- "`cluster_spec' should be dict or a `tf.train.ClusterSpec` or a "
- "`tf.train.ClusterDef` object")
- self._cluster_spec = cluster_spec
-
- self._workers = []
- for job in sorted(cluster_spec.jobs):
- for task in range(cluster_spec.num_tasks(job)):
- self._workers.append("/job:%s/task:%d" % (job, task))
+ self._cross_tower_ops = cross_tower_ops
+ self._prefetch_on_device = prefetch_on_device
+ # Remember the number of GPUs, which might be needed by the `configure` method.
+ self._num_gpus = num_gpus
+
+ self._initialize_local(num_gpus, devices)
+ def _initialize_local(self, num_gpus, devices):
+ """Initializes the object for local training."""
+ self._cluster_spec = None
+ # Convert `num_gpus` into `devices`; the caller shouldn't specify both.
+ if devices is None:
if num_gpus is None:
- raise ValueError("`num_gpus` is required if `cluster_spec` is given.")
- self._num_gpus = num_gpus
- if num_gpus > 0:
- self._worker_device_map = {
- worker: [
- device_util.canonicalize(worker + "/device:GPU:%d" % gpu)
- for gpu in range(num_gpus)
- ] for worker in self._workers
- }
+ num_gpus = context.num_gpus()
+ if num_gpus == 0:
+ devices = ["/device:CPU:0"]
else:
- self._worker_device_map = {
- worker: [device_util.canonicalize(worker, "/device:CPU:0")]
- for worker in self._workers
- }
- devices = nest.flatten(self._worker_device_map)
-
- # Setting `_default_device` will add a device scope in the
- # distribution.scope. We set the default device to the first worker. When
- # users specify device under distribution.scope by
- # with tf.device("/cpu:0"):
- # ...
- # their ops will end up on the cpu device of its first worker, e.g.
- # "/job:worker/task:0/device:CPU:0". Note this is not used in tower mode.
- self._default_device = self._workers[0]
- else:
- self._cluster_spec = None
- # Convert `num_gpus` into `devices`, shouldn't specify both.
- if devices is None:
- if num_gpus is None:
- num_gpus = context.num_gpus()
devices = ["/device:GPU:%d" % d for d in range(num_gpus)]
- elif num_gpus is not None:
- raise ValueError("Must only specify one of `devices` and `num_gpus`.")
- # TODO(yuefengz): consider setting the default device.
+ elif num_gpus is not None:
+ raise ValueError("Must only specify one of `devices` and `num_gpus`.")
+ self._num_gpus = num_gpus
+ # TODO(yuefengz): consider setting the default device.
+
+ assert devices, "Must specify at least one device."
+ assert len(set(devices)) == len(devices), (
+ "No duplicates allowed in `devices` argument.")
+ # TODO(josh11b): Require at least 2 devices?
+ self._devices = [device_util.resolve(d) for d in devices]
+ self._canonical_device_set = set(self._devices)
+ self._device_index = values.PerDevice({d: i for i, d in enumerate(devices)})
+
+ def _initialize_multi_worker(self, num_gpus, cluster_spec):
+ """Initializes the object for multi-worker training."""
+ cluster_spec = multi_worker_util.normalize_cluster_spec(cluster_spec)
+ self._cluster_spec = cluster_spec
+
+ self._workers = []
+ for job in ["chief", "worker"]:
+ for task in range(len(cluster_spec.as_dict().get(job, []))):
+ self._workers.append("/job:%s/task:%d" % (job, task))
+
+ if num_gpus is None:
+ raise ValueError("`num_gpus` is required if `cluster_spec` is given.")
+ if num_gpus > 0:
+ self._worker_device_map = {
+ worker: [
+ device_util.canonicalize(worker + "/device:GPU:%d" % gpu)
+ for gpu in range(num_gpus)
+ ] for worker in self._workers
+ }
+ else:
+ self._worker_device_map = {
+ worker: [device_util.canonicalize(worker, "/device:CPU:0")]
+ for worker in self._workers
+ }
+
+ devices = nest.flatten(self._worker_device_map)
+
+ # Setting `_default_device` will add a device scope in the
+ # distribution.scope. We set the default device to the first worker. When
+ # users specify a device under distribution.scope by
+ # with tf.device("/cpu:0"):
+ # ...
+ # their ops will end up on the CPU device of the first worker, e.g.
+ # "/job:worker/task:0/device:CPU:0". Note this is not used in tower mode.
+ self._default_device = self._workers[0]
assert devices, "Must specify at least one device."
assert len(set(devices)) == len(devices), (
@@ -409,8 +419,6 @@ class MirroredStrategy(distribute_lib.DistributionStrategy):
self._canonical_device_set = set(self._devices)
self._device_index = values.PerDevice(
{d: i for i, d in enumerate(devices)})
- self._cross_tower_ops = cross_tower_ops
- self._prefetch_on_device = prefetch_on_device
def _create_variable(self, next_creator, *args, **kwargs):
"""Create a mirrored variable. See `DistributionStrategy.scope`."""
@@ -544,7 +552,10 @@ class MirroredStrategy(distribute_lib.DistributionStrategy):
cluster_spec=None,
task_type=None,
task_id=None):
- del cluster_spec, task_type, task_id
+ del task_type, task_id
+ if cluster_spec:
+ self._initialize_multi_worker(self._num_gpus, cluster_spec)
+
if self._cross_tower_ops is None:
if self._cluster_spec:
self._cross_tower_ops = cross_tower_ops_lib.MultiWorkerAllReduce(
@@ -636,6 +647,22 @@ class MirroredStrategy(distribute_lib.DistributionStrategy):
def parameter_devices(self):
return list(self._devices)
+ @property
+ def between_graph(self):
+ return False
+
+ @property
+ def should_init(self):
+ return True
+
+ @property
+ def should_checkpoint(self):
+ return True
+
+ @property
+ def should_save_summary(self):
+ return True
+
def non_slot_devices(self, var_list):
del var_list
return list(self._devices)
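
For reference outside the diff, the worker-to-devices mapping built by `_initialize_multi_worker` can be sketched as standalone Python; `build_worker_device_map` is a hypothetical helper for illustration only, and it skips the `multi_worker_util.normalize_cluster_spec` and `device_util.canonicalize` steps the real code performs:

    def build_worker_device_map(cluster_dict, num_gpus):
        # One "/job:<job>/task:<n>" string per chief/worker task, in order.
        workers = []
        for job in ("chief", "worker"):
            for task in range(len(cluster_dict.get(job, []))):
                workers.append("/job:%s/task:%d" % (job, task))
        if num_gpus > 0:
            # Mirror across every GPU of every worker.
            return {w: ["%s/device:GPU:%d" % (w, g) for g in range(num_gpus)]
                    for w in workers}
        # CPU-only fallback: one device per worker.
        return {w: [w + "/device:CPU:0"] for w in workers}

    dm = build_worker_device_map(
        {"chief": ["host0:2222"], "worker": ["host1:2222"]}, num_gpus=2)
    # dm["/job:worker/task:0"] ==
    #     ["/job:worker/task:0/device:GPU:0", "/job:worker/task:0/device:GPU:1"]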