aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/python/data/experimental/kernel_tests/serialization/parallel_interleave_dataset_serialization_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'tensorflow/python/data/experimental/kernel_tests/serialization/parallel_interleave_dataset_serialization_test.py')
-rw-r--r--tensorflow/python/data/experimental/kernel_tests/serialization/parallel_interleave_dataset_serialization_test.py101
1 files changed, 101 insertions, 0 deletions
diff --git a/tensorflow/python/data/experimental/kernel_tests/serialization/parallel_interleave_dataset_serialization_test.py b/tensorflow/python/data/experimental/kernel_tests/serialization/parallel_interleave_dataset_serialization_test.py
new file mode 100644
index 0000000000..b8f38e8a28
--- /dev/null
+++ b/tensorflow/python/data/experimental/kernel_tests/serialization/parallel_interleave_dataset_serialization_test.py
@@ -0,0 +1,101 @@
+# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+"""Tests for the ParallelInterleaveDataset serialization."""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import numpy as np
+
+from tensorflow.python.data.experimental.kernel_tests.serialization import dataset_serialization_test_base
+from tensorflow.python.data.experimental.ops import interleave_ops
+from tensorflow.python.data.ops import dataset_ops
+from tensorflow.python.framework import sparse_tensor
+from tensorflow.python.ops import sparse_ops
+from tensorflow.python.platform import test
+
+
+class ParallelInterleaveDatasetSerializationTest(
+ dataset_serialization_test_base.DatasetSerializationTestBase):
+
+ def setUp(self):
+ self.input_values = np.array([4, 5, 6], dtype=np.int64)
+ self.num_repeats = 2
+ self.num_outputs = np.sum(self.input_values) * 2
+
+ def _build_ds(self, cycle_length, block_length, sloppy=False):
+ return (dataset_ops.Dataset.from_tensor_slices(
+ self.input_values).repeat(self.num_repeats).apply(
+ interleave_ops.parallel_interleave(
+ lambda x: dataset_ops.Dataset.range(10 * x, 11 * x),
+ cycle_length, block_length, sloppy)))
+
+ def testSerializationCore(self):
+ # cycle_length > 1, block_length > 1
+ cycle_length = 2
+ block_length = 3
+ self.run_core_tests(
+ lambda: self._build_ds(cycle_length, block_length),
+ lambda: self._build_ds(cycle_length * 2, block_length * 1),
+ self.num_outputs)
+ # cycle_length = 1
+ cycle_length = 1
+ block_length = 3
+ self.run_core_tests(lambda: self._build_ds(cycle_length, block_length),
+ None, self.num_outputs)
+ # block_length = 1
+ cycle_length = 2
+ block_length = 1
+ self.run_core_tests(lambda: self._build_ds(cycle_length, block_length),
+ None, self.num_outputs)
+
+ def testSerializationWithSloppy(self):
+ break_points = self.gen_break_points(self.num_outputs, 10)
+ expected_outputs = np.repeat(
+ np.concatenate([np.arange(10 * x, 11 * x) for x in self.input_values]),
+ self.num_repeats).tolist()
+
+ def run_test(cycle_length, block_length):
+ actual = self.gen_outputs(
+ lambda: self._build_ds(cycle_length, block_length, True),
+ break_points, self.num_outputs)
+ self.assertSequenceEqual(sorted(actual), expected_outputs)
+
+ # cycle_length > 1, block_length > 1
+ run_test(2, 3)
+ # cycle_length = 1
+ run_test(1, 3)
+ # block_length = 1
+ run_test(2, 1)
+
+ def testSparseCore(self):
+
+ def _map_fn(i):
+ return sparse_tensor.SparseTensorValue(
+ indices=[[0, 0], [1, 1]], values=(i * [1, -1]), dense_shape=[2, 2])
+
+ def _interleave_fn(x):
+ return dataset_ops.Dataset.from_tensor_slices(
+ sparse_ops.sparse_to_dense(x.indices, x.dense_shape, x.values))
+
+ def _build_dataset():
+ return dataset_ops.Dataset.range(10).map(_map_fn).apply(
+ interleave_ops.parallel_interleave(_interleave_fn, 1))
+
+ self.run_core_tests(_build_dataset, None, 20)
+
+
+if __name__ == '__main__':
+ test.main()