aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/contrib/kinesis/python/ops/kinesis_dataset_ops.py
diff options
context:
space:
mode:
Diffstat (limited to 'tensorflow/contrib/kinesis/python/ops/kinesis_dataset_ops.py')
-rw-r--r--tensorflow/contrib/kinesis/python/ops/kinesis_dataset_ops.py96
1 files changed, 96 insertions, 0 deletions
diff --git a/tensorflow/contrib/kinesis/python/ops/kinesis_dataset_ops.py b/tensorflow/contrib/kinesis/python/ops/kinesis_dataset_ops.py
new file mode 100644
index 0000000000..ca2df95ba4
--- /dev/null
+++ b/tensorflow/contrib/kinesis/python/ops/kinesis_dataset_ops.py
@@ -0,0 +1,96 @@
+# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+"""Kinesis Dataset."""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+from tensorflow.contrib.kinesis.python.ops import kinesis_op_loader # pylint: disable=unused-import
+from tensorflow.contrib.kinesis.python.ops import gen_dataset_ops
+from tensorflow.python.data.ops.dataset_ops import Dataset
+from tensorflow.python.framework import dtypes
+from tensorflow.python.framework import ops
+from tensorflow.python.framework import tensor_shape
+
+
+class KinesisDataset(Dataset):
+ """A Kinesis Dataset that consumes the message.
+
+ Kinesis is a managed service provided by AWS for data streaming.
+ This dataset reads messages from Kinesis with each message presented
+ as a `tf.string`.
+
+ For example, we can construct and use the KinesisDataset as follows:
+ ```python
+ dataset = tf.contrib.kinesis.KinesisDataset(
+ "kinesis_stream_name", read_indefinitely=False)
+ next = dataset.make_one_shot_iterator().get_next()
+ with tf.Session() as sess:
+ while True:
+ try:
+ print(sess.run(nxt))
+ except tf.errors.OutOfRangeError:
+ break
+ ```
+
+ Since Kinesis is a data streaming service, data may not be available
+ at the time it is being read. The argument `read_indefinitely` is
+ used to control the behavior in this situation. If `read_indefinitely`
+ is `True`, then `KinesisDataset` will keep retrying to retrieve data
+ from the stream. If `read_indefinitely` is `False`, an `OutOfRangeError`
+ is returned immediately instead.
+ """
+
+ def __init__(self,
+ stream,
+ shard="",
+ read_indefinitely=True,
+ interval=100000):
+ """Create a KinesisDataset.
+
+ Args:
+ stream: A `tf.string` tensor containing the name of the stream.
+ shard: A `tf.string` tensor containing the id of the shard.
+ read_indefinitely: If `True`, the Kinesis dataset will keep retry
+ again on `EOF` after the `interval` period. If `False`, then
+ the dataset will stop on `EOF`. The default value is `True`.
+ interval: The interval for the Kinesis Client to wait before
+ it tries to get records again (in millisecond).
+ """
+ super(KinesisDataset, self).__init__()
+ self._stream = ops.convert_to_tensor(
+ stream, dtype=dtypes.string, name="stream")
+ self._shard = ops.convert_to_tensor(
+ shard, dtype=dtypes.string, name="shard")
+ self._read_indefinitely = ops.convert_to_tensor(
+ read_indefinitely, dtype=dtypes.bool, name="read_indefinitely")
+ self._interval = ops.convert_to_tensor(
+ interval, dtype=dtypes.int64, name="interval")
+
+ def _as_variant_tensor(self):
+ return gen_dataset_ops.kinesis_dataset(
+ self._stream, self._shard, self._read_indefinitely, self._interval)
+
+ @property
+ def output_classes(self):
+ return ops.Tensor
+
+ @property
+ def output_shapes(self):
+ return tensor_shape.scalar()
+
+ @property
+ def output_types(self):
+ return dtypes.string