# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for experimental iterator_ops."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from tensorflow.contrib.data.python.ops import iterator_ops
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.estimator import estimator
from tensorflow.python.estimator import model_fn
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import variables
from tensorflow.python.platform import test
from tensorflow.python.training import checkpoint_management
from tensorflow.python.training import saver as saver_lib
from tensorflow.python.training import training_util


class CheckpointInputPipelineHookTest(test.TestCase):
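  """Tests for CheckpointInputPipelineHook."""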

  @staticmethod
  def _model_fn(features, labels, mode, config):
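    """A minimal model_fn that increments the global step and records the
    most recent input feature. Both variables are added to the 'my_vars'
    collection so tests can read them back from the latest checkpoint.
    """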
    del labels
    del mode
    del config
    global_step = training_util.get_or_create_global_step()
    update_global_step_op = global_step.assign_add(1)
    latest_feature = variables.Variable(
        0, name='latest_feature', dtype=dtypes.int64)
    store_latest_feature_op = latest_feature.assign(features)
    ops.add_to_collection('my_vars', global_step)
    ops.add_to_collection('my_vars', latest_feature)
    return model_fn.EstimatorSpec(
        mode='train',
        train_op=control_flow_ops.group(
            [update_global_step_op, store_latest_feature_op]),
        loss=constant_op.constant(2.0))

  def _read_vars(self, model_dir):
    """Returns (global_step, latest_feature)."""
    with ops.Graph().as_default() as g:
      ckpt_path = checkpoint_management.latest_checkpoint(model_dir)
      meta_filename = ckpt_path + '.meta'
      saver_lib.import_meta_graph(meta_filename)
      saver = saver_lib.Saver()
      with self.test_session(graph=g) as sess:
        saver.restore(sess, ckpt_path)
        return sess.run(ops.get_collection('my_vars'))

  def _build_iterator_saver_hook(self, est):
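    """Returns a CheckpointInputPipelineHook that checkpoints the input pipeline."""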
    return iterator_ops.CheckpointInputPipelineHook(est)

  def testReturnDatasetFromInputFn(self):
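    """Tests saving/restoring iterator state when input_fn returns a Dataset.

    After the second `train` call the iterator should resume from element 2,
    so `latest_feature` ends at 3 rather than 1.
    """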

    def _input_fn():
      return dataset_ops.Dataset.range(10)

    est = estimator.Estimator(model_fn=self._model_fn)

    est.train(_input_fn, steps=2, hooks=[self._build_iterator_saver_hook(est)])
    self.assertSequenceEqual(self._read_vars(est.model_dir), (2, 1))
    est.train(_input_fn, steps=2, hooks=[self._build_iterator_saver_hook(est)])
    self.assertSequenceEqual(self._read_vars(est.model_dir), (4, 3))

  def testBuildIteratorInInputFn(self):
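    """Tests saving/restoring iterator state when input_fn builds a one-shot
    iterator itself and returns `get_next()`.
    """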

    def _input_fn():
      ds = dataset_ops.Dataset.range(10)
      iterator = ds.make_one_shot_iterator()
      return iterator.get_next()

    est = estimator.Estimator(model_fn=self._model_fn)

    est.train(_input_fn, steps=2, hooks=[self._build_iterator_saver_hook(est)])
    self.assertSequenceEqual(self._read_vars(est.model_dir), (2, 1))
    est.train(_input_fn, steps=2, hooks=[self._build_iterator_saver_hook(est)])
    self.assertSequenceEqual(self._read_vars(est.model_dir), (4, 3))

  def testDoNotRestore(self):
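    """Tests that the input pipeline restarts when the hook is not provided."""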

    def _input_fn():
      return dataset_ops.Dataset.range(10)

    est = estimator.Estimator(model_fn=self._model_fn)

    est.train(_input_fn, steps=2, hooks=[self._build_iterator_saver_hook(est)])
    self.assertSequenceEqual(self._read_vars(est.model_dir), (2, 1))
    est.train(_input_fn, steps=2, hooks=[self._build_iterator_saver_hook(est)])
    self.assertSequenceEqual(self._read_vars(est.model_dir), (4, 3))
    # Without the hook, the input pipeline is not restored and starts again
    # from the first element, so `latest_feature` is 1 rather than 5.
    est.train(_input_fn, steps=2)
    self.assertSequenceEqual(self._read_vars(est.model_dir), (6, 1))

  def testRaiseErrorIfNoIterator(self):
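    """Tests that a ValueError is raised if the input_fn creates no iterator."""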

    def _input_fn():
      return constant_op.constant(1, dtype=dtypes.int64)

    est = estimator.Estimator(model_fn=self._model_fn)

    with self.assertRaises(ValueError):
      est.train(
          _input_fn, steps=2, hooks=[self._build_iterator_saver_hook(est)])


if __name__ == '__main__':
  test.main()