diff options
Diffstat (limited to 'tensorflow/python/data/experimental/kernel_tests/map_and_batch_test.py')
-rw-r--r-- | tensorflow/python/data/experimental/kernel_tests/map_and_batch_test.py | 280 |
1 files changed, 213 insertions, 67 deletions
diff --git a/tensorflow/python/data/experimental/kernel_tests/map_and_batch_test.py b/tensorflow/python/data/experimental/kernel_tests/map_and_batch_test.py index d444c4082e..5ead6d1c75 100644 --- a/tensorflow/python/data/experimental/kernel_tests/map_and_batch_test.py +++ b/tensorflow/python/data/experimental/kernel_tests/map_and_batch_test.py @@ -30,6 +30,7 @@ from tensorflow.python.framework import dtypes from tensorflow.python.framework import errors from tensorflow.python.framework import sparse_tensor from tensorflow.python.ops import array_ops +from tensorflow.python.ops import control_flow_ops from tensorflow.python.ops import math_ops from tensorflow.python.ops import script_ops from tensorflow.python.platform import test @@ -38,12 +39,17 @@ from tensorflow.python.platform import test class MapAndBatchTest(test_base.DatasetTestBase, parameterized.TestCase): @parameterized.named_parameters( - ("Default", None, None), - ("SequentialCalls", 1, None), - ("ParallelCalls", 2, None), - ("ParallelBatches", None, 10), + ("Default", None, None, False), + ("SequentialCalls", 1, None, False), + ("ParallelCalls", 2, None, False), + ("ParallelBatches", None, 10, False), + ("DefaultNUMA", None, None, True), + ("SequentialCallsNUMA", 1, None, True), + ("ParallelCallsNUMA", 2, None, True), + ("ParallelBatchesNUMA", None, 10, True), ) - def testMapAndBatch(self, num_parallel_calls, num_parallel_batches): + def testMapAndBatch(self, num_parallel_calls, num_parallel_batches, + numa_aware): """Test a dataset that maps a TF function across its input elements.""" # The pipeline is TensorSliceDataset -> # RepeatDataset(count) -> MapAndBatchDataset(square_3, batch_size). @@ -57,14 +63,20 @@ class MapAndBatchTest(test_base.DatasetTestBase, parameterized.TestCase): def _map_fn(x, y, z): return math_ops.square(x), math_ops.square(y), math_ops.square(z) - iterator = ( + dataset = ( dataset_ops.Dataset.from_tensor_slices(components).repeat(count).apply( batching.map_and_batch( map_func=_map_fn, batch_size=batch_size, num_parallel_calls=num_parallel_calls, - num_parallel_batches=num_parallel_batches)) - .make_initializable_iterator()) + num_parallel_batches=num_parallel_batches))) + + if numa_aware: + options = dataset_ops.Options() + options.experimental_numa_aware = True + dataset = dataset.with_options(options) + + iterator = dataset.make_initializable_iterator() init_op = iterator.initializer get_next = iterator.get_next() @@ -115,16 +127,25 @@ class MapAndBatchTest(test_base.DatasetTestBase, parameterized.TestCase): sess.run(init_op, feed_dict={count: 14, batch_size: 0}) @parameterized.named_parameters( - ("Even", False), - ("Uneven", True), + ("Even", False, False), + ("Uneven", True, False), + ("EvenNUMA", False, True), + ("UnevenNUMA", True, True), ) - def testMapAndBatchPartialBatch(self, drop_remainder): - iterator = ( + def testMapAndBatchPartialBatch(self, drop_remainder, numa_aware): + dataset = ( dataset_ops.Dataset.range(10).apply( batching.map_and_batch( lambda x: array_ops.reshape(x * x, [1]), batch_size=4, - drop_remainder=drop_remainder)).make_one_shot_iterator()) + drop_remainder=drop_remainder))) + + if numa_aware: + options = dataset_ops.Options() + options.experimental_numa_aware = True + dataset = dataset.with_options(options) + iterator = dataset.make_one_shot_iterator() + if drop_remainder: self.assertEqual([4, 1], iterator.output_shapes.as_list()) else: @@ -138,11 +159,21 @@ class MapAndBatchTest(test_base.DatasetTestBase, parameterized.TestCase): with self.assertRaises(errors.OutOfRangeError): sess.run(next_element) - def testMapAndBatchYieldsPartialBatch(self): - iterator = (dataset_ops.Dataset.range(10) - .apply(batching.map_and_batch( - lambda x: array_ops.reshape(x * x, [1]), 4)) - .make_one_shot_iterator()) + @parameterized.named_parameters( + ("Normal", False), + ("NUMA", True), + ) + def testMapAndBatchYieldsPartialBatch(self, numa_aware): + dataset = ( + dataset_ops.Dataset.range(10).apply( + batching.map_and_batch(lambda x: array_ops.reshape(x * x, [1]), 4))) + + if numa_aware: + options = dataset_ops.Options() + options.experimental_numa_aware = True + dataset = dataset.with_options(options) + + iterator = dataset.make_one_shot_iterator() self.assertEqual([None, 1], iterator.output_shapes.as_list()) next_element = iterator.get_next() with self.cached_session() as sess: @@ -152,10 +183,19 @@ class MapAndBatchTest(test_base.DatasetTestBase, parameterized.TestCase): with self.assertRaises(errors.OutOfRangeError): sess.run(next_element) - def testMapAndBatchParallelGetNext(self): - iterator = (dataset_ops.Dataset.range(50000) - .apply(batching.map_and_batch(lambda x: x, batch_size=100)) - .make_one_shot_iterator()) + @parameterized.named_parameters( + ("Normal", False), + ("NUMA", True), + ) + def testMapAndBatchParallelGetNext(self, numa_aware): + dataset = dataset_ops.Dataset.range(50000).apply( + batching.map_and_batch(lambda x: x, batch_size=100)) + if numa_aware: + options = dataset_ops.Options() + options.experimental_numa_aware = True + dataset = dataset.with_options(options) + iterator = dataset.make_one_shot_iterator() + elements = [] for _ in range(100): elements.append(iterator.get_next()) @@ -165,17 +205,26 @@ class MapAndBatchTest(test_base.DatasetTestBase, parameterized.TestCase): got.sort(key=lambda x: x[0]) expected = [] for j in range(100): - expected.append(range(i*10000+j*100, i*10000+(j+1)*100)) + expected.append(range(i * 10000 + j * 100, i * 10000 + (j + 1) * 100)) self.assertAllEqual(got, expected) with self.assertRaises(errors.OutOfRangeError): sess.run(elements) - def testMapAndBatchParallelGetNextDropRemainder(self): - iterator = ( - dataset_ops.Dataset.range(49999).apply( - batching.map_and_batch( - lambda x: x, batch_size=100, drop_remainder=True)) - .make_one_shot_iterator()) + @parameterized.named_parameters( + ("Normal", False), + ("NUMA", True), + ) + def testMapAndBatchParallelGetNextDropRemainder(self, numa_aware): + dataset = dataset_ops.Dataset.range(49999).apply( + batching.map_and_batch( + lambda x: x, batch_size=100, drop_remainder=True)) + + if numa_aware: + options = dataset_ops.Options() + options.experimental_numa_aware = True + dataset = dataset.with_options(options) + iterator = dataset.make_one_shot_iterator() + elements = [] for _ in range(100): elements.append(iterator.get_next()) @@ -185,19 +234,29 @@ class MapAndBatchTest(test_base.DatasetTestBase, parameterized.TestCase): got.sort(key=lambda x: x[0]) expected = [] for j in range(100): - expected.append(range(i*10000+j*100, i*10000+(j+1)*100)) + expected.append(range(i * 10000 + j * 100, i * 10000 + (j + 1) * 100)) self.assertAllEqual(got, expected) with self.assertRaises(errors.OutOfRangeError): sess.run(elements) - def testMapAndBatchSparse(self): + @parameterized.named_parameters( + ("Normal", False), + ("NUMA", True), + ) + def testMapAndBatchSparse(self, numa_aware): def _sparse(i): return sparse_tensor.SparseTensorValue( indices=[[0]], values=(i * [1]), dense_shape=[1]) - iterator = dataset_ops.Dataset.range(10).apply( - batching.map_and_batch(_sparse, 5)).make_initializable_iterator() + dataset = dataset_ops.Dataset.range(10).apply( + batching.map_and_batch(_sparse, 5)) + if numa_aware: + options = dataset_ops.Options() + options.experimental_numa_aware = True + dataset = dataset.with_options(options) + iterator = dataset.make_initializable_iterator() + init_op = iterator.initializer get_next = iterator.get_next() @@ -214,21 +273,33 @@ class MapAndBatchTest(test_base.DatasetTestBase, parameterized.TestCase): with self.assertRaises(errors.OutOfRangeError): sess.run(get_next) - def testMapAndBatchFails(self): + @parameterized.named_parameters( + ("Normal", False), + ("NUMA", True), + ) + def testMapAndBatchFails(self, numa_aware): """Test a dataset that maps a TF function across its input elements.""" dataset = dataset_ops.Dataset.from_tensors( array_ops.check_numerics( constant_op.constant(1.0) / constant_op.constant(0.0), "oops")) batch_size = array_ops.placeholder(dtypes.int64, shape=[]) - iterator = ( - dataset.apply(batching.map_and_batch(lambda x: x, batch_size)) - .make_initializable_iterator()) + dataset = dataset.apply(batching.map_and_batch(lambda x: x, batch_size)) + if numa_aware: + options = dataset_ops.Options() + options.experimental_numa_aware = True + dataset = dataset.with_options(options) + iterator = dataset.make_initializable_iterator() + init_op = iterator.initializer with self.cached_session() as sess: with self.assertRaisesRegexp(errors.InvalidArgumentError, "oops"): sess.run(init_op, feed_dict={batch_size: 14}) - def testMapAndBatchShapeMismatch(self): + @parameterized.named_parameters( + ("Normal", False), + ("NUMA", True), + ) + def testMapAndBatchShapeMismatch(self, numa_aware): """Test a dataset that maps a TF function across its input elements.""" def generator(): @@ -240,9 +311,13 @@ class MapAndBatchTest(test_base.DatasetTestBase, parameterized.TestCase): dataset = dataset_ops.Dataset.from_generator( generator, output_types=dtypes.int32) batch_size = 4 - iterator = ( - dataset.apply(batching.map_and_batch(lambda x: x, batch_size)) - .make_initializable_iterator()) + dataset = dataset.apply(batching.map_and_batch(lambda x: x, batch_size)) + if numa_aware: + options = dataset_ops.Options() + options.experimental_numa_aware = True + dataset = dataset.with_options(options) + iterator = dataset.make_initializable_iterator() + init_op = iterator.initializer get_next = iterator.get_next() with self.cached_session() as sess: @@ -251,7 +326,11 @@ class MapAndBatchTest(test_base.DatasetTestBase, parameterized.TestCase): "number of elements does not match"): sess.run(get_next) - def testMapAndBatchImplicitDispose(self): + @parameterized.named_parameters( + ("Normal", False), + ("NUMA", True), + ) + def testMapAndBatchImplicitDispose(self, numa_aware): # Tests whether a map and batch dataset will be cleaned up correctly when # the pipeline does not run it until exhaustion. # The pipeline is TensorSliceDataset -> RepeatDataset(1000) -> @@ -266,6 +345,10 @@ class MapAndBatchTest(test_base.DatasetTestBase, parameterized.TestCase): dataset = dataset_ops.Dataset.from_tensor_slices(components).repeat( 1000).apply(batching.map_and_batch(_map_fn, batch_size=100)) dataset = dataset.prefetch(5) + if numa_aware: + options = dataset_ops.Options() + options.experimental_numa_aware = True + dataset = dataset.with_options(options) iterator = dataset.make_one_shot_iterator() get_next = iterator.get_next() @@ -274,26 +357,38 @@ class MapAndBatchTest(test_base.DatasetTestBase, parameterized.TestCase): sess.run(get_next) @parameterized.named_parameters( - ("1", 0), - ("2", 5), - ("3", 10), - ("4", 90), - ("5", 95), - ("6", 99), + ("1", 0, False), + ("2", 5, False), + ("3", 10, False), + ("4", 90, False), + ("5", 95, False), + ("6", 99, False), + ("1NUMA", 0, True), + ("2NUMA", 5, True), + ("3NUMA", 10, True), + ("4NUMA", 90, True), + ("5NUMA", 95, True), + ("6NUMA", 99, True), ) - def testMapAndBatchOutOfRangeError(self, threshold): + def testMapAndBatchOutOfRangeError(self, threshold, numa_aware): def raising_py_fn(i): - if i >= threshold: + if i == threshold: raise StopIteration() + elif i > threshold: + raise RuntimeError("Alternate error; you shouldn't see me! (i: %s)" % i) else: return i - iterator = ( - dataset_ops.Dataset.range(100).apply( - batching.map_and_batch( - lambda x: script_ops.py_func(raising_py_fn, [x], dtypes.int64), - batch_size=10)).make_one_shot_iterator()) + dataset = dataset_ops.Dataset.range(100).apply( + batching.map_and_batch( + lambda x: script_ops.py_func(raising_py_fn, [x], dtypes.int64), + batch_size=10)) + if numa_aware: + options = dataset_ops.Options() + options.experimental_numa_aware = True + dataset = dataset.with_options(options) + iterator = dataset.make_one_shot_iterator() get_next = iterator.get_next() with self.cached_session() as sess: @@ -307,25 +402,42 @@ class MapAndBatchTest(test_base.DatasetTestBase, parameterized.TestCase): sess.run(get_next) @parameterized.named_parameters( - ("1", False, dtypes.bool), - ("2", -42, dtypes.int8), - ("3", -42, dtypes.int16), - ("4", -42, dtypes.int32), - ("5", -42, dtypes.int64), - ("6", 42, dtypes.uint8), - ("7", 42, dtypes.uint16), - ("8", 42.0, dtypes.float16), - ("9", 42.0, dtypes.float32), - ("10", 42.0, dtypes.float64), - ("11", b"hello", dtypes.string), + ("1", False, dtypes.bool, False), + ("2", -42, dtypes.int8, False), + ("3", -42, dtypes.int16, False), + ("4", -42, dtypes.int32, False), + ("5", -42, dtypes.int64, False), + ("6", 42, dtypes.uint8, False), + ("7", 42, dtypes.uint16, False), + ("8", 42.0, dtypes.float16, False), + ("9", 42.0, dtypes.float32, False), + ("10", 42.0, dtypes.float64, False), + ("11", b"hello", dtypes.string, False), + ("1NUMA", False, dtypes.bool, True), + ("2NUMA", -42, dtypes.int8, True), + ("3NUMA", -42, dtypes.int16, True), + ("4NUMA", -42, dtypes.int32, True), + ("5NUMA", -42, dtypes.int64, True), + ("6NUMA", 42, dtypes.uint8, True), + ("7NUMA", 42, dtypes.uint16, True), + ("8NUMA", 42.0, dtypes.float16, True), + ("9NUMA", 42.0, dtypes.float32, True), + ("10NUMA", 42.0, dtypes.float64, True), + ("11NUMA", b"hello", dtypes.string, True), ) - def testMapAndBatchTypes(self, element, dtype): + def testMapAndBatchTypes(self, element, dtype, numa_aware): + def gen(): yield element dataset = dataset_ops.Dataset.from_generator(gen, dtype).repeat(100).apply( batching.map_and_batch(lambda x: x, batch_size=10)) + if numa_aware: + options = dataset_ops.Options() + options.experimental_numa_aware = True + dataset = dataset.with_options(options) + get_next = dataset.make_one_shot_iterator().get_next() with self.cached_session() as sess: @@ -363,6 +475,40 @@ class MapAndBatchTest(test_base.DatasetTestBase, parameterized.TestCase): sess.run(iterator.initializer, feed_dict={captured_t: 42}) self.assertAllEqual([42] * 10, sess.run(get_next)) + @parameterized.named_parameters( + ("Normal", False), + ("NUMA", True), + ) + def testMapAndBatchControlFlow(self, numa_aware): + + def map_fn(x): + previous_cond_v2_value = control_flow_ops.ENABLE_COND_V2 + control_flow_ops.ENABLE_COND_V2 = True + return_value = control_flow_ops.cond(x < 50, lambda: x + 1, lambda: x * x) + control_flow_ops.ENABLE_COND_V2 = previous_cond_v2_value + return return_value + + dataset = dataset_ops.Dataset.range(100).apply( + batching.map_and_batch(map_fn, batch_size=10)) + if numa_aware: + options = dataset_ops.Options() + options.experimental_numa_aware = True + dataset = dataset.with_options(options) + iterator = dataset.make_one_shot_iterator() + get_next = iterator.get_next() + with self.cached_session() as sess: + for i in range(10): + print("Case %d" % i) + if i < 5: + self.assertAllEqual([i * 10 + j + 1 for j in range(10)], + sess.run(get_next)) + else: + self.assertAllEqual( + [((i * 10) + j) * ((i * 10) + j) for j in range(10)], + sess.run(get_next)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(get_next) + if __name__ == "__main__": test.main() |