diff options
author | Jiri Simsa <jsimsa@google.com> | 2018-09-11 17:50:51 -0700 |
---|---|---|
committer | TensorFlower Gardener <gardener@tensorflow.org> | 2018-09-11 17:55:22 -0700 |
commit | 683cf4eb603defd7b55a83bbe0e0f335d7ab6354 (patch) | |
tree | c6c7894c51861922d478fc7b57343535d714c778 /tensorflow/contrib/data | |
parent | d77ec7f18fe9f4b03f7259a0003b966b6be28d03 (diff) |
[tf.data] Mechanism for collecting processing time information and modeling performance.
PiperOrigin-RevId: 212557406
Diffstat (limited to 'tensorflow/contrib/data')
3 files changed, 237 insertions, 0 deletions
diff --git a/tensorflow/contrib/data/python/kernel_tests/optimization/BUILD b/tensorflow/contrib/data/python/kernel_tests/optimization/BUILD index 459bdf66f3..7e9ea68047 100644 --- a/tensorflow/contrib/data/python/kernel_tests/optimization/BUILD +++ b/tensorflow/contrib/data/python/kernel_tests/optimization/BUILD @@ -74,6 +74,25 @@ py_test( ) py_test( + name = "model_dataset_op_test", + size = "medium", + srcs = ["model_dataset_op_test.py"], + srcs_version = "PY2AND3", + tags = [ + "optonly", + ], + deps = [ + "//tensorflow/contrib/data/python/ops:batching", + "//tensorflow/contrib/data/python/ops:interleave_ops", + "//tensorflow/contrib/data/python/ops:optimization", + "//tensorflow/python:client_testlib", + "//tensorflow/python:errors", + "//tensorflow/python/data/ops:dataset_ops", + "//third_party/py/numpy", + ], +) + +py_test( name = "optimize_dataset_op_test", size = "small", srcs = ["optimize_dataset_op_test.py"], diff --git a/tensorflow/contrib/data/python/kernel_tests/optimization/model_dataset_op_test.py b/tensorflow/contrib/data/python/kernel_tests/optimization/model_dataset_op_test.py new file mode 100644 index 0000000000..0a87d3e905 --- /dev/null +++ b/tensorflow/contrib/data/python/kernel_tests/optimization/model_dataset_op_test.py @@ -0,0 +1,177 @@ +# Copyright 2018 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================== +"""Tests for the experimental input pipeline ops.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import time + +import numpy as np + +from tensorflow.contrib.data.python.ops import batching +from tensorflow.contrib.data.python.ops import optimization +from tensorflow.python.data.ops import dataset_ops +from tensorflow.python.ops import math_ops +from tensorflow.python.platform import test + + +class ModelDatasetTest(test.TestCase): + + def testModelMap(self): + k = 1024 * 1024 + dataset = dataset_ops.Dataset.from_tensors((np.random.rand(1, 4 * k), + np.random.rand(4 * k, + 1))).repeat() + dataset = dataset.map(math_ops.matmul) + iterator = dataset.apply(optimization.model()).make_one_shot_iterator() + get_next = iterator.get_next() + + deltas = [] + with self.test_session() as sess: + for _ in range(5): + sess.run(get_next.op) + for _ in range(100): + start = time.time() + sess.run(get_next.op) + end = time.time() + deltas.append(end - start) + + print("%f (median), %f (mean), %f (stddev), %f (min), %f (max)\n" % + (np.median(deltas), np.mean(deltas), np.std(deltas), np.min(deltas), + np.max(deltas))) + + def testModelParallelMap(self): + k = 1024 * 1024 + dataset = dataset_ops.Dataset.from_tensors((np.random.rand(1, 4 * k), + np.random.rand(4 * k, + 1))).repeat() + dataset = dataset.map(math_ops.matmul, num_parallel_calls=56) + iterator = dataset.apply(optimization.model()).make_one_shot_iterator() + get_next = iterator.get_next() + + deltas = [] + with self.test_session() as sess: + for _ in range(5): + sess.run(get_next.op) + for _ in range(1000): + start = time.time() + sess.run(get_next.op) + end = time.time() + deltas.append(end - start) + + print("%f (median), %f (mean), %f (stddev), %f (min), %f (max)\n" % + (np.median(deltas), np.mean(deltas), np.std(deltas), np.min(deltas), + np.max(deltas))) + + def 
testModelMapAndBatch(self): + batch_size = 16 + k = 1024 * 1024 + dataset = dataset_ops.Dataset.from_tensors((np.random.rand(1, 4 * k), + np.random.rand(4 * k, + 1))).repeat() + dataset = dataset.apply( + batching.map_and_batch( + math_ops.matmul, num_parallel_calls=28, batch_size=batch_size)) + iterator = dataset.apply(optimization.model()).make_one_shot_iterator() + get_next = iterator.get_next() + + deltas = [] + with self.test_session() as sess: + for _ in range(5): + sess.run(get_next.op) + for _ in range(10): + start = time.time() + sess.run(get_next.op) + end = time.time() + deltas.append(end - start) + + print("%f (median), %f (mean), %f (stddev), %f (min), %f (max)\n" % + (np.median(deltas), np.mean(deltas), np.std(deltas), np.min(deltas), + np.max(deltas))) + + def testModelParallelInterleave(self): + k = 1024 * 1024 + dataset = dataset_ops.Dataset.from_tensors((np.random.rand(1, 4 * k), + np.random.rand(4 * k, + 1))).repeat() + dataset = dataset.map(math_ops.matmul) + dataset = dataset_ops.Dataset.range(1).repeat().interleave( + lambda _: dataset, cycle_length=56, num_parallel_calls=56) + iterator = dataset.apply(optimization.model()).make_one_shot_iterator() + get_next = iterator.get_next() + + deltas = [] + with self.test_session() as sess: + for _ in range(5): + sess.run(get_next.op) + for _ in range(1000): + start = time.time() + sess.run(get_next.op) + end = time.time() + deltas.append(end - start) + + print("%f (median), %f (mean), %f (stddev), %f (min), %f (max)\n" % + (np.median(deltas), np.mean(deltas), np.std(deltas), np.min(deltas), + np.max(deltas))) + + def testModelNested(self): + k = 1024 * 1024 + a = (np.random.rand(1, 8 * k), np.random.rand(8 * k, 1)) + b = (np.random.rand(1, 4 * k), np.random.rand(4 * k, 1)) + c = (np.random.rand(1, 2 * k), np.random.rand(2 * k, 1)) + dataset = dataset_ops.Dataset.from_tensors((a, b, c)).repeat() + + def f1(a, b, c): + x, y = a + return math_ops.matmul(x, y), b, c + + def f2(a, b, c): + x, y = b + 
return a, math_ops.matmul(x, y), c + + def f3(a, b, c): + x, y = c + return a, b, math_ops.matmul(x, y) + + dataset = dataset.map(f1, num_parallel_calls=32) + dataset = dataset_ops.Dataset.range(1).repeat().interleave( + lambda _: dataset, cycle_length=2) + + dataset = dataset.map(f2, num_parallel_calls=16) + dataset = dataset_ops.Dataset.range(1).repeat().interleave( + lambda _: dataset, cycle_length=2) + + dataset = dataset.map(f3, num_parallel_calls=10) + iterator = dataset.apply(optimization.model()).make_one_shot_iterator() + get_next = iterator.get_next() + + deltas = [] + with self.test_session() as sess: + for _ in range(5): + sess.run(get_next) + for _ in range(100): + start = time.time() + sess.run(get_next) + end = time.time() + deltas.append(end - start) + + print("%f (median), %f (mean), %f (stddev), %f (min), %f (max)\n" % + (np.median(deltas), np.mean(deltas), np.std(deltas), np.min(deltas), + np.max(deltas))) + + +if __name__ == "__main__": + test.main() diff --git a/tensorflow/contrib/data/python/ops/optimization.py b/tensorflow/contrib/data/python/ops/optimization.py index fa1b851ad7..4114b62e29 100644 --- a/tensorflow/contrib/data/python/ops/optimization.py +++ b/tensorflow/contrib/data/python/ops/optimization.py @@ -46,6 +46,21 @@ def assert_next(transformations): return _apply_fn +def model(): + """A transformation that models performance. + + Returns: + A `Dataset` transformation function, which can be passed to + @{tf.data.Dataset.apply}. + """ + + def _apply_fn(dataset): + """Function from `Dataset` to `Dataset` that applies the transformation.""" + return _ModelDataset(dataset) + + return _apply_fn + + def optimize(optimizations=None): """A transformation that applies optimizations. 
@@ -97,6 +112,32 @@ class _AssertNextDataset(dataset_ops.Dataset): return self._input_dataset.output_types +class _ModelDataset(dataset_ops.Dataset): + """A `Dataset` that acts as an identity, and models performance.""" + + def __init__(self, input_dataset): + """See `model()` for details.""" + super(_ModelDataset, self).__init__() + self._input_dataset = input_dataset + + def _as_variant_tensor(self): + return gen_dataset_ops.model_dataset( + self._input_dataset._as_variant_tensor(), # pylint: disable=protected-access + **dataset_ops.flat_structure(self)) + + @property + def output_classes(self): + return self._input_dataset.output_classes + + @property + def output_shapes(self): + return self._input_dataset.output_shapes + + @property + def output_types(self): + return self._input_dataset.output_types + + class _OptimizeDataset(dataset_ops.Dataset): """A `Dataset` that acts as an identity, and applies optimizations.""" |