1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for multi_worker_util."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from tensorflow.core.protobuf import cluster_pb2
from tensorflow.python.distribute import multi_worker_util
from tensorflow.python.eager import test
from tensorflow.python.training import server_lib
class NormalizeClusterSpecTest(test.TestCase):
def assert_same_cluster(self, lhs, rhs):
self.assertEqual(
server_lib.ClusterSpec(lhs).as_dict(),
server_lib.ClusterSpec(rhs).as_dict())
def testDictAsInput(self):
cluster_spec = {
"chief": ["127.0.0.1:1234"],
"worker": ["127.0.0.1:8964", "127.0.0.1:2333"],
"ps": ["127.0.0.1:1926", "127.0.0.1:3141"]
}
self.assert_same_cluster(
cluster_spec, multi_worker_util.normalize_cluster_spec(cluster_spec))
def testClusterDefAsInput(self):
cluster_def = cluster_pb2.ClusterDef()
job = cluster_def.job.add()
job.name = "chief"
job.tasks[0] = "127.0.0.1:1234"
job = cluster_def.job.add()
job.name = "worker"
job.tasks[0] = "127.0.0.1:8964"
job.tasks[1] = "127.0.0.1:2333"
job = cluster_def.job.add()
job.name = "ps"
job.tasks[0] = "127.0.0.1:1926"
job.tasks[1] = "127.0.0.1:3141"
self.assert_same_cluster(
cluster_def, multi_worker_util.normalize_cluster_spec(cluster_def))
def testClusterSpecAsInput(self):
cluster_spec = server_lib.ClusterSpec({
"chief": ["127.0.0.1:1234"],
"worker": ["127.0.0.1:8964", "127.0.0.1:2333"],
"ps": ["127.0.0.1:1926", "127.0.0.1:3141"]
})
self.assert_same_cluster(
cluster_spec, multi_worker_util.normalize_cluster_spec(cluster_spec))
def testUnexpectedInput(self):
cluster_spec = ["127.0.0.1:8964", "127.0.0.1:2333"]
with self.assertRaisesRegexp(
ValueError,
"`cluster_spec' should be dict or a `tf.train.ClusterSpec` or a "
"`tf.train.ClusterDef` object"):
multi_worker_util.normalize_cluster_spec(cluster_spec)
class IsChiefTest(test.TestCase):
def testClusterWithChief(self):
cluster_spec = {
"chief": ["127.0.0.1:1234"],
"worker": ["127.0.0.1:8964", "127.0.0.1:2333"],
"ps": ["127.0.0.1:1926", "127.0.0.1:3141"]
}
self.assertTrue(multi_worker_util.is_chief(cluster_spec, "chief", 0))
self.assertFalse(multi_worker_util.is_chief(cluster_spec, "worker", 0))
def testClusterWithoutChief(self):
cluster_spec = {"worker": ["127.0.0.1:8964", "127.0.0.1:2333"]}
self.assertTrue(multi_worker_util.is_chief(cluster_spec, "worker", 0))
self.assertFalse(multi_worker_util.is_chief(cluster_spec, "worker", 1))
with self.assertRaisesRegexp(
ValueError, "The task_type \"chief\" is not in the `cluster_spec`."):
multi_worker_util.is_chief(cluster_spec, "chief", 0)
with self.assertRaisesRegexp(
ValueError, "The `task_id` 2 exceeds the maximum id of worker."):
multi_worker_util.is_chief(cluster_spec, "worker", 2)
if __name__ == "__main__":
test.main()
|