Diffstat (limited to 'tensorflow/contrib/data/python/kernel_tests/optimize_dataset_op_test.py')
-rw-r--r--  tensorflow/contrib/data/python/kernel_tests/optimize_dataset_op_test.py  404
1 file changed, 0 insertions, 404 deletions
diff --git a/tensorflow/contrib/data/python/kernel_tests/optimize_dataset_op_test.py b/tensorflow/contrib/data/python/kernel_tests/optimize_dataset_op_test.py
deleted file mode 100644
index 76aa1c3cfd..0000000000
--- a/tensorflow/contrib/data/python/kernel_tests/optimize_dataset_op_test.py
+++ /dev/null
@@ -1,404 +0,0 @@
-# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-"""Tests for the experimental input pipeline ops."""
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-from absl.testing import parameterized
-
-from tensorflow.contrib.data.python.kernel_tests import stats_dataset_test_base
-from tensorflow.contrib.data.python.kernel_tests import test_utils
-from tensorflow.contrib.data.python.ops import optimization
-from tensorflow.contrib.data.python.ops import stats_ops
-from tensorflow.python.data.ops import dataset_ops
-from tensorflow.python.framework import constant_op
-from tensorflow.python.framework import dtypes
-from tensorflow.python.framework import errors
-from tensorflow.python.framework import ops
-from tensorflow.python.ops import array_ops
-from tensorflow.python.ops import check_ops
-from tensorflow.python.ops import math_ops
-from tensorflow.python.platform import test
-
-
-class OptimizeDatasetTest(test.TestCase, parameterized.TestCase):
-
-  def testAssertSuffix(self):
-    dataset = dataset_ops.Dataset.from_tensors(0).apply(
-        optimization.assert_next(["Map"])).map(lambda x: x)
-    iterator = dataset.make_one_shot_iterator()
-    get_next = iterator.get_next()
-
-    with self.test_session() as sess:
-      self.assertEqual(0, sess.run(get_next))
-
-  def testAssertSuffixInvalid(self):
-    dataset = dataset_ops.Dataset.from_tensors(0).apply(
-        optimization.assert_next(["Whoops"])).map(lambda x: x)
-    iterator = dataset.make_one_shot_iterator()
-    get_next = iterator.get_next()
-
-    with self.test_session() as sess:
-      with self.assertRaisesRegexp(
-          errors.InvalidArgumentError,
-          "Asserted Whoops transformation at offset 0 but encountered "
-          "Map transformation instead."):
-        sess.run(get_next)
-
-  def testAssertSuffixShort(self):
-    dataset = dataset_ops.Dataset.from_tensors(0).apply(
-        optimization.assert_next(["Map", "Whoops"])).map(lambda x: x)
-    iterator = dataset.make_one_shot_iterator()
-    get_next = iterator.get_next()
-
-    with self.test_session() as sess:
-      with self.assertRaisesRegexp(
-          errors.InvalidArgumentError,
-          "Asserted next 2 transformations but encountered only 1."):
-        sess.run(get_next)
-
-  def testDefaultOptimizations(self):
-    dataset = dataset_ops.Dataset.range(10).apply(
-        optimization.assert_next(
-            ["Map", "Batch"])).map(lambda x: x * x).batch(10).apply(
-                optimization.optimize())
-    iterator = dataset.make_one_shot_iterator()
-    get_next = iterator.get_next()
-
-    with self.test_session() as sess:
-      self.assertAllEqual([x * x for x in range(10)], sess.run(get_next))
-      with self.assertRaises(errors.OutOfRangeError):
-        sess.run(get_next)
-
-  def testEmptyOptimizations(self):
-    dataset = dataset_ops.Dataset.range(10).apply(
-        optimization.assert_next(
-            ["Map", "Batch"])).map(lambda x: x * x).batch(10).apply(
-                optimization.optimize([]))
-    iterator = dataset.make_one_shot_iterator()
-    get_next = iterator.get_next()
-
-    with self.test_session() as sess:
-      self.assertAllEqual([x * x for x in range(10)], sess.run(get_next))
-      with self.assertRaises(errors.OutOfRangeError):
-        sess.run(get_next)
-
-  def testOptimization(self):
-    dataset = dataset_ops.Dataset.range(10).apply(
-        optimization.assert_next(
-            ["MapAndBatch"])).map(lambda x: x * x).batch(10).apply(
-                optimization.optimize(["map_and_batch_fusion"]))
-    iterator = dataset.make_one_shot_iterator()
-    get_next = iterator.get_next()
-
-    with self.test_session() as sess:
-      self.assertAllEqual([x * x for x in range(10)], sess.run(get_next))
-      with self.assertRaises(errors.OutOfRangeError):
-        sess.run(get_next)
-
-  def testFunctionLibraryDefinitionModification(self):
-    dataset = dataset_ops.Dataset.from_tensors(0).map(lambda x: x).apply(
-        optimization.optimize(["_test_only_function_rename"]))
-    iterator = dataset.make_one_shot_iterator()
-    get_next = iterator.get_next()
-
-    with self.test_session() as sess:
-      with self.assertRaisesRegexp(errors.NotFoundError,
-                                   "Function .* is not defined."):
-        sess.run(get_next)
-
-  @staticmethod
-  def map_functions():
-    identity = lambda x: x
-    increment = lambda x: x + 1
-
-    def increment_and_square(x):
-      y = x + 1
-      return y * y
-
-    functions = [identity, increment, increment_and_square]
-    tests = []
-    for i, fun1 in enumerate(functions):
-      for j, fun2 in enumerate(functions):
-        tests.append((
-            "test_{}_{}".format(i, j),
-            [fun1, fun2],
-        ))
-        for k, fun3 in enumerate(functions):
-          tests.append((
-              "test_{}_{}_{}".format(i, j, k),
-              [fun1, fun2, fun3],
-          ))
-
-    swap = lambda x, n: (n, x)
-    tests.append((
-        "swap1",
-        [lambda x: (x, 42), swap],
-    ))
-    tests.append((
-        "swap2",
-        [lambda x: (x, 42), swap, swap],
-    ))
-    return tuple(tests)
-
-  @parameterized.named_parameters(*map_functions.__func__())
-  def testMapFusion(self, functions):
-    dataset = dataset_ops.Dataset.range(5).apply(
-        optimization.assert_next(["Map", "Prefetch"]))
-    for function in functions:
-      dataset = dataset.map(function)
-
-    dataset = dataset.prefetch(0).apply(optimization.optimize(["map_fusion"]))
-    iterator = dataset.make_one_shot_iterator()
-    get_next = iterator.get_next()
-    with self.test_session() as sess:
-      for x in range(5):
-        result = sess.run(get_next)
-        r = x
-        for function in functions:
-          if isinstance(r, tuple):
-            r = function(*r)  # Pass tuple as multiple arguments.
-          else:
-            r = function(r)
-        self.assertAllEqual(r, result)
-
-      with self.assertRaises(errors.OutOfRangeError):
-        sess.run(get_next)
-
-  @staticmethod
-  def map_and_filter_functions():
-    identity = lambda x: x
-    increment = lambda x: x + 1
-    minus_five = lambda x: x - 5
-
-    def increment_and_square(x):
-      y = x + 1
-      return y * y
-
-    take_all = lambda x: constant_op.constant(True)
-    is_zero = lambda x: math_ops.equal(x, 0)
-    is_even = lambda x: math_ops.equal(x % 2, 0)
-    greater = lambda x: math_ops.greater(x + 5, 0)
-
-    functions = [identity, increment, minus_five, increment_and_square]
-    filters = [take_all, is_zero, is_even, greater]
-    tests = []
-
-    for x, fun in enumerate(functions):
-      for y, predicate in enumerate(filters):
-        tests.append(("mixed_{}_{}".format(x, y), fun, predicate))
-
-    # Multi output
-    tests.append(("multiOne", lambda x: (x, x),
-                  lambda x, y: constant_op.constant(True)))
-    tests.append(
-        ("multiTwo", lambda x: (x, 2),
-         lambda x, y: math_ops.equal(x * math_ops.cast(y, dtypes.int64), 0)))
-    return tuple(tests)
-
-  @parameterized.named_parameters(*map_and_filter_functions.__func__())
-  def testMapFilterFusion(self, function, predicate):
-    dataset = dataset_ops.Dataset.range(10).apply(
-        optimization.assert_next(
-            ["Map",
-             "FilterByLastComponent"])).map(function).filter(predicate).apply(
-                 optimization.optimize(["map_and_filter_fusion"]))
-    self._testMapAndFilter(dataset, function, predicate)
-
-  def _testMapAndFilter(self, dataset, function, predicate):
-    iterator = dataset.make_one_shot_iterator()
-    get_next = iterator.get_next()
-    with self.test_session() as sess:
-      for x in range(10):
-        r = function(x)
-        if isinstance(r, tuple):
-          b = predicate(*r)  # Pass tuple as multiple arguments.
-        else:
-          b = predicate(r)
-        if sess.run(b):
-          result = sess.run(get_next)
-          self.assertAllEqual(r, result)
-      with self.assertRaises(errors.OutOfRangeError):
-        sess.run(get_next)
-
-  def testAdditionalInputs(self):
-    a = constant_op.constant(3, dtype=dtypes.int64)
-    b = constant_op.constant(4, dtype=dtypes.int64)
-    some_tensor = math_ops.mul(a, b)
-    function = lambda x: x * x
-
-    def predicate(y):
-      return math_ops.less(math_ops.cast(y, dtypes.int64), some_tensor)
-
-    # We do not currently support fusing functions with additional inputs.
-    dataset = dataset_ops.Dataset.range(10).apply(
-        optimization.assert_next(
-            ["Map", "Filter"])).map(function).filter(predicate).apply(
-                optimization.optimize(["map_and_filter_fusion"]))
-
-    self._testMapAndFilter(dataset, function, predicate)
-
-
-class OptimizeStatsDatasetTest(stats_dataset_test_base.StatsDatasetTestBase):
-
-  def testLatencyStatsOptimization(self):
-
-    stats_aggregator = stats_ops.StatsAggregator()
-    dataset = dataset_ops.Dataset.from_tensors(1).apply(
-        optimization.assert_next(
-            ["LatencyStats", "Map", "LatencyStats", "Prefetch",
-             "LatencyStats"])).map(lambda x: x * x).prefetch(1).apply(
-                 optimization.optimize(["latency_all_edges"])).apply(
-                     stats_ops.set_stats_aggregator(stats_aggregator))
-    iterator = dataset.make_initializable_iterator()
-    get_next = iterator.get_next()
-    summary_t = stats_aggregator.get_summary()
-
-    with self.test_session() as sess:
-      sess.run(iterator.initializer)
-      self.assertEqual(1 * 1, sess.run(get_next))
-      with self.assertRaises(errors.OutOfRangeError):
-        sess.run(get_next)
-      summary_str = sess.run(summary_t)
-      self._assertSummaryHasCount(summary_str,
-                                  "record_latency_TensorDataset/_1", 1)
-      self._assertSummaryHasCount(summary_str, "record_latency_MapDataset/_4",
-                                  1)
-      self._assertSummaryHasCount(summary_str,
-                                  "record_latency_PrefetchDataset/_6", 1)
-
-
-class MapVectorizationTest(test_utils.DatasetTestBase, parameterized.TestCase):
-
-  def _get_test_datasets(self,
-                         base_dataset,
-                         map_fn,
-                         num_parallel_calls=None,
-                         expect_optimized=True):
-    """Given base dataset and map fn, creates test datasets.
-
-    Returns a tuple of (unoptimized dataset, optimized dataset). The
-    unoptimized dataset has the assertion that Batch follows Map. The optimized
-    dataset has the assertion that Map follows Batch, and has the
-    "map_vectorization" optimization applied.
-
-    Args:
-      base_dataset: Input dataset to map->batch
-      map_fn: Map function to use
-      num_parallel_calls: (Optional.) num_parallel_calls argument for map
-      expect_optimized: (Optional.) Whether we expect the optimization to take
-        place, in which case we will assert that Batch is followed by Map,
-        otherwise Map followed by Batch. Defaults to True.
-
-    Returns:
-      Tuple of (unoptimized dataset, optimized dataset).
-    """
-    map_node_name = "Map" if num_parallel_calls is None else "ParallelMap"
-    batch_size = 100
-
-    def _make_dataset(node_names):
-      return base_dataset.apply(optimization.assert_next(node_names)).map(
-          map_fn, num_parallel_calls=num_parallel_calls).batch(batch_size)
-
-    unoptimized = _make_dataset([map_node_name, "Batch"])
-    optimized = _make_dataset(["Batch", map_node_name] if expect_optimized else
-                              [map_node_name, "Batch"]).apply(
-                                  optimization.optimize(["map_vectorization"]))
-
-    return unoptimized, optimized
-
-  @parameterized.named_parameters(
-      ("Basic", lambda x: (x, x + 1), None),
-      ("Parallel", lambda x: (x, x + 1), 12),
-      ("Gather", lambda x: array_ops.gather(x, 0), 12),
-  )
-  def testOptimization(self, map_fn, num_parallel_calls):
-    base_dataset = dataset_ops.Dataset.from_tensor_slices([[1, 2],
-                                                           [3, 4]]).repeat(5)
-    unoptimized, optimized = self._get_test_datasets(base_dataset, map_fn,
-                                                     num_parallel_calls)
-    self._assert_datasets_equal(unoptimized, optimized)
-
-  def testOptimizationBadMapFn(self):
-    # Test a map function that raises an error.
-    def map_fn(x):
-      # x has leading dimension 5, so this will raise an error.
-      return array_ops.gather(x, 10)
-
-    base_dataset = dataset_ops.Dataset.range(5).repeat(5).batch(
-        5, drop_remainder=True)
-    _, optimized = self._get_test_datasets(base_dataset, map_fn)
-    nxt = optimized.make_one_shot_iterator().get_next()
-    with self.assertRaisesRegexp(errors.InvalidArgumentError,
-                                 r"indices = 10 is not in \[0, 5\)"):
-      self.evaluate(nxt)
-
-  def testOptimizationWithCapturedInputs(self):
-    # Tests that vectorization works with captured inputs.
-    def map_fn(x):
-      return x + y
-
-    y = constant_op.constant(1, shape=(2,))
-    base_dataset = dataset_ops.Dataset.from_tensor_slices([[1, 2],
-                                                           [3, 4]]).repeat(5)
-    # TODO(rachelim): when this optimization works, turn on expect_optimized
-    unoptimized, optimized = self._get_test_datasets(
-        base_dataset, map_fn, expect_optimized=False)
-    self._assert_datasets_equal(optimized, unoptimized)
-
-  def testOptimizationIgnoreStateful(self):
-
-    def map_fn(x):
-      with ops.control_dependencies([check_ops.assert_equal(x, 0)]):
-        return array_ops.identity(x)
-
-    base_dataset = dataset_ops.Dataset.from_tensor_slices([[1, 2],
-                                                           [3, 4]]).repeat(5)
-    _, optimized = self._get_test_datasets(
-        base_dataset, map_fn, expect_optimized=False)
-    nxt = optimized.make_one_shot_iterator().get_next()
-
-    # NOTE: Right now, it raises an error because we can't save datasets that
-    # are stateful, and we rely on this saving mechanism to optimize datasets,
-    # so stateful functions can't be optimized.
-    with self.assertRaisesRegexp(errors.InvalidArgumentError, "[Ss]tateful"):
-      self.evaluate(nxt)
-
-  def testOptimizationIgnoreRagged(self):
-    # Make sure we ignore inputs that might not be uniformly sized.
-    def map_fn(x):
-      return array_ops.gather(x, 0)
-
-    # output_shape = (?,)
-    base_dataset = dataset_ops.Dataset.range(20).batch(3, drop_remainder=False)
-    unoptimized, optimized = self._get_test_datasets(
-        base_dataset, map_fn, expect_optimized=False)
-    self._assert_datasets_equal(unoptimized, optimized)
-
-  def testOptimizationIgnoreRaggedMap(self):
-    # Don't optimize when the shapes of the map fn outputs are unknown.
-    def map_fn(x):
-      return array_ops.tile(x, x)
-
-    base_dataset = dataset_ops.Dataset.range(20).batch(1, drop_remainder=True)
-    unoptimized, optimized = self._get_test_datasets(
-        base_dataset, map_fn, expect_optimized=False)
-    self._assert_datasets_raise_same_error(unoptimized, optimized,
-                                           errors.InvalidArgumentError)
-
-
-if __name__ == "__main__":
-  test.main()
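
For reference, a minimal sketch of the unoptimized/optimized dataset pair described in the _get_test_datasets docstring of the deleted file, assuming the contrib-era TF 1.x APIs it imports; the helper name make_vectorization_pair below is illustrative and not part of the original file:

# Hedged sketch, not part of the diff above: builds the same kind of dataset
# pair that _get_test_datasets produced, using the contrib-era TF 1.x APIs.
from tensorflow.contrib.data.python.ops import optimization
from tensorflow.python.data.ops import dataset_ops


def make_vectorization_pair(base_dataset, map_fn, batch_size=100):
  # Unoptimized pipeline: assert_next checks that Map is immediately followed
  # by Batch, i.e. no rewrite has taken place.
  unoptimized = base_dataset.apply(
      optimization.assert_next(["Map", "Batch"])).map(map_fn).batch(batch_size)
  # Optimized pipeline: after the "map_vectorization" rewrite, Batch should
  # precede the (vectorized) Map, so the expected order is reversed before
  # applying the rewrite.
  optimized = base_dataset.apply(
      optimization.assert_next(["Batch", "Map"])).map(map_fn).batch(
          batch_size).apply(optimization.optimize(["map_vectorization"]))
  return unoptimized, optimized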