aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/contrib/distribute/python/mirrored_strategy_multigpu_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'tensorflow/contrib/distribute/python/mirrored_strategy_multigpu_test.py')
-rw-r--r--tensorflow/contrib/distribute/python/mirrored_strategy_multigpu_test.py315
1 files changed, 277 insertions, 38 deletions
diff --git a/tensorflow/contrib/distribute/python/mirrored_strategy_multigpu_test.py b/tensorflow/contrib/distribute/python/mirrored_strategy_multigpu_test.py
index 8d474124b7..9807ce4351 100644
--- a/tensorflow/contrib/distribute/python/mirrored_strategy_multigpu_test.py
+++ b/tensorflow/contrib/distribute/python/mirrored_strategy_multigpu_test.py
@@ -114,7 +114,10 @@ class MirroredTwoDeviceDistributionTest(strategy_test_lib.DistributionTestBase):
dist = self._get_distribution_strategy()
with dist.scope():
result = dist.call_for_each_tower(run_fn, dist.worker_device_index)
- reduced = dist.reduce("sum", result, destinations="/device:CPU:0")
+ reduced = dist.reduce(
+ variable_scope.VariableAggregation.SUM,
+ result,
+ destinations="/device:CPU:0")
unwrapped = dist.unwrap(reduced)
self.assertEqual(1, len(unwrapped))
expected = sum(range(len(dist.worker_devices)))
@@ -132,8 +135,10 @@ class MirroredTwoDeviceDistributionTest(strategy_test_lib.DistributionTestBase):
dist = mirrored_strategy.MirroredStrategy(devices)
with dist.scope():
- reduced = dist.reduce("sum", 1.0, destinations=["/device:CPU:0",
- "/device:GPU:0"])
+ reduced = dist.reduce(
+ variable_scope.VariableAggregation.SUM,
+ 1.0,
+ destinations=["/device:CPU:0", "/device:GPU:0"])
unwrapped = dist.unwrap(reduced)
self.assertEqual(2, len(unwrapped))
self.assertEqual(1.0, self.evaluate(unwrapped[0]))
@@ -284,18 +289,68 @@ class MirroredStrategyVariableCreationTest(test.TestCase):
self.assertEquals("common/dense" + suffix + "/bias:0", bias.name)
@test_util.run_in_graph_and_eager_modes(config=config)
+ def testWithVariableAndVariableScope(self):
+ self._skip_eager_if_gpus_less_than(1)
+
+ def model_fn():
+ v0 = variable_scope.variable(1.0, name="var0", aggregation=None)
+ with variable_scope.variable_scope("common"):
+ v1 = variable_scope.variable(1.0, name="var1")
+ # This will pause the current thread, and execute the other thread.
+ distribute_lib.get_tower_context().merge_call(lambda _: _)
+ v2 = variable_scope.variable(
+ 1.0,
+ name="var2",
+ synchronization=variable_scope.VariableSynchronization.ON_READ,
+ aggregation=variable_scope.VariableAggregation.SUM)
+ v3 = variable_scope.variable(
+ 1.0,
+ name="var3",
+ synchronization=variable_scope.VariableSynchronization.ON_WRITE,
+ aggregation=variable_scope.VariableAggregation.MEAN)
+
+ return v0, v1, v2, v3
+
+ devices = ["/device:CPU:0", "/device:GPU:0"]
+ dist = mirrored_strategy.MirroredStrategy(devices)
+ with dist.scope():
+ v = variable_scope.variable(1.0, name="var-main0")
+ self.assertEquals("var-main0:0", v.name)
+
+ result = dist.call_for_each_tower(model_fn, run_concurrently=False)
+ self.assertEquals(4, len(result))
+ v0, v1, v2, v3 = result
+ self.assertIsInstance(v0, values.MirroredVariable)
+ self.assertEquals("var0:0", v0.name)
+ self.assertIsInstance(v1, values.MirroredVariable)
+ self.assertEquals("common/var1:0", v1.name)
+ self.assertIsInstance(v2, values.TowerLocalVariable)
+ self.assertEquals("common/var2:0", v2.name)
+ self.assertEquals(variable_scope.VariableAggregation.SUM, v2.aggregation)
+ self.assertIsInstance(v3, values.MirroredVariable)
+ self.assertEquals("common/var3:0", v3.name)
+ self.assertEquals(variable_scope.VariableAggregation.MEAN, v3.aggregation)
+
+ @test_util.run_in_graph_and_eager_modes(config=config)
def testWithGetVariableAndVariableScope(self):
self._skip_eager_if_gpus_less_than(1)
def model_fn():
- v0 = variable_scope.get_variable("var-thread0", [1])
+ v0 = variable_scope.get_variable("var0", [1])
with variable_scope.variable_scope("common"):
- v1 = variable_scope.get_variable("var-thread1", [1])
+ v1 = variable_scope.get_variable("var1", [1])
# This will pause the current thread, and execute the other thread.
distribute_lib.get_tower_context().merge_call(lambda _: _)
- v2 = variable_scope.get_variable("var-thread2", [1])
+ v2 = variable_scope.get_variable(
+ "var2", [1],
+ synchronization=variable_scope.VariableSynchronization.ON_READ,
+ aggregation=variable_scope.VariableAggregation.SUM)
+ v3 = variable_scope.get_variable(
+ "var3", [1],
+ synchronization=variable_scope.VariableSynchronization.ON_WRITE,
+ aggregation=variable_scope.VariableAggregation.MEAN)
- return v0, v1, v2
+ return v0, v1, v2, v3
devices = ["/device:CPU:0", "/device:GPU:0"]
dist = mirrored_strategy.MirroredStrategy(devices)
@@ -305,14 +360,89 @@ class MirroredStrategyVariableCreationTest(test.TestCase):
self.assertEquals("main/var-main0:0", v.name)
result = dist.call_for_each_tower(model_fn, run_concurrently=False)
- self.assertEquals(3, len(result))
- v0, v1, v2 = result
+ self.assertEquals(4, len(result))
+ v0, v1, v2, v3 = result
self.assertIsInstance(v0, values.MirroredVariable)
- self.assertEquals("main/var-thread0:0", v0.name)
+ self.assertEquals("main/var0:0", v0.name)
self.assertIsInstance(v1, values.MirroredVariable)
- self.assertEquals("main/common/var-thread1:0", v1.name)
- self.assertIsInstance(v2, values.MirroredVariable)
- self.assertEquals("main/common/var-thread2:0", v2.name)
+ self.assertEquals("main/common/var1:0", v1.name)
+ self.assertIsInstance(v2, values.TowerLocalVariable)
+ self.assertEquals("main/common/var2:0", v2.name)
+ self.assertEquals(variable_scope.VariableAggregation.SUM,
+ v2.aggregation)
+ self.assertIsInstance(v3, values.MirroredVariable)
+ self.assertEquals("main/common/var3:0", v3.name)
+ self.assertEquals(variable_scope.VariableAggregation.MEAN,
+ v3.aggregation)
+
+ @test_util.run_in_graph_and_eager_modes(config=config)
+ def testNoneSynchronizationWithGetVariable(self):
+ self._skip_eager_if_gpus_less_than(1)
+ devices = ["/device:CPU:0", "/device:GPU:0"]
+ dist = mirrored_strategy.MirroredStrategy(devices)
+ with dist.scope():
+ with self.assertRaisesRegexp(
+ ValueError, "`NONE` variable synchronization mode is not "
+ "supported with `Mirrored` distribution strategy. Please change "
+ "the `synchronization` for variable: v"):
+ variable_scope.get_variable(
+ "v", [1],
+ synchronization=variable_scope.VariableSynchronization.NONE)
+
+ @test_util.run_in_graph_and_eager_modes(config=config)
+ def testNoneSynchronizationWithVariable(self):
+ self._skip_eager_if_gpus_less_than(1)
+ devices = ["/device:CPU:0", "/device:GPU:0"]
+ dist = mirrored_strategy.MirroredStrategy(devices)
+ with dist.scope():
+ with self.assertRaisesRegexp(
+ ValueError, "`NONE` variable synchronization mode is not "
+ "supported with `Mirrored` distribution strategy. Please change "
+ "the `synchronization` for variable: v"):
+ variable_scope.variable(
+ 1.0,
+ name="v",
+ synchronization=variable_scope.VariableSynchronization.NONE)
+
+ @test_util.run_in_graph_and_eager_modes(config=config)
+ def testInvalidSynchronizationWithVariable(self):
+ self._skip_eager_if_gpus_less_than(1)
+ devices = ["/device:CPU:0", "/device:GPU:0"]
+ dist = mirrored_strategy.MirroredStrategy(devices)
+ with dist.scope():
+ with self.assertRaisesRegexp(
+ ValueError, "Invalid variable synchronization mode: Invalid for "
+ "variable: v"):
+ variable_scope.variable(1.0, name="v", synchronization="Invalid")
+
+ @test_util.run_in_graph_and_eager_modes(config=config)
+ def testInvalidAggregationWithGetVariable(self):
+ self._skip_eager_if_gpus_less_than(1)
+ devices = ["/device:CPU:0", "/device:GPU:0"]
+ dist = mirrored_strategy.MirroredStrategy(devices)
+ with dist.scope():
+ with self.assertRaisesRegexp(
+ ValueError, "Invalid variable aggregation mode: invalid for "
+ "variable: v"):
+ variable_scope.get_variable(
+ "v", [1],
+ synchronization=variable_scope.VariableSynchronization.ON_WRITE,
+ aggregation="invalid")
+
+ @test_util.run_in_graph_and_eager_modes(config=config)
+ def testInvalidAggregationWithVariable(self):
+ self._skip_eager_if_gpus_less_than(1)
+ devices = ["/device:CPU:0", "/device:GPU:0"]
+ dist = mirrored_strategy.MirroredStrategy(devices)
+ with dist.scope():
+ with self.assertRaisesRegexp(
+ ValueError, "Invalid variable aggregation mode: invalid for "
+ "variable: v"):
+ variable_scope.variable(
+ 1.0,
+ name="v",
+ synchronization=variable_scope.VariableSynchronization.ON_WRITE,
+ aggregation="invalid")
@test_util.run_in_graph_and_eager_modes(config=config)
def testThreeDevices(self):
@@ -361,11 +491,14 @@ class MirroredStrategyVariableCreationTest(test.TestCase):
components_mean = {}
def model_fn(device_id):
- tower_context = distribute_lib.get_tower_context()
- with tower_context.tower_local_var_scope("sum"):
- v_sum = variable_scope.variable(1.0)
- with tower_context.tower_local_var_scope("mean"):
- v_mean = variable_scope.variable(4.0)
+ v_sum = variable_scope.variable(
+ 1.0,
+ synchronization=variable_scope.VariableSynchronization.ON_READ,
+ aggregation=variable_scope.VariableAggregation.SUM)
+ v_mean = variable_scope.variable(
+ 4.0,
+ synchronization=variable_scope.VariableSynchronization.ON_READ,
+ aggregation=variable_scope.VariableAggregation.MEAN)
self.assertTrue(isinstance(v_sum, values.TowerLocalVariable))
self.assertTrue(isinstance(v_mean, values.TowerLocalVariable))
updates = [v_sum.assign_add(2.0 + device_id),
@@ -568,9 +701,10 @@ class MirroredStrategyVariableCreationTest(test.TestCase):
with context.graph_mode():
def model_fn():
- tower_context = distribute_lib.get_tower_context()
- with tower_context.tower_local_var_scope("sum"):
- v_sum = variable_scope.variable(1.0)
+ v_sum = variable_scope.variable(
+ 1.0,
+ synchronization=variable_scope.VariableSynchronization.ON_READ,
+ aggregation=variable_scope.VariableAggregation.SUM)
self.assertTrue(isinstance(v_sum, values.TowerLocalVariable))
return v_sum
@@ -642,7 +776,8 @@ class MirroredVariableUpdateTest(test.TestCase):
# aggregation type.
self._skip_eager_if_gpus_less_than(1)
def var_fn():
- v = variable_scope.variable(1.0, name="foo")
+ v = variable_scope.variable(
+ 1.0, name="foo", aggregation=variable_scope.VariableAggregation.SUM)
return v
dist = mirrored_strategy.MirroredStrategy(
@@ -650,9 +785,6 @@ class MirroredVariableUpdateTest(test.TestCase):
with dist.scope():
mirrored_var = dist.call_for_each_tower(var_fn, run_concurrently=False)
- # TODO(anjalisridhar): Use API introduced in cr/201463945 to set the
- # aggregation method.
- mirrored_var._aggregation_method = "sum"
self.assertIsInstance(mirrored_var, values.MirroredVariable)
self.evaluate(variables.global_variables_initializer())
@@ -661,7 +793,7 @@ class MirroredVariableUpdateTest(test.TestCase):
with self.assertRaisesRegexp(
ValueError, "A non PerDevice value cannot be reduced with the given "
- "method_string."):
+ "aggregation."):
self.evaluate(dist.unwrap(dist.call_for_each_tower(model_fn)))
@test_util.run_in_graph_and_eager_modes(config=config)
@@ -685,16 +817,14 @@ class MirroredVariableUpdateTest(test.TestCase):
def testAssignMirroredVarTowerContext(self):
self._skip_eager_if_gpus_less_than(1)
def var_fn():
- return variable_scope.variable(1.0, name="foo")
+ return variable_scope.variable(
+ 1.0, name="foo", aggregation=variable_scope.VariableAggregation.MEAN)
dist = mirrored_strategy.MirroredStrategy(
["/device:GPU:0", "/device:CPU:0"])
with dist.scope():
mirrored_var = dist.call_for_each_tower(var_fn, run_concurrently=False)
- # TODO(anjalisridhar): Use API introduced in cr/201463945 to set the
- # aggregation method.
- mirrored_var._aggregation_method = "mean"
self.assertIsInstance(mirrored_var, values.MirroredVariable)
self.evaluate(variables.global_variables_initializer())
self.assertEquals(1.0, self.evaluate(mirrored_var))
@@ -729,16 +859,14 @@ class MirroredVariableUpdateTest(test.TestCase):
def testAssignAddMirroredVarTowerContext(self):
self._skip_eager_if_gpus_less_than(1)
def var_fn():
- return variable_scope.variable(1.0, name="foo")
+ return variable_scope.variable(
+ 1.0, name="foo", aggregation=variable_scope.VariableAggregation.MEAN)
dist = mirrored_strategy.MirroredStrategy(
["/device:GPU:0", "/device:CPU:0"])
with dist.scope():
mirrored_var = dist.call_for_each_tower(var_fn, run_concurrently=False)
- # TODO(anjalisridhar): Use API introduced in cr/201463945 to set the
- # aggregation method.
- mirrored_var._aggregation_method = "mean"
self.assertIsInstance(mirrored_var, values.MirroredVariable)
self.evaluate(variables.global_variables_initializer())
self.assertEquals(1.0, self.evaluate(mirrored_var))
@@ -773,16 +901,14 @@ class MirroredVariableUpdateTest(test.TestCase):
def testAssignSubMirroredVarTowerContext(self):
self._skip_eager_if_gpus_less_than(1)
def var_fn():
- return variable_scope.variable(5.0, name="foo")
+ return variable_scope.variable(
+ 5.0, name="foo", aggregation=variable_scope.VariableAggregation.MEAN)
dist = mirrored_strategy.MirroredStrategy(
["/device:GPU:0", "/device:CPU:0"])
with dist.scope():
mirrored_var = dist.call_for_each_tower(var_fn, run_concurrently=False)
- # TODO(anjalisridhar): Use API introduced in cr/201463945 to set the
- # aggregation method.
- mirrored_var._aggregation_method = "mean"
self.assertIsInstance(mirrored_var, values.MirroredVariable)
self.evaluate(variables.global_variables_initializer())
self.assertEquals(5.0, self.evaluate(mirrored_var))
@@ -797,5 +923,118 @@ class MirroredVariableUpdateTest(test.TestCase):
self.assertEquals(4.5, self.evaluate(mirrored_var))
+class MirroredAndTowerLocalVariableInitializerTest(test.TestCase):
+ config = config_pb2.ConfigProto()
+ config.allow_soft_placement = True
+
+ def testAssignMirroredVarInitializer(self):
+ # This test is not eager compatible since in eager variables are initialized
+ # upon construction instead of once the initialization op is run.
+ with context.graph_mode():
+ def var_fn():
+ v = variable_scope.variable(1.0, name="foo")
+ return v
+
+ dist = mirrored_strategy.MirroredStrategy(
+ ["/device:GPU:0", "/device:CPU:0"])
+
+ with dist.scope():
+ mirrored_var = dist.call_for_each_tower(var_fn)
+ self.assertIsInstance(mirrored_var, values.MirroredVariable)
+ self.assertFalse(self.evaluate(mirrored_var.is_initialized()))
+ self.evaluate(mirrored_var.initializer)
+ self.assertTrue(self.evaluate(mirrored_var.is_initialized()))
+
+ def testAssignTowerLocalVarInitializer(self):
+ # This test is not eager compatible since in eager variables are initialized
+ # upon construction instead of once the initialization op is run.
+ with context.graph_mode():
+ def model_fn():
+ v_sum = variable_scope.variable(
+ 1.0,
+ synchronization=variable_scope.VariableSynchronization.ON_READ,
+ aggregation=variable_scope.VariableAggregation.SUM)
+ self.assertTrue(isinstance(v_sum, values.TowerLocalVariable))
+ return v_sum
+
+ dist = mirrored_strategy.MirroredStrategy(
+ ["/device:GPU:0", "/device:CPU:0"])
+
+ with dist.scope():
+ tower_local_var = dist.call_for_each_tower(model_fn)
+ self.assertTrue(isinstance(tower_local_var, values.TowerLocalVariable))
+ self.assertFalse(self.evaluate(tower_local_var.is_initialized()))
+ self.evaluate(tower_local_var.initializer)
+ self.assertTrue(self.evaluate(tower_local_var.is_initialized()))
+
+
+class TowerLocalVariableAssignTest(test.TestCase):
+ config = config_pb2.ConfigProto()
+ config.allow_soft_placement = True
+
+ def _skip_eager_if_gpus_less_than(self, num_gpus):
+ if context.num_gpus() < num_gpus and context.executing_eagerly():
+ self.skipTest("Enough GPUs not available for this test in eager mode.")
+
+ @test_util.run_in_graph_and_eager_modes(config=config)
+ def testAssignTowerLocalVarSumAggregation(self):
+ self._skip_eager_if_gpus_less_than(1)
+ def model_fn():
+ v_sum = variable_scope.variable(
+ 1.0,
+ synchronization=variable_scope.VariableSynchronization.ON_READ,
+ aggregation=variable_scope.VariableAggregation.SUM)
+ return v_sum
+
+ dist = mirrored_strategy.MirroredStrategy(
+ ["/device:GPU:0", "/device:CPU:0"])
+
+ with dist.scope():
+ tower_local_var = dist.call_for_each_tower(model_fn,
+ run_concurrently=False)
+ self.assertTrue(isinstance(tower_local_var, values.TowerLocalVariable))
+ self.evaluate(variables.global_variables_initializer())
+ # Each tower has a value of 1.0 assigned to it in tower context.
+ # When we read the value using `read_var` we should see the SUM of each of
+ # values on each of the towers.
+ self.assertEqual(2.0, self.evaluate(dist.read_var(tower_local_var)))
+ # Assigning 6.0 in cross tower context will assign a value of
+ # 6.0/num_towers to each tower.
+ tlv_ops = tower_local_var.assign(6.0)
+ self.evaluate(tlv_ops)
+ # On reading the tower local var we should get the assigned value back.
+ # The value on all the towers are added before being returned by
+ # `read_var`.
+ self.assertEqual(6.0, self.evaluate(dist.read_var(tower_local_var)))
+
+ @test_util.run_in_graph_and_eager_modes(config=config)
+ def testAssignTowerLocalVarMeanAggregation(self):
+ self._skip_eager_if_gpus_less_than(1)
+ def model_fn():
+ v_sum = variable_scope.variable(
+ 1.0,
+ synchronization=variable_scope.VariableSynchronization.ON_READ,
+ aggregation=variable_scope.VariableAggregation.MEAN)
+ return v_sum
+
+ dist = mirrored_strategy.MirroredStrategy(
+ ["/device:GPU:0", "/device:CPU:0"])
+
+ with dist.scope():
+ tower_local_var = dist.call_for_each_tower(model_fn,
+ run_concurrently=False)
+ self.assertTrue(isinstance(tower_local_var, values.TowerLocalVariable))
+ self.evaluate(variables.global_variables_initializer())
+ # Each tower has a value of 1.0 assigned to it in tower context.
+ # When we read the value using `read_var` we should see the MEAN of values
+ # on all towers which is the value assigned in tower context.
+ self.assertEqual(1.0, self.evaluate(dist.read_var(tower_local_var)))
+ tlv_ops = tower_local_var.assign(6.0)
+ self.evaluate(tlv_ops)
+ # On reading the tower local var we should get the MEAN of all values
+ # which is equal to the value assigned.
+ self.assertEqual(6.0, self.evaluate(dist.read_var(tower_local_var)))
+
+
if __name__ == "__main__":
test.main()