Diffstat (limited to 'tensorflow/python/training/input.py')
-rw-r--r--  tensorflow/python/training/input.py  501
1 files changed, 501 insertions, 0 deletions
diff --git a/tensorflow/python/training/input.py b/tensorflow/python/training/input.py
new file mode 100644
index 0000000000..413fc044f7
--- /dev/null
+++ b/tensorflow/python/training/input.py
@@ -0,0 +1,501 @@
+"""## Input pipeline
+
+TensorFlow functions for setting up an input-prefetching pipeline.
+Please see the [reading data how-to](../../how_tos/reading_data.md)
+for context.
+
+### Beginning of an input pipeline
+
+The "producer" functions add a queue to the graph and a corresponding
+QueueRunner for running the subgraph that fills that queue.
+
+@@match_filenames_once
+@@limit_epochs
+@@range_input_producer
+@@slice_input_producer
+@@string_input_producer
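+
+For example, the start of a typical pipeline might look like this (a
+sketch; the glob pattern and `num_epochs` value are illustrative):
+
+    filenames = match_filenames_once("/tmp/data/*.tfrecords")
+    filename_queue = string_input_producer(filenames, num_epochs=5)
+
+A reader would then dequeue filenames from `filename_queue`.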
+
+### Batching at the end of an input pipeline
+
+These functions add a queue to the graph to assemble a batch of
+examples, with possible shuffling. They also add a QueueRunner for
+running the subgraph that fills that queue.
+
+Use [batch](#batch) or [batch_join](#batch_join) for batching examples that have
+already been well shuffled. Use [shuffle_batch](#shuffle_batch) or
+[shuffle_batch_join](#shuffle_batch_join) for examples that
+would benefit from additional shuffling.
+
+Use [batch](#batch) or [shuffle_batch](#shuffle_batch) if you want a
+single thread producing examples to batch, or if you have a
+single subgraph producing examples but you want to run it in N threads
+(where you increase N until it can keep the queue full). Use
+[batch_join](#batch_join) or [shuffle_batch_join](#shuffle_batch_join)
+if you have N different subgraphs producing examples to batch and you
+want them run by N threads.
+
+@@batch
+@@batch_join
+@@shuffle_batch
+@@shuffle_batch_join
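+
+For example, assuming `example` and `label` are single-example tensors
+produced by an earlier pipeline stage (the names and capacities here are
+illustrative), a shuffled batching stage might look like:
+
+    example_batch, label_batch = shuffle_batch(
+        [example, label], batch_size=32, capacity=50000,
+        min_after_dequeue=10000)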
+
+"""
+
+from tensorflow.python.framework import ops
+from tensorflow.python.framework import tensor_shape
+from tensorflow.python.framework import types
+from tensorflow.python.ops import array_ops
+from tensorflow.python.ops import constant_op
+from tensorflow.python.ops import data_flow_ops
+from tensorflow.python.ops import io_ops
+from tensorflow.python.ops import math_ops
+from tensorflow.python.ops import random_ops
+from tensorflow.python.ops import summary_ops
+from tensorflow.python.ops import variables
+from tensorflow.python.training import queue_runner
+
+
+def match_filenames_once(pattern, name=None):
+ """Save the list of files matching pattern, so it is only computed once.
+
+ Args:
+ pattern: A file pattern (glob).
+ name: A name for the operations (optional).
+
+ Returns:
+ A variable that is initialized to the list of files matching pattern.
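+
+  For example (the pattern is illustrative):
+
+      filenames = match_filenames_once("./data/*.csv")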
+ """
+ with ops.op_scope([pattern], name, "matching_filenames") as name:
+ return variables.Variable(io_ops.matching_files(pattern), trainable=False,
+ name=name, validate_shape=False)
+
+
+def limit_epochs(tensor, num_epochs=None, name=None):
+ """Returns tensor num_epochs times and then raises an OutOfRange error.
+
+ Args:
+ tensor: Any Tensor.
+ num_epochs: An integer (optional). If specified, limits the number
+ of steps the output tensor may be evaluated.
+ name: A name for the operations (optional).
+
+ Returns:
+ tensor or OutOfRange.
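+
+  For example (a sketch; `filenames` is any tensor):
+
+      limited = limit_epochs(filenames, num_epochs=2)
+      # `limited` may be evaluated at most 2 times before OutOfRange.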
+ """
+ if num_epochs is None:
+ return tensor
+ if num_epochs <= 0:
+    raise ValueError("num_epochs must be > 0, not %d." % num_epochs)
+ with ops.op_scope([tensor], name, "limit_epochs") as name:
+ zero64 = constant_op.constant(0, dtype=types.int64)
+ epochs = variables.Variable(zero64, name="epochs")
+ counter = epochs.count_up_to(num_epochs)
+ with ops.control_dependencies([counter]):
+ return array_ops.identity(tensor, name=name)
+
+
+def _input_producer(input_tensor, dtype, num_epochs, shuffle, seed, capacity,
+ name, summary_name):
+ if shuffle:
+ input_tensor = random_ops.random_shuffle(input_tensor, seed=seed)
+ input_tensor = limit_epochs(input_tensor, num_epochs)
+
+ q = data_flow_ops.FIFOQueue(capacity=capacity, dtypes=[dtype], shapes=[[]],
+ name=name)
+ enq = q.enqueue_many([input_tensor])
+ queue_runner.add_queue_runner(queue_runner.QueueRunner(q, [enq]))
+ summary_ops.scalar_summary("queue/%s/%s" % (q.name, summary_name),
+ math_ops.cast(q.size(), types.float32) *
+ (1. / capacity))
+ return q
+
+
+def string_input_producer(string_tensor, num_epochs=None, shuffle=True,
+ seed=None, capacity=32, name=None):
+ """Output strings (e.g. filenames) to a queue for an input pipeline.
+
+ Args:
+ string_tensor: A 1-D string tensor with the strings to produce.
+ num_epochs: An integer (optional). If specified, `string_input_producer`
+ produces each string from `string_tensor` `num_epochs` times before
+ generating an OutOfRange error. If not specified, `string_input_producer`
+ can cycle through the strings in `string_tensor` an unlimited number of
+ times.
+ shuffle: Boolean. If true, the strings are randomly shuffled within each
+ epoch.
+ seed: An integer (optional). Seed used if shuffle == True.
+ capacity: An integer. Sets the queue capacity.
+ name: A name for the operations (optional).
+
+ Returns:
+ A queue with the output strings. A QueueRunner for the Queue
+ is added to the current Graph's QUEUE_RUNNER collection.
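+
+  For example (the filenames are illustrative):
+
+      filename_queue = string_input_producer(
+          ["file0.csv", "file1.csv"], num_epochs=2)
+      # A reader op can then dequeue one filename at a time.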
+ """
+ with ops.op_scope([string_tensor], name, "input_producer") as name:
+ return _input_producer(
+ string_tensor, types.string, num_epochs, shuffle, seed, capacity, name,
+ "fraction_of_%d_full" % capacity)
+
+
+def range_input_producer(limit, num_epochs=None, shuffle=True, seed=None,
+ capacity=32, name=None):
+ """Produces the integers from 0 to limit-1 in a queue.
+
+ Args:
+ limit: An int32 scalar tensor.
+ num_epochs: An integer (optional). If specified, `range_input_producer`
+ produces each integer `num_epochs` times before generating an
+ OutOfRange error. If not specified, `range_input_producer` can cycle
+ through the integers an unlimited number of times.
+ shuffle: Boolean. If true, the integers are randomly shuffled within each
+ epoch.
+ seed: An integer (optional). Seed used if shuffle == True.
+ capacity: An integer. Sets the queue capacity.
+ name: A name for the operations (optional).
+
+ Returns:
+ A Queue with the output integers. A QueueRunner for the Queue
+ is added to the current Graph's QUEUE_RUNNER collection.
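+
+  For example, to visit the indices of a 1000-element dataset in a
+  shuffled order (a sketch):
+
+      index_queue = range_input_producer(1000)
+      index = index_queue.dequeue()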
+ """
+ with ops.op_scope([limit], name, "input_producer") as name:
+ range_tensor = math_ops.range(0, limit)
+ return _input_producer(
+ range_tensor, types.int32, num_epochs, shuffle, seed, capacity, name,
+ "fraction_of_%d_full" % capacity)
+
+
+def slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None,
+ capacity=32, name=None):
+ """Produces a slice of each Tensor in tensor_list.
+
+ Implemented using a Queue -- a QueueRunner for the Queue
+ is added to the current Graph's QUEUE_RUNNER collection.
+
+ Args:
+ tensor_list: A list of Tensors. Every Tensor in tensor_list must
+ have the same size in the first dimension.
+ num_epochs: An integer (optional). If specified, `slice_input_producer`
+ produces each slice `num_epochs` times before generating
+ an OutOfRange error. If not specified, `slice_input_producer` can cycle
+      through the slices an unlimited number of times.
+    shuffle: Boolean. If true, the slices are randomly shuffled within each
+      epoch.
+    seed: An integer (optional). Seed used if shuffle == True.
+ capacity: An integer. Sets the queue capacity.
+ name: A name for the operations (optional).
+
+ Returns:
+    A list of tensors, one for each element of tensor_list. If a tensor
+    in tensor_list has shape `[N, a, b, ..., z]`, then the corresponding
+    output tensor will have shape `[a, b, ..., z]`.
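+
+  For example, assuming `images` has shape [N, 32, 32, 3] and `labels`
+  has shape [N] (names and shapes illustrative):
+
+      image, label = slice_input_producer([images, labels], num_epochs=10)
+      # image has shape [32, 32, 3]; label is a scalar.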
+ """
+ with ops.op_scope(tensor_list, name, "input_producer"):
+ tensor_list = ops.convert_n_to_tensor_or_indexed_slices(tensor_list)
+ if not tensor_list:
+ raise ValueError(
+ "Expected at least one tensor in slice_input_producer().")
+ range_size = array_ops.shape(tensor_list[0])[0]
+ # TODO(josh11b): Add an assertion that the first dimension of
+ # everything in TensorList matches. Maybe just check the inferred shapes?
+ queue = range_input_producer(range_size, num_epochs=num_epochs,
+ shuffle=shuffle, seed=seed, capacity=capacity)
+ index = queue.dequeue()
+ output = [array_ops.gather(t, index) for t in tensor_list]
+ return output
+
+
+# Helpers for the batching functions ------------------------------------------
+
+def _flatten(tensor_list_list):
+ return [tensor for tensor_list in tensor_list_list for tensor in tensor_list]
+
+
+def _validate(tensor_list):
+ tensor_list = ops.convert_n_to_tensor_or_indexed_slices(tensor_list)
+ if not tensor_list:
+ raise ValueError("Expected at least one tensor in batch().")
+ return tensor_list
+
+
+def _validate_join(tensor_list_list):
+ tensor_list_list = [ops.convert_n_to_tensor_or_indexed_slices(tl)
+ for tl in tensor_list_list]
+ if not tensor_list_list:
+ raise ValueError("Expected at least one input in batch_join().")
+ return tensor_list_list
+
+
+def _dtypes(tensor_list_list):
+ all_dtypes = [[t.dtype for t in tl] for tl in tensor_list_list]
+ dtypes = all_dtypes[0]
+ for other_dtypes in all_dtypes[1:]:
+ if other_dtypes != dtypes:
+      raise TypeError("Expected types to be consistent: %s vs. %s." %
+                      (", ".join(x.name for x in dtypes),
+                       ", ".join(x.name for x in other_dtypes)))
+ return dtypes
+
+
+def _merge_shapes(shape_list, enqueue_many):
+ shape_list = [tensor_shape.as_shape(s) for s in shape_list]
+ if enqueue_many:
+ # We want the shapes without the leading batch dimension.
+    shape_list = [s.with_rank_at_least(1)[1:] for s in shape_list]
+ merged_shape = shape_list[0]
+ for s in shape_list[1:]:
+ merged_shape.merge_with(s)
+ return merged_shape.as_list()
+
+
+def _shapes(tensor_list_list, shapes, enqueue_many):
+ if shapes is None:
+ l = len(tensor_list_list[0])
+ shapes = [_merge_shapes([tl[i].get_shape().as_list()
+ for tl in tensor_list_list],
+ enqueue_many) for i in range(l)]
+ return shapes
+
+
+def _enqueue_join(queue, tensor_list_list, enqueue_many):
+ if enqueue_many:
+ enqueue_ops = [queue.enqueue_many(tl) for tl in tensor_list_list]
+ else:
+ enqueue_ops = [queue.enqueue(tl) for tl in tensor_list_list]
+ queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops))
+
+
+def _enqueue(queue, tensor_list, threads, enqueue_many):
+ if enqueue_many:
+ enqueue_ops = [queue.enqueue_many(tensor_list)] * threads
+ else:
+ enqueue_ops = [queue.enqueue(tensor_list)] * threads
+ queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops))
+
+
+# Batching functions ----------------------------------------------------------
+
+def batch(tensor_list, batch_size, num_threads=1, capacity=32,
+ enqueue_many=False, shapes=None, name=None):
+ """Run tensor_list to fill a queue to create batches.
+
+ Implemented using a queue -- a QueueRunner for the queue
+ is added to the current Graph's QUEUE_RUNNER collection.
+
+ Args:
+ tensor_list: The list of tensors to enqueue.
+ batch_size: The new batch size pulled from the queue.
+ num_threads: The number of threads enqueuing tensor_list.
+    capacity: Maximum number of elements in the queue. Controls how far
+      ahead the prefetching is allowed to get, and therefore how much
+      memory is used.
+ enqueue_many: If False, tensor_list is assumed to represent a
+ single example. If True, tensor_list is assumed to represent
+ a batch of examples, where the first dimension is indexed by
+ example, and all members of tensor_list should have the same
+ size in the first dimension.
+ shapes: Optional. The shapes for each example. Defaults to the
+ inferred shapes for tensor_list (leaving off the first dimension
+ if enqueue_many is True).
+ name: A name for the operations (optional).
+
+ Returns:
+ A list of tensors with the same number and types as tensor_list.
+    If enqueue_many is False, then an input tensor with shape
+    `[x, y, z]` will be output as a tensor with shape
+    `[batch_size, x, y, z]`. If enqueue_many is True and an
+    input tensor has shape `[*, x, y, z]`, then the output will
+    have shape `[batch_size, x, y, z]`.
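+
+  For example (a sketch; `image` and `label` are single-example tensors
+  produced by an earlier stage of the pipeline):
+
+      image_batch, label_batch = batch(
+          [image, label], batch_size=32, num_threads=4, capacity=128)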
+ """
+ with ops.op_scope(tensor_list, name, "batch") as name:
+ tensor_list = _validate(tensor_list)
+ dtypes = _dtypes([tensor_list])
+ shapes = _shapes([tensor_list], shapes, enqueue_many)
+ # TODO(josh11b,mrry): Switch to BatchQueue once it is written.
+ queue = data_flow_ops.FIFOQueue(
+ capacity=capacity, dtypes=dtypes, shapes=shapes)
+ _enqueue(queue, tensor_list, num_threads, enqueue_many)
+ summary_ops.scalar_summary(
+ "queue/%s/fraction_of_%d_full" % (queue.name, capacity),
+ math_ops.cast(queue.size(), types.float32) * (1. / capacity))
+ return queue.dequeue_many(batch_size, name=name)
+
+
+# TODO(josh11b): Add a thread_multiplier or num_threads (that has to be
+# a multiple of len(tensor_list_list)?) parameter, to address the use
+# case where you want more parallelism than the number of different
+# readers you can support (either because you don't have that many files
+# or can't read that many files in parallel due to the number of seeks
+# required). Once this is done, batch() can be written as a call to
+# batch_join().
+def batch_join(tensor_list_list, batch_size, capacity=32, enqueue_many=False,
+ shapes=None, name=None):
+ """Run a list of tensors to fill a queue to create batches of examples.
+
+ This version enqueues a different list of tensors in different threads.
+ Implemented using a queue -- a QueueRunner for the queue
+ is added to the current Graph's QUEUE_RUNNER collection.
+
+ Args:
+    tensor_list_list: A list of tuples of tensors to enqueue.
+      len(tensor_list_list) threads will be started, with the i-th
+      thread enqueuing the tensors from tensor_list_list[i].
+      tensor_list_list[i1][j] must match tensor_list_list[i2][j] in
+      type and shape (except in the first dimension if enqueue_many
+      is True).
+ batch_size: The new batch size pulled from the queue.
+    capacity: Maximum number of elements in the queue. Controls how far
+      ahead the prefetching is allowed to get, and therefore how much
+      memory is used.
+ enqueue_many: If False, each tensor_list_list[i] is assumed to
+ represent a single example. If True, tensor_list_list[i] is
+ assumed to represent a batch of examples, where the first
+ dimension is indexed by example, and all members of
+ tensor_list_list[i] should have the same size in the first
+ dimension.
+ shapes: Optional. The shapes for each example. Defaults to the
+ inferred shapes for tensor_list_list[i] (which must match, after
+ leaving off the first dimension if enqueue_many is True).
+ name: A name for the operations (optional).
+
+ Returns:
+ A list of tensors with the same number and types as
+    tensor_list_list[i]. If enqueue_many is False, then an input
+    tensor with shape `[x, y, z]` will be output as a tensor with
+    shape `[batch_size, x, y, z]`. If enqueue_many is True and an
+    input tensor has shape `[*, x, y, z]`, then the output will have
+    shape `[batch_size, x, y, z]`.
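+
+  For example (a sketch; `read_example` stands for a subgraph that
+  returns an [example, label] list from a reader, and is not defined
+  in this module):
+
+      example_lists = [read_example(filename_queue) for _ in range(4)]
+      example_batch, label_batch = batch_join(example_lists, batch_size=32)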
+ """
+ with ops.op_scope(_flatten(tensor_list_list), name, "batch_join") as name:
+ tensor_list_list = _validate_join(tensor_list_list)
+ dtypes = _dtypes(tensor_list_list)
+ shapes = _shapes(tensor_list_list, shapes, enqueue_many)
+ # TODO(josh11b,mrry): Switch to BatchQueue once it is written.
+ queue = data_flow_ops.FIFOQueue(
+ capacity=capacity, dtypes=dtypes, shapes=shapes)
+ _enqueue_join(queue, tensor_list_list, enqueue_many)
+ summary_ops.scalar_summary(
+ "queue/%s/fraction_of_%d_full" % (queue.name, capacity),
+ math_ops.cast(queue.size(), types.float32) * (1. / capacity))
+ return queue.dequeue_many(batch_size, name=name)
+
+
+def shuffle_batch(tensor_list, batch_size, capacity, min_after_dequeue,
+ num_threads=1, seed=None, enqueue_many=False, shapes=None,
+ name=None):
+ """Create batches by randomly shuffling tensors.
+
+ This adds:
+
+  * a shuffling queue into which tensors from tensor_list are enqueued,
+  * a dequeue_many operation to create batches from the queue, and
+  * a QueueRunner to the current Graph's QUEUE_RUNNER collection, to
+    enqueue the tensors from tensor_list.
+
+ Args:
+ tensor_list: The list of tensors to enqueue.
+ batch_size: The new batch size pulled from the queue.
+    capacity: Maximum number of elements in the queue. Controls how far
+      ahead the prefetching is allowed to get, and therefore how much
+      memory is used.
+    min_after_dequeue: Minimum number of elements in the queue after a
+      dequeue, used to ensure a minimum level of mixing of the elements.
+ num_threads: The number of threads enqueuing tensor_list.
+ seed: Seed for the random shuffling within the queue.
+ enqueue_many: If False, tensor_list is assumed to represent a
+ single example. If True, tensor_list is assumed to represent
+ a batch of examples, where the first dimension is indexed by
+ example, and all members of tensor_list should have the same
+ size in the first dimension.
+ shapes: Optional. The shapes for each example. Defaults to the
+ inferred shapes for tensor_list (leaving off the first dimension
+ if enqueue_many is True).
+ name: A name for the operations (optional).
+
+ Returns:
+ A list of tensors with the same number and types as tensor_list.
+    If enqueue_many is False, then an input tensor with shape
+    `[x, y, z]` will be output as a tensor with shape
+    `[batch_size, x, y, z]`. If enqueue_many is True and an
+    input tensor has shape `[*, x, y, z]`, then the output will
+    have shape `[batch_size, x, y, z]`.
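+
+  For example (a sketch; `image` and `label` are single-example
+  tensors, and the capacities are illustrative):
+
+      image_batch, label_batch = shuffle_batch(
+          [image, label], batch_size=32, capacity=10000,
+          min_after_dequeue=2000, num_threads=4)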
+ """
+ with ops.op_scope(tensor_list, name, "shuffle_batch") as name:
+ tensor_list = _validate(tensor_list)
+ dtypes = _dtypes([tensor_list])
+ shapes = _shapes([tensor_list], shapes, enqueue_many)
+ queue = data_flow_ops.RandomShuffleQueue(
+ capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed,
+ dtypes=dtypes, shapes=shapes)
+ _enqueue(queue, tensor_list, num_threads, enqueue_many)
+ full = (math_ops.cast(queue.size() - min_after_dequeue, types.float32) *
+ (1. / (capacity - min_after_dequeue)))
+ # Note that name contains a '/' at the end so we intentionally do not place
+ # a '/' after %s below.
+ summary_name = (
+ "queue/%sfraction_over_%d_of_%d_full" %
+ (name, min_after_dequeue, capacity - min_after_dequeue))
+ summary_ops.scalar_summary(summary_name, full)
+
+ return queue.dequeue_many(batch_size, name=name)
+
+
+def shuffle_batch_join(tensor_list_list, batch_size, capacity,
+ min_after_dequeue, seed=None, enqueue_many=False,
+ shapes=None, name=None):
+ """Create batches by randomly shuffling tensors.
+
+ This version enqueues a different list of tensors in different threads.
+ It adds:
+
+  * a shuffling queue into which tensors from tensor_list_list are enqueued,
+  * a dequeue_many operation to create batches from the queue, and
+  * a QueueRunner to the current Graph's QUEUE_RUNNER collection, to
+    enqueue the tensors from tensor_list_list.
+
+ Args:
+    tensor_list_list: A list of tuples of tensors to enqueue.
+      len(tensor_list_list) threads will be started, with the i-th
+      thread enqueuing the tensors from tensor_list_list[i].
+      tensor_list_list[i1][j] must match tensor_list_list[i2][j] in
+      type and shape (except in the first dimension if enqueue_many
+      is True).
+ batch_size: The new batch size pulled from the queue.
+    capacity: Maximum number of elements in the queue. Controls how far
+      ahead the prefetching is allowed to get, and therefore how much
+      memory is used.
+    min_after_dequeue: Minimum number of elements in the queue after a
+      dequeue, used to ensure a minimum level of mixing of the elements.
+ seed: Seed for the random shuffling within the queue.
+ enqueue_many: If False, each tensor_list_list[i] is assumed to
+ represent a single example. If True, tensor_list_list[i] is
+ assumed to represent a batch of examples, where the first
+ dimension is indexed by example, and all members of
+ tensor_list_list[i] should have the same size in the first
+ dimension.
+ shapes: Optional. The shapes for each example. Defaults to the
+ inferred shapes for tensor_list_list[i] (which must match, after
+ leaving off the first dimension if enqueue_many is True).
+ name: A name for the operations (optional).
+
+ Returns:
+ A list of tensors with the same number and types as
+    tensor_list_list[i]. If enqueue_many is False, then an input
+    tensor with shape `[x, y, z]` will be output as a tensor with
+    shape `[batch_size, x, y, z]`. If enqueue_many is True and an
+    input tensor has shape `[*, x, y, z]`, then the output will have
+    shape `[batch_size, x, y, z]`.
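+
+  For example (a sketch; `read_example` stands for a per-reader
+  subgraph returning an [example, label] list, and is not defined in
+  this module):
+
+      example_lists = [read_example(filename_queue) for _ in range(4)]
+      example_batch, label_batch = shuffle_batch_join(
+          example_lists, batch_size=32, capacity=2000,
+          min_after_dequeue=1000)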
+ """
+ with ops.op_scope(
+ _flatten(tensor_list_list), name, "shuffle_batch_join") as name:
+ tensor_list_list = _validate_join(tensor_list_list)
+ dtypes = _dtypes(tensor_list_list)
+ shapes = _shapes(tensor_list_list, shapes, enqueue_many)
+ queue = data_flow_ops.RandomShuffleQueue(
+ capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed,
+ dtypes=dtypes, shapes=shapes)
+ _enqueue_join(queue, tensor_list_list, enqueue_many)
+ full = (math_ops.cast(queue.size() - min_after_dequeue, types.float32) *
+ (1. / (capacity - min_after_dequeue)))
+ # Note that name contains a '/' at the end so we intentionally do not place
+ # a '/' after %s below.
+ summary_name = (
+ "queue/%sfraction_over_%d_of_%d_full" %
+ (name, min_after_dequeue, capacity - min_after_dequeue))
+ summary_ops.scalar_summary(summary_name, full)
+ return queue.dequeue_many(batch_size, name=name)