aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/contrib/kafka/python/kernel_tests/kafka_test.py
blob: 621911876fc502ece76b08eb6c28697b3c12c863 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License.  You may obtain a copy of
# the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
# License for the specific language governing permissions and limitations under
# the License.
# ==============================================================================
"""Tests for KafkaDataset."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from tensorflow.contrib.kafka.python.ops import kafka_dataset_ops
from tensorflow.python.data.ops import iterator_ops
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import errors
from tensorflow.python.ops import array_ops
from tensorflow.python.platform import test


class KafkaDatasetTest(test.TestCase):
  """Integration test for `KafkaDataset` that reads from a live Kafka broker.

  The test expects the topic "test", partition 0, to hold at least the ten
  messages "D0" ... "D9" at offsets 0-9 (presumably published by
  `kafka_test.sh` -- confirm against that script).
  """

  def setUp(self):
    # The Kafka server has to be set up before the test
    # and torn down after the test manually.
    # The docker engine has to be installed.
    #
    # To set up the Kafka server:
    # $ bash kafka_test.sh start kafka
    #
    # To tear down the Kafka server:
    # $ bash kafka_test.sh stop kafka
    pass

  def _message(self, index):
    """Returns the expected payload of message `index` as bytes.

    `Session.run` returns string tensors as `bytes`, so the expected values
    must be bytes too; comparing against a native `str` fails on Python 3.

    Args:
      index: Integer index of the message; the payload is "D<index>".

    Returns:
      The payload encoded as a bytes object.
    """
    return ("D" + str(index)).encode("utf-8")

  def _assertExhausted(self, sess, get_next):
    """Asserts that fetching `get_next` raises `OutOfRangeError`."""
    with self.assertRaises(errors.OutOfRangeError):
      sess.run(get_next)

  def testKafkaDataset(self):
    """Reads single, combined, repeated and batched subscriptions."""
    topics = array_ops.placeholder(dtypes.string, shape=[None])
    num_epochs = array_ops.placeholder(dtypes.int64, shape=[])
    batch_size = array_ops.placeholder(dtypes.int64, shape=[])

    repeat_dataset = kafka_dataset_ops.KafkaDataset(
        topics, group="test", eof=True).repeat(num_epochs)
    batch_dataset = repeat_dataset.batch(batch_size)

    # A single reinitializable iterator serves both the unbatched and the
    # batched dataset; each initializer switches which one feeds it.
    iterator = iterator_ops.Iterator.from_structure(batch_dataset.output_types)
    init_op = iterator.make_initializer(repeat_dataset)
    init_batch_op = iterator.make_initializer(batch_dataset)
    get_next = iterator.get_next()

    # Subscription strings; the test expects the first to yield D0..D4 and
    # the second to yield D5..D9.
    # NOTE(review): format is presumably "topic:partition:offset:limit" with
    # -1 meaning "no limit" -- confirm against the KafkaDataset docs.
    first_half = "test:0:0:4"
    second_half = "test:0:5:-1"

    with self.test_session() as sess:
      # Basic test: read the first subscription (messages D0..D4).
      sess.run(init_op, feed_dict={topics: [first_half], num_epochs: 1})
      for i in range(5):
        self.assertEqual(self._message(i), sess.run(get_next))
      self._assertExhausted(sess, get_next)

      # Basic test: read the second subscription (messages D5..D9).
      sess.run(init_op, feed_dict={topics: [second_half], num_epochs: 1})
      for i in range(5):
        self.assertEqual(self._message(i + 5), sess.run(get_next))
      self._assertExhausted(sess, get_next)

      # Basic test: read both subscriptions back to back.
      sess.run(
          init_op,
          feed_dict={
              topics: [first_half, second_half],
              num_epochs: 1
          })
      for j in range(2):
        for i in range(5):
          self.assertEqual(self._message(i + j * 5), sess.run(get_next))
      self._assertExhausted(sess, get_next)

      # Test repeated iteration through both subscriptions.
      sess.run(
          init_op,
          feed_dict={
              topics: [first_half, second_half],
              num_epochs: 10
          })
      for _ in range(10):
        for j in range(2):
          for i in range(5):
            self.assertEqual(self._message(i + j * 5), sess.run(get_next))
      self._assertExhausted(sess, get_next)

      # Test batched and repeated iteration through both subscriptions.
      # With a batch size of 5, every epoch produces exactly two batches.
      sess.run(
          init_batch_op,
          feed_dict={
              topics: [first_half, second_half],
              num_epochs: 10,
              batch_size: 5
          })
      for _ in range(10):
        self.assertAllEqual([self._message(i) for i in range(5)],
                            sess.run(get_next))
        self.assertAllEqual([self._message(i + 5) for i in range(5)],
                            sess.run(get_next))
      # 10 epochs * 10 messages / 5 per batch = 20 batches, all consumed above.
      self._assertExhausted(sess, get_next)


# Invoke the test entry point only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
  test.main()