author: Piotr Padlewski <prazek@google.com> 2018-09-14 11:28:28 -0700
committer: TensorFlower Gardener <gardener@tensorflow.org> 2018-09-14 11:32:52 -0700
commit: c20a7b81d79d30db9e990309ddb419bcb48120cc (patch)
tree: 9ea682cf79bac18653e7690785e0f5e7117b6b8b /tensorflow/contrib/data
parent: 89f9080ed0d1a43cb2fa253997b2553c6916f364 (diff)
[tf.data] Introducing an optimization that parallelizes map transformations.
Stateless MapDatasets can be parallelized by switching to ParallelMapDataset. We set `num_parallel_calls` to 2 for now, but in the future a special value will be used that causes the optimal value to be selected dynamically at runtime. This patch also exposed a memory leak, which was fixed. PiperOrigin-RevId: 213015223
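As a rough sketch of the effect (not part of this commit), the rewrite can be thought of as replacing an eligible stateless Map node with its parallel counterpart. The snippet below uses the TF 1.x contrib optimization helpers that the new test in this change exercises:

# Minimal sketch using the contrib test helpers exercised by the test below;
# the pipeline itself is illustrative and not part of this commit.
from tensorflow.contrib.data.python.ops import optimization
from tensorflow.python.data.ops import dataset_ops

# A stateless map function is eligible for the rewrite.
dataset = dataset_ops.Dataset.range(10).map(lambda x: x + 1)
# With the "map_parallelization" rewrite enabled, the Map node is expected to
# be replaced by a ParallelMap node (num_parallel_calls=2 for now, per the
# commit message above).
dataset = dataset.apply(optimization.optimize(["map_parallelization"]))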
Diffstat (limited to 'tensorflow/contrib/data')
-rw-r--r-- tensorflow/contrib/data/python/kernel_tests/optimization/BUILD | 17
-rw-r--r-- tensorflow/contrib/data/python/kernel_tests/optimization/map_parallelization_test.py | 84
2 files changed, 101 insertions(+), 0 deletions(-)
diff --git a/tensorflow/contrib/data/python/kernel_tests/optimization/BUILD b/tensorflow/contrib/data/python/kernel_tests/optimization/BUILD
index 7e9ea68047..b3187bf61b 100644
--- a/tensorflow/contrib/data/python/kernel_tests/optimization/BUILD
+++ b/tensorflow/contrib/data/python/kernel_tests/optimization/BUILD
@@ -74,6 +74,23 @@ py_test(
)
py_test(
+ name = "map_parallelization_test",
+ size = "small",
+ srcs = ["map_parallelization_test.py"],
+ srcs_version = "PY2AND3",
+ deps = [
+ "//tensorflow/contrib/data/python/ops:optimization",
+ "//tensorflow/python:client_testlib",
+ "//tensorflow/python:constant_op",
+ "//tensorflow/python:dtypes",
+ "//tensorflow/python:errors",
+ "//tensorflow/python:math_ops",
+ "//tensorflow/python/data/ops:dataset_ops",
+ "@absl_py//absl/testing:parameterized",
+ ],
+)
+
+py_test(
name = "model_dataset_op_test",
size = "medium",
srcs = ["model_dataset_op_test.py"],
diff --git a/tensorflow/contrib/data/python/kernel_tests/optimization/map_parallelization_test.py b/tensorflow/contrib/data/python/kernel_tests/optimization/map_parallelization_test.py
new file mode 100644
index 0000000000..dd547db086
--- /dev/null
+++ b/tensorflow/contrib/data/python/kernel_tests/optimization/map_parallelization_test.py
@@ -0,0 +1,84 @@
+# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+"""Tests for the MapParallelization optimization."""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+from absl.testing import parameterized
+
+from tensorflow.contrib.data.python.ops import optimization
+from tensorflow.python.data.ops import dataset_ops
+from tensorflow.python.framework import dtypes
+from tensorflow.python.framework import errors
+from tensorflow.python.framework import ops
+from tensorflow.python.ops import control_flow_ops
+from tensorflow.python.ops import math_ops
+from tensorflow.python.ops import random_ops
+from tensorflow.python.platform import test
+
+
+class MapParallelizationTest(test.TestCase, parameterized.TestCase):
+
+ @staticmethod
+ def map_functions():
+ identity = lambda x: x
+ increment = lambda x: x + 1
+
+ def assert_greater(x):
+ assert_op = control_flow_ops.Assert(math_ops.greater(x, -1), [x])
+ with ops.control_dependencies([assert_op]):
+ return x
+
+ def random(_):
+ return random_ops.random_uniform([],
+ minval=0,
+ maxval=10,
+ dtype=dtypes.int64,
+ seed=42)
+
+ def assert_with_random(x):
+ x = assert_greater(x)
+ return random(x)
+
+ return (("Identity", identity, True), ("Increment", increment, True),
+ ("AssertGreater", assert_greater, True), ("Random", random, False),
+ ("AssertWithRandom", assert_with_random, False))
+
+ @parameterized.named_parameters(*map_functions.__func__())
+ def testMapParallelization(self, function, should_optimize):
+ next_nodes = ["ParallelMap"] if should_optimize else ["Map"]
+ dataset = dataset_ops.Dataset.range(5).apply(
+ optimization.assert_next(next_nodes)).map(function).apply(
+ optimization.optimize(["map_parallelization"]))
+ iterator = dataset.make_one_shot_iterator()
+ get_next = iterator.get_next()
+
+ with self.test_session() as sess:
+ for x in range(5):
+ result = sess.run(get_next)
+ # No need to run the pipeline if it was not optimized. Also the results
+ # might be hard to check because of random.
+ if not should_optimize:
+ return
+ r = function(x)
+ self.assertAllEqual(r, result)
+
+ with self.assertRaises(errors.OutOfRangeError):
+ sess.run(get_next)
+
+
+if __name__ == "__main__":
+ test.main()