aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/contrib/distribute/python/keras_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'tensorflow/contrib/distribute/python/keras_test.py')
-rw-r--r--tensorflow/contrib/distribute/python/keras_test.py905
1 files changed, 905 insertions, 0 deletions
diff --git a/tensorflow/contrib/distribute/python/keras_test.py b/tensorflow/contrib/distribute/python/keras_test.py
new file mode 100644
index 0000000000..3511b7761f
--- /dev/null
+++ b/tensorflow/contrib/distribute/python/keras_test.py
@@ -0,0 +1,905 @@
+# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+"""Tests for tf.keras models using DistributionStrategy."""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import os
+from absl.testing import parameterized
+import numpy as np
+
+from tensorflow.contrib.distribute.python import combinations
+from tensorflow.contrib.distribute.python import mirrored_strategy
+from tensorflow.contrib.distribute.python import tpu_strategy
+from tensorflow.contrib.distribute.python import values
+from tensorflow.python import keras
+from tensorflow.python.data.ops import dataset_ops
+from tensorflow.python.estimator import keras as keras_lib
+from tensorflow.python.estimator import run_config as run_config_lib
+from tensorflow.python.framework import constant_op
+from tensorflow.python.framework import dtypes
+from tensorflow.python.framework import test_util
+from tensorflow.python.keras import testing_utils
+from tensorflow.python.keras.engine import distributed_training_utils
+from tensorflow.python.ops.parsing_ops import gen_parsing_ops
+from tensorflow.python.platform import gfile
+from tensorflow.python.platform import test
+from tensorflow.python.summary.writer import writer_cache
+from tensorflow.python.training import gradient_descent
+from tensorflow.python.training import rmsprop
+
+
+_RANDOM_SEED = 1337
+_TRAIN_SIZE = 200
+_INPUT_SIZE = (10,)
+_NUM_CLASS = 2
+
+
+# TODO(anjalisridhar): Add a decorator that will allow us to run these tests as
+# part of the tf.keras unit tests suite.
+def simple_sequential_model():
+ model = keras.models.Sequential()
+ model.add(keras.layers.Dense(16, activation='relu', input_shape=_INPUT_SIZE))
+ model.add(keras.layers.Dropout(0.1))
+ model.add(keras.layers.Dense(_NUM_CLASS, activation='softmax'))
+ return model
+
+
+def simple_functional_model():
+ a = keras.layers.Input(shape=_INPUT_SIZE)
+ b = keras.layers.Dense(16, activation='relu')(a)
+ b = keras.layers.Dropout(0.1)(b)
+ b = keras.layers.Dense(_NUM_CLASS, activation='softmax')(b)
+ model = keras.models.Model(inputs=[a], outputs=[b])
+ return model
+
+
+def multi_inputs_multi_outputs_model():
+ input_a = keras.layers.Input(shape=(16,), name='input_a')
+ input_b = keras.layers.Input(shape=(16,), name='input_b')
+ input_m = keras.layers.Input(shape=(8,), dtype='string', name='input_m')
+ dense = keras.layers.Dense(8, name='dense_1')
+
+ interm_a = dense(input_a)
+ # Read m
+ interm_m = keras.layers.Lambda(gen_parsing_ops.string_to_number)(input_m)
+ interm_s = keras.layers.Lambda(lambda k: k[0] * k[1])([interm_m, interm_a])
+ interm_b = dense(input_b)
+ merged = keras.layers.concatenate([interm_s, interm_b], name='merge')
+ output_c = keras.layers.Dense(3, activation='softmax', name='dense_2')(merged)
+ output_d = keras.layers.Dense(2, activation='softmax', name='dense_3')(merged)
+ model = keras.models.Model(
+ inputs=[input_a, input_b, input_m], outputs=[output_c, output_d])
+ model.compile(
+ loss='categorical_crossentropy',
+ optimizer=gradient_descent.GradientDescentOptimizer(0.001),
+ metrics={
+ 'dense_2': 'categorical_accuracy',
+ 'dense_3': 'categorical_accuracy'
+ })
+ return model
+
+
+def get_ds_train_input_fn():
+ np.random.seed(_RANDOM_SEED)
+ (x_train, y_train), _ = testing_utils.get_test_data(
+ train_samples=_TRAIN_SIZE,
+ test_samples=50,
+ input_shape=_INPUT_SIZE,
+ num_classes=_NUM_CLASS)
+ y_train = keras.utils.to_categorical(y_train)
+
+ dataset = dataset_ops.Dataset.from_tensor_slices((x_train, y_train))
+ dataset = dataset.batch(32)
+ return dataset
+
+
+def get_ds_test_input_fn():
+ np.random.seed(_RANDOM_SEED)
+ _, (x_test, y_test) = testing_utils.get_test_data(
+ train_samples=_TRAIN_SIZE,
+ test_samples=50,
+ input_shape=_INPUT_SIZE,
+ num_classes=_NUM_CLASS)
+ y_test = keras.utils.to_categorical(y_test)
+
+ dataset = dataset_ops.Dataset.from_tensor_slices((x_test, y_test))
+ dataset = dataset.batch(32)
+ return dataset
+
+
+def get_multi_inputs_multi_outputs_data():
+ (a_train, c_train), (a_test, c_test) = testing_utils.get_test_data(
+ train_samples=_TRAIN_SIZE,
+ test_samples=50,
+ input_shape=(16,),
+ num_classes=3,
+ random_seed=_RANDOM_SEED)
+ (b_train, d_train), (b_test, d_test) = testing_utils.get_test_data(
+ train_samples=_TRAIN_SIZE,
+ test_samples=50,
+ input_shape=(16,),
+ num_classes=2,
+ random_seed=_RANDOM_SEED)
+ (m_train, _), (m_test, _) = testing_utils.get_test_data(
+ train_samples=_TRAIN_SIZE,
+ test_samples=50,
+ input_shape=(8,),
+ num_classes=2,
+ random_seed=_RANDOM_SEED)
+
+ c_train = keras.utils.to_categorical(c_train)
+ c_test = keras.utils.to_categorical(c_test)
+ d_train = keras.utils.to_categorical(d_train)
+ d_test = keras.utils.to_categorical(d_test)
+
+ train_data = {
+ 'input_a': a_train,
+ 'input_b': b_train,
+ 'input_m': m_train,
+ 'output_c': c_train,
+ 'output_d': d_train
+ }
+ test_data = {
+ 'input_a': a_test,
+ 'input_b': b_test,
+ 'input_m': m_test,
+ 'output_c': c_test,
+ 'output_d': d_test
+ }
+
+ return (train_data, test_data)
+
+
+def batch_wrapper(dataset, batch_size, distribution):
+ # TPUs currently require fully defined input shapes, drop_remainder ensures
+ # the input will have fully defined shapes.
+ if isinstance(distribution, tpu_strategy.TPUStrategy):
+ return dataset.batch(batch_size, drop_remainder=True)
+ else:
+ return dataset.batch(batch_size)
+
+
+def get_model():
+ x = keras.layers.Input(shape=(3,), name='input')
+ y = keras.layers.Dense(4, name='dense')(x)
+ model = keras.Model(x, y)
+ return model
+
+
+def get_dataset(distribution):
+ inputs = np.zeros((10, 3), dtype=np.float32)
+ targets = np.zeros((10, 4), dtype=np.float32)
+ dataset = dataset_ops.Dataset.from_tensor_slices((inputs, targets))
+ dataset = dataset.repeat(100)
+ dataset = batch_wrapper(dataset, 10, distribution)
+ return dataset
+
+
+def get_predict_dataset(distribution):
+ inputs = np.zeros((10, 3), dtype=np.float32)
+ dataset = dataset_ops.Dataset.from_tensor_slices(inputs)
+ dataset = dataset.repeat(100)
+ dataset = batch_wrapper(dataset, 10, distribution)
+ return dataset
+
+
+strategies = [combinations.default_strategy,
+ combinations.one_device_strategy,
+ combinations.mirrored_strategy_with_gpu_and_cpu,
+ combinations.mirrored_strategy_with_two_gpus,
+ combinations.tpu_strategy_one_step]
+
+
+def strategy_combinations():
+ return combinations.combine(
+ distribution=strategies,
+ mode=['graph'])
+
+
+def strategy_and_optimizer_combinations():
+ return combinations.combine(
+ distribution=strategies,
+ optimizer=[combinations.adagrad_optimizer_v1_fn,
+ combinations.adam_optimizer_v1_fn,
+ combinations.gradient_descent_optimizer_v1_fn,
+ combinations.rmsprop_optimizer_v1_fn],
+ mode=['graph'])
+
+
+class TestEstimatorDistributionStrategy(test_util.TensorFlowTestCase):
+
+ def setUp(self):
+ self._base_dir = os.path.join(self.get_temp_dir(),
+ 'keras_mirrored_strategy_test')
+ gfile.MakeDirs(self._base_dir)
+ self._config = run_config_lib.RunConfig(
+ tf_random_seed=_RANDOM_SEED, model_dir=self._base_dir)
+ self._dist = mirrored_strategy.MirroredStrategy(
+ devices=['/device:GPU:0', '/device:GPU:1'])
+
+ def tearDown(self):
+ writer_cache.FileWriterCache.clear()
+ if os.path.isdir(self._base_dir):
+ gfile.DeleteRecursively(self._base_dir)
+
+ def test_train_functional_with_distribution_strategy(self):
+ dist = mirrored_strategy.MirroredStrategy(
+ devices=['/device:GPU:0', '/device:GPU:1'])
+ keras_model = simple_functional_model()
+ keras_model.compile(
+ loss='categorical_crossentropy',
+ metrics=[keras.metrics.CategoricalAccuracy()],
+ optimizer=rmsprop.RMSPropOptimizer(learning_rate=0.01))
+ config = run_config_lib.RunConfig(tf_random_seed=_RANDOM_SEED,
+ model_dir=self._base_dir,
+ train_distribute=dist,
+ eval_distribute=dist)
+ with self.cached_session():
+ est_keras = keras_lib.model_to_estimator(
+ keras_model=keras_model, config=config)
+ before_eval_results = est_keras.evaluate(
+ input_fn=get_ds_test_input_fn, steps=1)
+ est_keras.train(input_fn=get_ds_train_input_fn, steps=_TRAIN_SIZE / 16)
+ after_eval_results = est_keras.evaluate(input_fn=get_ds_test_input_fn,
+ steps=1)
+ self.assertLess(after_eval_results['loss'], before_eval_results['loss'])
+
+ writer_cache.FileWriterCache.clear()
+ gfile.DeleteRecursively(self._config.model_dir)
+
+ def test_train_sequential_with_distribution_strategy(self):
+ dist = mirrored_strategy.MirroredStrategy(
+ devices=['/device:GPU:0', '/device:GPU:1'])
+ keras_model = simple_sequential_model()
+ keras_model.compile(
+ loss='categorical_crossentropy',
+ metrics=[keras.metrics.CategoricalAccuracy()],
+ optimizer=rmsprop.RMSPropOptimizer(learning_rate=0.01))
+ config = run_config_lib.RunConfig(tf_random_seed=_RANDOM_SEED,
+ model_dir=self._base_dir,
+ train_distribute=dist)
+ with self.cached_session():
+ est_keras = keras_lib.model_to_estimator(
+ keras_model=keras_model, config=config)
+ before_eval_results = est_keras.evaluate(
+ input_fn=get_ds_test_input_fn, steps=1)
+ est_keras.train(input_fn=get_ds_train_input_fn, steps=_TRAIN_SIZE / 16)
+ after_eval_results = est_keras.evaluate(input_fn=get_ds_test_input_fn,
+ steps=1)
+ self.assertLess(after_eval_results['loss'], before_eval_results['loss'])
+
+ writer_cache.FileWriterCache.clear()
+ gfile.DeleteRecursively(self._config.model_dir)
+
+ def test_multi_inputs_multi_outputs_with_input_fn_as_dict(self):
+ train_data, test_data = get_multi_inputs_multi_outputs_data()
+
+ def train_input_fn():
+ input_dict = {
+ 'input_a': train_data['input_a'],
+ 'input_b': train_data['input_b'],
+ 'input_m': train_data['input_m'].astype(np.str)
+ }
+ output_dict = {
+ 'dense_2': train_data['output_c'],
+ 'dense_3': train_data['output_d']
+ }
+ return dataset_ops.Dataset.from_tensor_slices((input_dict,
+ output_dict)).batch(16)
+
+ def eval_input_fn():
+ input_dict = {
+ 'input_a': test_data['input_a'],
+ 'input_b': test_data['input_b'],
+ 'input_m': test_data['input_m'].astype(np.str)
+ }
+ output_dict = {
+ 'dense_2': test_data['output_c'],
+ 'dense_3': test_data['output_d']
+ }
+ return dataset_ops.Dataset.from_tensor_slices((input_dict,
+ output_dict)).batch(16)
+
+ self.do_test_multi_inputs_multi_outputs_with_input_fn(
+ train_input_fn, eval_input_fn)
+
+ def do_test_multi_inputs_multi_outputs_with_input_fn(self, train_input_fn,
+ eval_input_fn):
+ config = run_config_lib.RunConfig(
+ tf_random_seed=_RANDOM_SEED,
+ model_dir=self._base_dir,
+ train_distribute=self._dist)
+ with self.cached_session():
+ model = multi_inputs_multi_outputs_model()
+ est_keras = keras_lib.model_to_estimator(keras_model=model, config=config)
+ baseline_eval_results = est_keras.evaluate(
+ input_fn=eval_input_fn, steps=1)
+ est_keras.train(input_fn=train_input_fn, steps=_TRAIN_SIZE / 16)
+ eval_results = est_keras.evaluate(input_fn=eval_input_fn, steps=1)
+ self.assertLess(eval_results['loss'], baseline_eval_results['loss'])
+
+ def test_keras_optimizer_with_distribution_strategy(self):
+ dist = mirrored_strategy.MirroredStrategy(
+ devices=['/device:GPU:0', '/device:GPU:1'])
+ keras_model = simple_sequential_model()
+ keras_model.compile(
+ loss='categorical_crossentropy',
+ optimizer=keras.optimizers.rmsprop(lr=0.01))
+
+ config = run_config_lib.RunConfig(tf_random_seed=_RANDOM_SEED,
+ model_dir=self._base_dir,
+ train_distribute=dist)
+ with self.cached_session():
+ est_keras = keras_lib.model_to_estimator(keras_model=keras_model,
+ config=config)
+ with self.assertRaisesRegexp(ValueError,
+ 'Only TensorFlow native optimizers are '
+ 'supported with DistributionStrategy.'):
+ est_keras.train(input_fn=get_ds_train_input_fn, steps=_TRAIN_SIZE / 16)
+
+ writer_cache.FileWriterCache.clear()
+ gfile.DeleteRecursively(self._config.model_dir)
+
+
+class TestDistributionStrategyWithNumpyArrays(test.TestCase,
+ parameterized.TestCase):
+
+ @combinations.generate(strategy_combinations())
+ def test_creating_var_with_numpy_arrays(self, distribution):
+ with self.cached_session():
+ x = np.asarray(np.random.random((64, 3)), dtype=np.float32)
+ var_x = distributed_training_utils.get_var_for_numpy(distribution, x)
+ val = self.evaluate(var_x.value())
+ # Verify that the numpy value is copied to the variable.
+ self.assertAllEqual(x, val)
+
+ @combinations.generate(strategy_combinations())
+ def test_calling_model_with_numpy_arrays(self, distribution):
+ with self.cached_session():
+ model = get_model()
+
+ optimizer = gradient_descent.GradientDescentOptimizer(0.001)
+ loss = 'mse'
+ metrics = ['mae']
+ model.compile(optimizer, loss, metrics=metrics, distribute=distribution)
+
+ inputs = np.zeros((64, 3), dtype=np.float32)
+ targets = np.zeros((64, 4), dtype=np.float32)
+
+ # Call fit with validation data
+ model.fit(inputs, targets, epochs=1, batch_size=2, verbose=0,
+ validation_data=(inputs, targets))
+
+ # TODO(anjalisridhar): We need tests for when the batch size and steps are
+ # smaller and results in a 0 batch_size and steps value.
+ model.evaluate(inputs, targets)
+ # with steps
+ model.evaluate(inputs, targets, steps=2)
+ # with batch_size
+ model.evaluate(inputs, targets, batch_size=8)
+
+ model.predict(inputs)
+ # with steps
+ model.predict(inputs, steps=2)
+ # with batch_size
+ model.predict(inputs, batch_size=8)
+
+ @combinations.generate(strategy_combinations())
+ def test_calling_model_with_nested_numpy_arrays(self, distribution):
+ with self.cached_session():
+ a = keras.layers.Input(shape=(3,), name='input_a')
+ b = keras.layers.Input(shape=(3,), name='input_b')
+
+ dense = keras.layers.Dense(4, name='dense')
+ c = dense(a)
+ d = dense(b)
+ e = keras.layers.Dropout(0.5, name='dropout')(c)
+
+ model = keras.models.Model([a, b], [d, e])
+
+ optimizer = gradient_descent.GradientDescentOptimizer(learning_rate=0.001)
+ loss = 'mse'
+ model.compile(optimizer, loss, distribute=distribution)
+
+ input_a_np = np.asarray(np.random.random((64, 3)), dtype=np.float32)
+ input_b_np = np.asarray(np.random.random((64, 3)), dtype=np.float32)
+ inputs = [input_a_np, input_b_np]
+
+ output_d_np = np.asarray(np.random.random((64, 4)), dtype=np.float32)
+ output_e_np = np.asarray(np.random.random((64, 4)), dtype=np.float32)
+ targets = [output_d_np, output_e_np]
+
+ # Call fit with validation data
+ model.fit(inputs, targets, epochs=1, batch_size=8, verbose=0)
+
+ # TODO(anjalisridhar): We need tests for when the batch size and steps are
+ # smaller and results in a 0 batch_size and steps value.
+ model.evaluate(inputs, targets)
+ # with steps
+ model.evaluate(inputs, targets, steps=2)
+ # with batch_size
+ model.evaluate(inputs, targets, batch_size=8)
+
+ model.predict(inputs)
+ # with steps
+ model.predict(inputs, steps=2)
+ # with batch_size
+ model.predict(inputs, batch_size=8)
+
+
+class TestDistributionStrategyWithDatasets(test.TestCase,
+ parameterized.TestCase):
+
+ @combinations.generate(strategy_combinations())
+ def test_calling_model_on_same_dataset(self, distribution):
+ with self.cached_session():
+ model = get_model()
+
+ optimizer = gradient_descent.GradientDescentOptimizer(0.001)
+ loss = 'mse'
+ metrics = ['mae', keras.metrics.CategoricalAccuracy()]
+ model.compile(optimizer, loss, metrics=metrics, distribute=distribution)
+
+ dataset = get_dataset(distribution)
+
+ # Call fit with validation data
+ model.fit(dataset, epochs=1, steps_per_epoch=2, verbose=0,
+ validation_data=dataset, validation_steps=2)
+ model.fit(dataset, epochs=1, steps_per_epoch=2, verbose=0,
+ validation_data=dataset, validation_steps=2)
+ model.predict(get_predict_dataset(distribution), steps=2)
+
+ # TODO(priyag): Enable this test for TPU. Currently tuples/dict don't work
+ # as clone_model's input_tensors argument only seems to accept list and not
+ # tuples or dict.
+ def test_fit_with_tuple_and_dict_dataset_inputs(self):
+ with self.cached_session():
+ a = keras.layers.Input(shape=(3,), name='input_a')
+ b = keras.layers.Input(shape=(3,), name='input_b')
+
+ dense = keras.layers.Dense(4, name='dense')
+ c = dense(a)
+ d = dense(b)
+ e = keras.layers.Dropout(0.5, name='dropout')(c)
+
+ model = keras.models.Model([a, b], [d, e])
+
+ optimizer = gradient_descent.GradientDescentOptimizer(learning_rate=0.001)
+ loss = 'mse'
+ metrics = ['mae', keras.metrics.CategoricalAccuracy()]
+ strategy = mirrored_strategy.MirroredStrategy(['/device:GPU:0',
+ '/device:CPU:0'])
+ model.compile(optimizer, loss, metrics=metrics, distribute=strategy)
+
+ input_a_np = np.random.random((10, 3))
+ input_b_np = np.random.random((10, 3))
+ output_d_np = np.random.random((10, 4))
+ output_e_np = np.random.random((10, 4))
+
+ # Test with tuples
+ dataset_tuple = dataset_ops.Dataset.from_tensor_slices((
+ (input_a_np, input_b_np), (output_d_np, output_e_np)))
+ dataset_tuple = dataset_tuple.repeat(100)
+ dataset_tuple = dataset_tuple.batch(10)
+
+ model.fit(dataset_tuple, epochs=1, steps_per_epoch=2, verbose=1)
+
+ # Test with dict
+ dataset_dict = dataset_ops.Dataset.from_tensor_slices((
+ {'input_a': input_a_np, 'input_b': input_b_np},
+ (output_d_np, output_e_np)))
+ dataset_dict = dataset_dict.repeat(100)
+ dataset_dict = dataset_dict.batch(10)
+
+ model.fit(dataset_dict, epochs=1, steps_per_epoch=2, verbose=1)
+
+ @combinations.generate(strategy_combinations())
+ def test_fit_eval_and_predict_methods_on_dataset(self, distribution):
+ with self.cached_session():
+ model = get_model()
+
+ optimizer = gradient_descent.GradientDescentOptimizer(0.001)
+ loss = 'mse'
+ metrics = ['mae', keras.metrics.CategoricalAccuracy()]
+ model.compile(optimizer, loss, metrics=metrics, distribute=distribution)
+
+ dataset = get_dataset(distribution)
+
+ model.fit(dataset, epochs=1, steps_per_epoch=2, verbose=1)
+ model.evaluate(dataset, steps=2, verbose=1)
+ model.predict(get_predict_dataset(distribution), steps=2)
+
+ @combinations.generate(strategy_and_optimizer_combinations())
+ def test_fit_eval_and_predict_with_optimizer(self, distribution, optimizer):
+ with self.cached_session():
+ model = get_model()
+
+ loss = 'mse'
+ model.compile(optimizer(), loss, distribute=distribution)
+
+ dataset = get_dataset(distribution)
+
+ model.fit(dataset, epochs=1, steps_per_epoch=2, verbose=1)
+ model.evaluate(dataset, steps=2, verbose=1)
+ model.predict(get_predict_dataset(distribution), steps=2)
+
+ def test_dataset_input_shape_validation(self):
+ with self.cached_session():
+ model = get_model()
+
+ optimizer = rmsprop.RMSPropOptimizer(learning_rate=0.001)
+ loss = 'mse'
+ strategy = mirrored_strategy.MirroredStrategy(['/device:GPU:1',
+ '/device:GPU:0'])
+
+ model.compile(optimizer, loss, distribute=strategy)
+
+ # User forgets to batch the dataset
+ inputs = np.zeros((10, 3), dtype=np.float32)
+ targets = np.zeros((10, 4), dtype=np.float32)
+ dataset = dataset_ops.Dataset.from_tensor_slices((inputs, targets))
+ dataset = dataset.repeat(100)
+
+ with self.assertRaisesRegexp(ValueError, 'expected input to have shape'):
+ model.fit(dataset, epochs=1, steps_per_epoch=2, verbose=0)
+
+ # Wrong input shape
+ inputs = np.zeros((10, 5), dtype=np.float32)
+ targets = np.zeros((10, 4), dtype=np.float32)
+ dataset = dataset_ops.Dataset.from_tensor_slices((inputs, targets))
+ dataset = dataset.repeat(100)
+ dataset = dataset.batch(10)
+
+ with self.assertRaisesRegexp(ValueError,
+ 'expected input to have shape'):
+ model.fit(dataset, epochs=1, steps_per_epoch=2, verbose=0)
+
+ @combinations.generate(combinations.combine(
+ distribution=[combinations.tpu_strategy_one_step],
+ mode=['graph']))
+ def test_dataset_input_shape_fully_defined(self, distribution):
+ with self.cached_session():
+ model = get_model()
+
+ optimizer = rmsprop.RMSPropOptimizer(learning_rate=0.001)
+ loss = 'mse'
+ model.compile(optimizer, loss, distribute=distribution)
+
+ dataset = get_dataset(distribution)
+ # Input shapes are not fully known. Batch dimension is unknown as we are
+ # not using the drop_remainder argument.
+ dataset = dataset.repeat(100).batch(10)
+
+ with self.assertRaisesRegexp(ValueError, 'requires fully defined shapes'):
+ model.fit(dataset, epochs=1, steps_per_epoch=2, verbose=0)
+
+ def test_learning_phase_value(self):
+ # TODO(anjalisridhar): Modify this test to use Lambdas since we can compare
+ # meaningful values. Currently we don't pass the learning phase if the
+ # Lambda layer uses the learning phase.
+ with self.cached_session():
+ x = keras.layers.Input(shape=(16,), name='input')
+ y = keras.layers.Dense(16)(x)
+ z = keras.layers.Dropout(0.9999)(y)
+ model = keras.Model(x, z)
+
+ optimizer = gradient_descent.GradientDescentOptimizer(0.005)
+ loss = 'mse'
+ metrics = ['acc']
+ strategy = mirrored_strategy.MirroredStrategy(['/device:GPU:0',
+ '/device:CPU:0'])
+
+ model.compile(optimizer, loss, metrics=metrics, distribute=strategy)
+
+ inputs = np.random.rand(10, 16)
+ targets = np.ones((10, 16), dtype=np.float32)
+ dataset = dataset_ops.Dataset.from_tensor_slices((inputs, targets))
+ dataset = dataset.repeat(100)
+ dataset = dataset.batch(8)
+
+ hist = model.fit(dataset, epochs=5, steps_per_epoch=20, verbose=1)
+ self.assertEqual(hist.history['acc'][0], 1)
+
+ evaluate_output = model.evaluate(dataset, steps=20)
+ self.assertEqual(evaluate_output[1], 0)
+
+ predict_output = model.predict(dataset, steps=1)
+ self.assertNotEqual(np.mean(predict_output), 0)
+
+
+class TestDistributionStrategyErrorCases(test.TestCase, parameterized.TestCase):
+
+ def test_validating_dataset_input_tensors_with_shape_mismatch(self):
+ with self.cached_session():
+ strategy = mirrored_strategy.MirroredStrategy(['/device:GPU:0',
+ '/device:CPU:0'])
+ a = constant_op.constant([1, 2], shape=(1, 2))
+ b = constant_op.constant([[1, 2], [1, 2]], shape=(2, 2))
+ x = values.DistributedValues({'/device:CPU:0': a, '/device:GPU:0': b})
+ y = values.DistributedValues({'/device:CPU:0': a, '/device:GPU:0': a})
+ with strategy.scope():
+ # Removed device and input tensor shape details from the error message
+ # since the order of the device and the corresponding input tensor shape
+ # is not deterministic over different runs.
+ with self.assertRaisesRegexp(ValueError,
+ 'Input tensor shapes do not match for '
+ 'distributed tensor inputs '
+ 'DistributedValues:.+'):
+ distributed_training_utils.validate_distributed_dataset_inputs(
+ strategy, x, y)
+
+ def test_validating_dataset_input_tensors_with_dtype_mismatch(self):
+ with self.cached_session():
+ strategy = mirrored_strategy.MirroredStrategy(['/device:GPU:0',
+ '/device:CPU:0'])
+ a = constant_op.constant([1, 2], shape=(1, 2), dtype=dtypes.int32)
+ b = constant_op.constant([1, 2], shape=(1, 2), dtype=dtypes.float64)
+ x = values.DistributedValues({'/device:CPU:0': a, '/device:GPU:0': b})
+ y = values.DistributedValues({'/device:CPU:0': a, '/device:GPU:0': a})
+ with strategy.scope():
+ # Removed device and input tensor dtype details from the error message
+ # since the order of the device and the corresponding input tensor dtype
+ # is not deterministic over different runs.
+ with self.assertRaisesRegexp(ValueError,
+ 'Input tensor dtypes do not match for '
+ 'distributed tensor inputs '
+ 'DistributedValues:.+'):
+ distributed_training_utils.validate_distributed_dataset_inputs(
+ strategy, x, y)
+
+ def test_unsupported_features(self):
+ with self.cached_session():
+ model = get_model()
+
+ optimizer = gradient_descent.GradientDescentOptimizer(0.001)
+ loss = 'mse'
+ metrics = ['mae']
+ strategy = mirrored_strategy.MirroredStrategy(['/device:GPU:1',
+ '/device:GPU:0'])
+
+ model.compile(optimizer, loss, metrics=metrics, distribute=strategy)
+
+ dataset = get_dataset(strategy)
+
+ # Test with validation split
+ with self.assertRaisesRegexp(
+ ValueError, '`validation_split` argument is not '
+ 'supported when input `x` is a dataset or a '
+ 'dataset iterator.+'):
+ model.fit(dataset,
+ epochs=1, steps_per_epoch=2, verbose=0,
+ validation_split=0.5, validation_steps=2)
+
+ # Test with sample weight.
+ sample_weight = np.random.random((10,))
+ with self.assertRaisesRegexp(
+ NotImplementedError, '`sample_weight` is currently not supported '
+ 'when using DistributionStrategy.'):
+ model.fit(
+ dataset,
+ epochs=1,
+ steps_per_epoch=2,
+ verbose=0,
+ sample_weight=sample_weight)
+
+ # Test with not specifying the `steps` argument.
+ with self.assertRaisesRegexp(
+ ValueError, 'you should specify the `steps_per_epoch` argument'):
+ model.fit(dataset, epochs=1, verbose=0)
+ with self.assertRaisesRegexp(ValueError,
+ 'you should specify the `steps` argument'):
+ model.evaluate(dataset, verbose=0)
+
+ with self.assertRaisesRegexp(ValueError,
+ 'you should specify the `steps` argument'):
+ model.predict(dataset, verbose=0)
+
+ def test_calling_with_unsupported_predefined_callbacks(self):
+ with self.cached_session():
+ model = get_model()
+
+ optimizer = gradient_descent.GradientDescentOptimizer(0.001)
+ loss = 'mse'
+ metrics = ['mae']
+ strategy = mirrored_strategy.MirroredStrategy(['/device:GPU:1',
+ '/device:GPU:0'])
+ model.compile(optimizer, loss, metrics=metrics, distribute=strategy)
+
+ dataset = get_dataset(strategy)
+
+ def schedule(_):
+ return 0.001
+ with self.assertRaisesRegexp(ValueError,
+ 'LearningRateScheduler callback is not '
+ 'supported with DistributionStrategy.'):
+ model.fit(dataset, epochs=1, steps_per_epoch=2, verbose=0,
+ callbacks=[keras.callbacks.LearningRateScheduler(schedule)])
+
+ with self.assertRaisesRegexp(ValueError,
+ 'ReduceLROnPlateau callback is not '
+ 'supported with DistributionStrategy.'):
+ model.fit(dataset, epochs=1, steps_per_epoch=2, verbose=0,
+ callbacks=[keras.callbacks.ReduceLROnPlateau()])
+ with self.assertRaisesRegexp(ValueError,
+ 'histogram_freq in the TensorBoard callback '
+ 'is not supported when using '
+ 'DistributionStrategy.'):
+ model.fit(dataset, epochs=1, steps_per_epoch=2, verbose=0,
+ callbacks=[keras.callbacks.TensorBoard(histogram_freq=10)])
+
+
+class TestDistributionStrategyWithLossMasking(test.TestCase):
+
+ # TODO(priyag): Enable all strategies for this test. Currently it does not
+ # work for TPU due to some invalid datatype.
+ def test_masking(self):
+ with self.cached_session():
+ np.random.seed(1337)
+ x = np.array([[[1], [1]], [[0], [0]]])
+ model = keras.models.Sequential()
+ model.add(keras.layers.Masking(mask_value=0, input_shape=(2, 1)))
+ model.add(
+ keras.layers.TimeDistributed(
+ keras.layers.Dense(1, kernel_initializer='one')))
+ strategy = mirrored_strategy.MirroredStrategy(['/device:GPU:1',
+ '/device:GPU:0'])
+
+ model.compile(loss='mse',
+ optimizer=gradient_descent.GradientDescentOptimizer(0.01),
+ distribute=strategy)
+ y = np.array([[[1], [1]], [[1], [1]]])
+ dataset = dataset_ops.Dataset.from_tensor_slices((x, y))
+ dataset = dataset.repeat(100)
+ dataset = dataset.batch(10)
+ hist = model.fit(x=dataset, epochs=1, steps_per_epoch=2)
+ self.assertEqual(hist.history['loss'][0], 0)
+
+
+class TestDistributionStrategyWithNormalizationLayer(
+ test.TestCase, parameterized.TestCase):
+
+ @combinations.generate(strategy_combinations())
+ def test_batchnorm_correctness(self, distribution):
+ with self.cached_session():
+ model = keras.models.Sequential()
+ norm = keras.layers.BatchNormalization(input_shape=(10,), momentum=0.8)
+ model.add(norm)
+ model.compile(loss='mse',
+ optimizer=gradient_descent.GradientDescentOptimizer(0.01),
+ distribute=distribution)
+
+ # centered on 5.0, variance 10.0
+ x = np.random.normal(loc=5.0, scale=10.0, size=(1000, 10))
+ x = x.astype('float32')
+ dataset = dataset_ops.Dataset.from_tensor_slices((x, x))
+ dataset = dataset.repeat(100)
+ dataset = batch_wrapper(dataset, 32, distribution)
+
+ predict_dataset = dataset_ops.Dataset.from_tensor_slices(x)
+ predict_dataset = predict_dataset.repeat(100)
+ predict_dataset = batch_wrapper(predict_dataset, 32, distribution)
+
+ model.fit(dataset, epochs=4, verbose=0, steps_per_epoch=10)
+ out = model.predict(predict_dataset, steps=2)
+ out -= keras.backend.eval(norm.beta)
+ out /= keras.backend.eval(norm.gamma)
+ np.testing.assert_allclose(out.mean(), 0.0, atol=1e-1)
+ np.testing.assert_allclose(out.std(), 1.0, atol=1e-1)
+
+
+class TestDistributionStrategyCorrectness(test.TestCase,
+ parameterized.TestCase):
+
+ @combinations.generate(strategy_combinations())
+ def test_metric_correctness(self, distribution):
+ with self.cached_session():
+ keras.backend.set_image_data_format('channels_last')
+ num_samples = 10000
+
+ x_train = np.random.randint(0, 2, num_samples)
+ x_train = np.reshape(x_train, (num_samples, 1))
+ y_train = x_train
+ x_train = x_train.astype('float32')
+ y_train = y_train.astype('float32')
+
+ # Create identity model.
+ model = keras.Sequential()
+ model.add(
+ keras.layers.Dense(1, input_shape=(1,), kernel_initializer='ones'))
+ model.compile(
+ loss=keras.losses.mean_squared_error,
+ optimizer=gradient_descent.GradientDescentOptimizer(0.5),
+ metrics=[keras.metrics.BinaryAccuracy()],
+ distribute=distribution)
+
+ batch_size = 64
+ batch_size //= distribution.num_towers
+ train_dataset = dataset_ops.Dataset.from_tensor_slices((x_train, y_train))
+ train_dataset = batch_wrapper(train_dataset, batch_size, distribution)
+
+ history = model.fit(x=train_dataset, epochs=1, steps_per_epoch=10)
+ self.assertEqual(history.history['binary_accuracy'], [1.0])
+
+ @combinations.generate(strategy_combinations())
+ def test_correctness(self, distribution):
+ with self.cached_session():
+ keras.backend.set_image_data_format('channels_last')
+ num_samples = 10000
+
+ # Train and predict datasets are created with the same input numpy arrays.
+ x_train = np.random.rand(num_samples, 1)
+ y_train = 3 * x_train
+ x_train = x_train.astype('float32')
+ y_train = y_train.astype('float32')
+
+ # The model is built once and the initial weights are saved.
+ # This is used to initialize the model for both the distribution and
+ # non-distribution run.
+ model = keras.Sequential()
+ model.add(keras.layers.Dense(1, input_shape=(1,)))
+ initial_weights = model.get_weights()
+
+ def fit_and_predict(with_distribution=None):
+ model.set_weights(initial_weights)
+ model.compile(
+ loss=keras.losses.mean_squared_error,
+ optimizer=gradient_descent.GradientDescentOptimizer(0.5),
+ distribute=with_distribution)
+
+ batch_size = 64
+ if with_distribution:
+ batch_size //= with_distribution.num_towers
+ train_dataset = dataset_ops.Dataset.from_tensor_slices((x_train,
+ y_train))
+ train_dataset = batch_wrapper(train_dataset, batch_size, distribution)
+ # We have initialized the model to the same weight for the distribution
+ # and non-distribution run. If you want to initialize the model to
+ # random weights for each run, you need to run the model through the
+ # entire dataset at least once to ensure that the weights converge to
+ # the same value.
+ model.fit(x=train_dataset, epochs=1, steps_per_epoch=10)
+
+ weights = model.get_weights()
+ x_predict = [[1.], [2.], [3.], [4.]]
+ predict_batch_size = 4
+ if with_distribution:
+ predict_batch_size //= with_distribution.num_towers
+ predict_dataset = dataset_ops.Dataset.from_tensor_slices(x_predict)
+ predict_dataset = batch_wrapper(predict_dataset,
+ predict_batch_size, distribution)
+ predict_result = model.predict(predict_dataset, steps=1)
+ predict_result = np.reshape(predict_result, (4, 1))
+
+ return weights, predict_result
+
+ wts_with_ds, predict_with_ds = fit_and_predict(
+ with_distribution=distribution)
+ wts_without_ds, predict_without_ds = fit_and_predict(
+ with_distribution=None)
+
+ # Verify that the weights are the same within some limits of tolerance.
+ np.testing.assert_allclose(wts_with_ds[0], wts_without_ds[0], rtol=1e-3)
+ # Verify that the predicted outputs are the same within some limits of
+ # tolerance.
+ np.testing.assert_allclose(predict_with_ds, predict_without_ds, rtol=1e-3)
+
+
+# TODO(priyag): Add a test for TPUStrategy with steps_per_run > 1.
+
+
+if __name__ == '__main__':
+ test.main()