Diffstat (limited to 'tensorflow/python/training/input.py')
 tensorflow/python/training/input.py | 501 +
 1 file changed, 501 insertions(+), 0 deletions(-)
diff --git a/tensorflow/python/training/input.py b/tensorflow/python/training/input.py
new file mode 100644
index 0000000000..413fc044f7
--- /dev/null
+++ b/tensorflow/python/training/input.py
@@ -0,0 +1,501 @@
+"""## Input pipeline
+
+TensorFlow functions for setting up an input-prefetching pipeline.
+Please see the [reading data how-to](../../how_tos/reading_data.md)
+for context.
+
+### Beginning of an input pipeline
+
+The "producer" functions add a queue to the graph and a corresponding
+QueueRunner for running the subgraph that fills that queue.
+
+@@match_filenames_once
+@@limit_epochs
+@@range_input_producer
+@@slice_input_producer
+@@string_input_producer
+
+### Batching at the end of an input pipeline
+
+These functions add a queue to the graph to assemble a batch of
+examples, with possible shuffling.  They also add a QueueRunner for
+running the subgraph that fills that queue.
+
+Use [batch](#batch) or [batch_join](#batch_join) for batching examples
+that have already been well shuffled.  Use [shuffle_batch](#shuffle_batch)
+or [shuffle_batch_join](#shuffle_batch_join) for examples that would
+benefit from additional shuffling.
+
+Use [batch](#batch) or [shuffle_batch](#shuffle_batch) if you want a
+single thread producing examples to batch, or if you have a
+single subgraph producing examples but you want to run it in N threads
+(where you increase N until it can keep the queue full).  Use
+[batch_join](#batch_join) or [shuffle_batch_join](#shuffle_batch_join)
+if you have N different subgraphs producing examples to batch and you
+want them run by N threads.
+
+@@batch
+@@batch_join
+@@shuffle_batch
+@@shuffle_batch_join
+
+"""
+
+from tensorflow.python.framework import ops
+from tensorflow.python.framework import tensor_shape
+from tensorflow.python.framework import types
+from tensorflow.python.ops import array_ops
+from tensorflow.python.ops import constant_op
+from tensorflow.python.ops import data_flow_ops
+from tensorflow.python.ops import io_ops
+from tensorflow.python.ops import math_ops
+from tensorflow.python.ops import random_ops
+from tensorflow.python.ops import summary_ops
+from tensorflow.python.ops import variables
+from tensorflow.python.training import queue_runner
+
+
+def match_filenames_once(pattern, name=None):
+  """Save the list of files matching pattern, so it is only computed once.
+
+  Args:
+    pattern: A file pattern (glob).
+    name: A name for the operations (optional).
+
+  Returns:
+    A variable that is initialized to the list of files matching pattern.
+  """
+  with ops.op_scope([pattern], name, "matching_filenames") as name:
+    return variables.Variable(io_ops.matching_files(pattern), trainable=False,
+                              name=name, validate_shape=False)
+
+
+def limit_epochs(tensor, num_epochs=None, name=None):
+  """Returns tensor num_epochs times and then raises an OutOfRange error.
+
+  Args:
+    tensor: Any Tensor.
+    num_epochs: An integer (optional).  If specified, limits the number
+      of times the output tensor may be evaluated.
+    name: A name for the operations (optional).
+
+  Returns:
+    tensor, wrapped so that an OutOfRange error is raised once it has
+    been evaluated num_epochs times.
+  """
+  if num_epochs is None:
+    return tensor
+  if num_epochs <= 0:
+    raise ValueError("num_epochs must be > 0, not %d." % num_epochs)
+  with ops.op_scope([tensor], name, "limit_epochs") as name:
+    zero64 = constant_op.constant(0, dtype=types.int64)
+    epochs = variables.Variable(zero64, name="epochs")
+    counter = epochs.count_up_to(num_epochs)
+    with ops.control_dependencies([counter]):
+      return array_ops.identity(tensor, name=name)
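
For illustration, a minimal usage sketch of the two helpers above. The `tf.train` spelling and the `data/*.csv` pattern are assumptions for the example, not part of this diff:

```python
# Hypothetical sketch: glob a file list once, then limit how many times the
# result may be evaluated before an OutOfRange error is raised.
import tensorflow as tf

filenames = tf.train.match_filenames_once("data/*.csv")  # computed once
limited = tf.train.limit_epochs(filenames, num_epochs=5)  # 5 evaluations max
```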
+
+
+def _input_producer(input_tensor, dtype, num_epochs, shuffle, seed, capacity,
+                    name, summary_name):
+  if shuffle:
+    input_tensor = random_ops.random_shuffle(input_tensor, seed=seed)
+  input_tensor = limit_epochs(input_tensor, num_epochs)
+
+  q = data_flow_ops.FIFOQueue(capacity=capacity, dtypes=[dtype], shapes=[[]],
+                              name=name)
+  enq = q.enqueue_many([input_tensor])
+  queue_runner.add_queue_runner(queue_runner.QueueRunner(q, [enq]))
+  summary_ops.scalar_summary("queue/%s/%s" % (q.name, summary_name),
+                             math_ops.cast(q.size(), types.float32) *
+                             (1. / capacity))
+  return q
+
+
+def string_input_producer(string_tensor, num_epochs=None, shuffle=True,
+                          seed=None, capacity=32, name=None):
+  """Output strings (e.g. filenames) to a queue for an input pipeline.
+
+  Args:
+    string_tensor: A 1-D string tensor with the strings to produce.
+    num_epochs: An integer (optional).  If specified, `string_input_producer`
+      produces each string from `string_tensor` `num_epochs` times before
+      generating an OutOfRange error.  If not specified,
+      `string_input_producer` can cycle through the strings in
+      `string_tensor` an unlimited number of times.
+    shuffle: Boolean.  If true, the strings are randomly shuffled within
+      each epoch.
+    seed: An integer (optional).  Seed used if shuffle == True.
+    capacity: An integer.  Sets the queue capacity.
+    name: A name for the operations (optional).
+
+  Returns:
+    A queue with the output strings.  A QueueRunner for the queue
+    is added to the current Graph's QUEUE_RUNNER collection.
+  """
+  with ops.op_scope([string_tensor], name, "input_producer") as name:
+    return _input_producer(
+        string_tensor, types.string, num_epochs, shuffle, seed, capacity, name,
+        "fraction_of_%d_full" % capacity)
+
+
+def range_input_producer(limit, num_epochs=None, shuffle=True, seed=None,
+                         capacity=32, name=None):
+  """Produces the integers from 0 to limit-1 in a queue.
+
+  Args:
+    limit: An int32 scalar tensor.
+    num_epochs: An integer (optional).  If specified, `range_input_producer`
+      produces each integer `num_epochs` times before generating an
+      OutOfRange error.  If not specified, `range_input_producer` can cycle
+      through the integers an unlimited number of times.
+    shuffle: Boolean.  If true, the integers are randomly shuffled within
+      each epoch.
+    seed: An integer (optional).  Seed used if shuffle == True.
+    capacity: An integer.  Sets the queue capacity.
+    name: A name for the operations (optional).
+
+  Returns:
+    A queue with the output integers.  A QueueRunner for the queue
+    is added to the current Graph's QUEUE_RUNNER collection.
+  """
+  with ops.op_scope([limit], name, "input_producer") as name:
+    range_tensor = math_ops.range(0, limit)
+    return _input_producer(
+        range_tensor, types.int32, num_epochs, shuffle, seed, capacity, name,
+        "fraction_of_%d_full" % capacity)
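
As a usage sketch, following the pattern in the reading-data how-to (the filenames and the `tf.TextLineReader` pairing are illustrative assumptions):

```python
# Hypothetical sketch: a shuffled filename queue feeding a line-based reader.
import tensorflow as tf

filename_queue = tf.train.string_input_producer(
    ["file0.csv", "file1.csv"], shuffle=True, seed=1)
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)  # one line of text per evaluation
```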
+
+
+def slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None,
+                         capacity=32, name=None):
+  """Produces a slice of each Tensor in tensor_list.
+
+  Implemented using a Queue -- a QueueRunner for the Queue
+  is added to the current Graph's QUEUE_RUNNER collection.
+
+  Args:
+    tensor_list: A list of Tensors.  Every Tensor in tensor_list must
+      have the same size in the first dimension.
+    num_epochs: An integer (optional).  If specified, `slice_input_producer`
+      produces each slice `num_epochs` times before generating
+      an OutOfRange error.  If not specified, `slice_input_producer` can
+      cycle through the slices an unlimited number of times.
+    shuffle: Boolean.  If true, the slices are randomly shuffled within
+      each epoch.
+    seed: An integer (optional).  Seed used if shuffle == True.
+    capacity: An integer.  Sets the queue capacity.
+    name: A name for the operations (optional).
+
+  Returns:
+    A list of tensors, one for each element of tensor_list.  If the tensor
+    in tensor_list has shape [N, a, b, ..., z], then the corresponding output
+    tensor will have shape [a, b, ..., z].
+  """
+  with ops.op_scope(tensor_list, name, "input_producer"):
+    tensor_list = ops.convert_n_to_tensor_or_indexed_slices(tensor_list)
+    if not tensor_list:
+      raise ValueError(
+          "Expected at least one tensor in slice_input_producer().")
+    range_size = array_ops.shape(tensor_list[0])[0]
+    # TODO(josh11b): Add an assertion that the first dimension of
+    # everything in tensor_list matches.  Maybe just check the inferred
+    # shapes?
+    queue = range_input_producer(range_size, num_epochs=num_epochs,
+                                 shuffle=shuffle, seed=seed, capacity=capacity)
+    index = queue.dequeue()
+    output = [array_ops.gather(t, index) for t in tensor_list]
+    return output
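
A usage sketch for `slice_input_producer`; the in-memory arrays are illustrative assumptions:

```python
# Hypothetical sketch: produce aligned (image, label) pairs one at a time.
import numpy as np
import tensorflow as tf

all_images = np.zeros((100, 28, 28), dtype=np.float32)  # 100 examples
all_labels = np.zeros((100,), dtype=np.int32)           # matching first dim

# Each evaluation yields one [28, 28] image and its matching scalar label,
# shuffled consistently across both tensors within each epoch.
image, label = tf.train.slice_input_producer([all_images, all_labels])
```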
+
+
+# Helpers for the batching functions ------------------------------------------
+
+def _flatten(tensor_list_list):
+  return [tensor for tensor_list in tensor_list_list for tensor in tensor_list]
+
+
+def _validate(tensor_list):
+  tensor_list = ops.convert_n_to_tensor_or_indexed_slices(tensor_list)
+  if not tensor_list:
+    raise ValueError("Expected at least one tensor in batch().")
+  return tensor_list
+
+
+def _validate_join(tensor_list_list):
+  tensor_list_list = [ops.convert_n_to_tensor_or_indexed_slices(tl)
+                      for tl in tensor_list_list]
+  if not tensor_list_list:
+    raise ValueError("Expected at least one input in batch_join().")
+  return tensor_list_list
+
+
+def _dtypes(tensor_list_list):
+  all_dtypes = [[t.dtype for t in tl] for tl in tensor_list_list]
+  dtypes = all_dtypes[0]
+  for other_dtypes in all_dtypes[1:]:
+    if other_dtypes != dtypes:
+      raise TypeError("Expected types to be consistent: %s vs. %s." %
+                      (", ".join(x.name for x in dtypes),
+                       ", ".join(x.name for x in other_dtypes)))
+  return dtypes
+
+
+def _merge_shapes(shape_list, enqueue_many):
+  shape_list = [tensor_shape.as_shape(s) for s in shape_list]
+  if enqueue_many:
+    # We want the shapes without the leading batch dimension.
+    shape_list = [s.WithRankAtLeast(1)[1:] for s in shape_list]
+  merged_shape = shape_list[0]
+  for s in shape_list[1:]:
+    merged_shape.merge_with(s)
+  return merged_shape.as_list()
+
+
+def _shapes(tensor_list_list, shapes, enqueue_many):
+  if shapes is None:
+    l = len(tensor_list_list[0])
+    shapes = [_merge_shapes([tl[i].get_shape().as_list()
+                             for tl in tensor_list_list],
+                            enqueue_many) for i in range(l)]
+  return shapes
+
+
+def _enqueue_join(queue, tensor_list_list, enqueue_many):
+  if enqueue_many:
+    enqueue_ops = [queue.enqueue_many(tl) for tl in tensor_list_list]
+  else:
+    enqueue_ops = [queue.enqueue(tl) for tl in tensor_list_list]
+  queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops))
+
+
+def _enqueue(queue, tensor_list, threads, enqueue_many):
+  if enqueue_many:
+    enqueue_ops = [queue.enqueue_many(tensor_list)] * threads
+  else:
+    enqueue_ops = [queue.enqueue(tensor_list)] * threads
+  queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops))
+
+
+# Batching functions ----------------------------------------------------------
+
+def batch(tensor_list, batch_size, num_threads=1, capacity=32,
+          enqueue_many=False, shapes=None, name=None):
+  """Run tensor_list to fill a queue to create batches.
+
+  Implemented using a queue -- a QueueRunner for the queue
+  is added to the current Graph's QUEUE_RUNNER collection.
+
+  Args:
+    tensor_list: The list of tensors to enqueue.
+    batch_size: The new batch size pulled from the queue.
+    num_threads: The number of threads enqueuing tensor_list.
+    capacity: Maximum number of elements in the queue; controls how
+      far ahead prefetching is allowed to get, and memory usage.
+    enqueue_many: If False, tensor_list is assumed to represent a
+      single example.  If True, tensor_list is assumed to represent
+      a batch of examples, where the first dimension is indexed by
+      example, and all members of tensor_list should have the same
+      size in the first dimension.
+    shapes: Optional.  The shapes for each example.  Defaults to the
+      inferred shapes for tensor_list (leaving off the first dimension
+      if enqueue_many is True).
+    name: A name for the operations (optional).
+
+  Returns:
+    A list of tensors with the same number and types as tensor_list.
+    If enqueue_many is False, then an input tensor with shape
+    `[x, y, z]` will be output as a tensor with shape
+    `[batch_size, x, y, z]`.  If enqueue_many is True and an
+    input tensor has shape `[*, x, y, z]`, then the output will have
+    shape `[batch_size, x, y, z]`.
+  """
+  with ops.op_scope(tensor_list, name, "batch") as name:
+    tensor_list = _validate(tensor_list)
+    dtypes = _dtypes([tensor_list])
+    shapes = _shapes([tensor_list], shapes, enqueue_many)
+    # TODO(josh11b,mrry): Switch to BatchQueue once it is written.
+    queue = data_flow_ops.FIFOQueue(
+        capacity=capacity, dtypes=dtypes, shapes=shapes)
+    _enqueue(queue, tensor_list, num_threads, enqueue_many)
+    summary_ops.scalar_summary(
+        "queue/%s/fraction_of_%d_full" % (queue.name, capacity),
+        math_ops.cast(queue.size(), types.float32) * (1. / capacity))
+    return queue.dequeue_many(batch_size, name=name)
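
Continuing the `slice_input_producer` sketch above, a hedged example of `batch()`; the sizes and thread count are assumptions:

```python
# Hypothetical sketch: four threads run copies of the same enqueue subgraph
# to keep the FIFOQueue full; dequeue_many yields fixed-size batches.
import tensorflow as tf

image_batch, label_batch = tf.train.batch(
    [image, label], batch_size=32, num_threads=4, capacity=256)
# image_batch has shape [32, 28, 28]; label_batch has shape [32].
```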
+
+
+# TODO(josh11b): Add a thread_multiplier or num_threads (that has to be
+# a multiple of len(tensor_list_list)?) parameter, to address the use
+# case where you want more parallelism than the number of different
+# readers you can support (either because you don't have that many files
+# or can't read that many files in parallel due to the number of seeks
+# required).  Once this is done, batch() can be written as a call to
+# batch_join().
+def batch_join(tensor_list_list, batch_size, capacity=32, enqueue_many=False,
+               shapes=None, name=None):
+  """Run a list of tensors to fill a queue to create batches of examples.
+
+  This version enqueues a different list of tensors in different threads.
+  Implemented using a queue -- a QueueRunner for the queue
+  is added to the current Graph's QUEUE_RUNNER collection.
+
+  Args:
+    tensor_list_list: A list of tuples of tensors to enqueue.
+      len(tensor_list_list) threads will be started, with the i-th
+      thread enqueuing the tensors from tensor_list_list[i].
+      tensor_list_list[i1][j] must match tensor_list_list[i2][j] in
+      type and shape (except in the first dimension if enqueue_many
+      is True).
+    batch_size: The new batch size pulled from the queue.
+    capacity: Maximum number of elements in the queue; controls how
+      far ahead prefetching is allowed to get, and memory usage.
+    enqueue_many: If False, each tensor_list_list[i] is assumed to
+      represent a single example.  If True, tensor_list_list[i] is
+      assumed to represent a batch of examples, where the first
+      dimension is indexed by example, and all members of
+      tensor_list_list[i] should have the same size in the first
+      dimension.
+    shapes: Optional.  The shapes for each example.  Defaults to the
+      inferred shapes for tensor_list_list[i] (which must match, after
+      leaving off the first dimension if enqueue_many is True).
+    name: A name for the operations (optional).
+
+  Returns:
+    A list of tensors with the same number and types as
+    tensor_list_list[i].  If enqueue_many is False, then an input
+    tensor with shape `[x, y, z]` will be output as a tensor with
+    shape `[batch_size, x, y, z]`.  If enqueue_many is True and an
+    input tensor has shape `[*, x, y, z]`, then the output will have
+    shape `[batch_size, x, y, z]`.
+  """
+  with ops.op_scope(_flatten(tensor_list_list), name, "batch_join") as name:
+    tensor_list_list = _validate_join(tensor_list_list)
+    dtypes = _dtypes(tensor_list_list)
+    shapes = _shapes(tensor_list_list, shapes, enqueue_many)
+    # TODO(josh11b,mrry): Switch to BatchQueue once it is written.
+    queue = data_flow_ops.FIFOQueue(
+        capacity=capacity, dtypes=dtypes, shapes=shapes)
+    _enqueue_join(queue, tensor_list_list, enqueue_many)
+    summary_ops.scalar_summary(
+        "queue/%s/fraction_of_%d_full" % (queue.name, capacity),
+        math_ops.cast(queue.size(), types.float32) * (1. / capacity))
+    return queue.dequeue_many(batch_size, name=name)
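
A sketch of the N-subgraphs case that `batch_join` targets. `read_example_and_label` is a hypothetical helper standing in for a reader-plus-parsing subgraph; it is not defined in this diff:

```python
# Hypothetical sketch: four independent reader subgraphs, each enqueued by
# its own thread; examples from all four are interleaved into batches.
import tensorflow as tf

example_lists = [read_example_and_label(filename_queue)  # hypothetical helper
                 for _ in range(4)]
example_batch, label_batch = tf.train.batch_join(
    example_lists, batch_size=32, capacity=256)
```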
+
+
+def shuffle_batch(tensor_list, batch_size, capacity, min_after_dequeue,
+                  num_threads=1, seed=None, enqueue_many=False, shapes=None,
+                  name=None):
+  """Create batches by randomly shuffling tensors.
+
+  This adds:
+
+  * a shuffling queue into which tensors from tensor_list are enqueued,
+  * a dequeue_many operation to create batches from the queue,
+  * and a QueueRunner to the current Graph's QUEUE_RUNNER collection,
+    to enqueue the tensors from tensor_list.
+
+  Args:
+    tensor_list: The list of tensors to enqueue.
+    batch_size: The new batch size pulled from the queue.
+    capacity: Maximum number of elements in the queue; controls how
+      far ahead prefetching is allowed to get, and memory usage.
+    min_after_dequeue: Minimum number of elements in the queue after a
+      dequeue, used to ensure a level of mixing of elements.
+    num_threads: The number of threads enqueuing tensor_list.
+    seed: Seed for the random shuffling within the queue.
+    enqueue_many: If False, tensor_list is assumed to represent a
+      single example.  If True, tensor_list is assumed to represent
+      a batch of examples, where the first dimension is indexed by
+      example, and all members of tensor_list should have the same
+      size in the first dimension.
+    shapes: Optional.  The shapes for each example.  Defaults to the
+      inferred shapes for tensor_list (leaving off the first dimension
+      if enqueue_many is True).
+    name: A name for the operations (optional).
+
+  Returns:
+    A list of tensors with the same number and types as tensor_list.
+    If enqueue_many is False, then an input tensor with shape
+    `[x, y, z]` will be output as a tensor with shape
+    `[batch_size, x, y, z]`.  If enqueue_many is True and an
+    input tensor has shape `[*, x, y, z]`, then the output will have
+    shape `[batch_size, x, y, z]`.
+  """
+  with ops.op_scope(tensor_list, name, "shuffle_batch") as name:
+    tensor_list = _validate(tensor_list)
+    dtypes = _dtypes([tensor_list])
+    shapes = _shapes([tensor_list], shapes, enqueue_many)
+    queue = data_flow_ops.RandomShuffleQueue(
+        capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed,
+        dtypes=dtypes, shapes=shapes)
+    _enqueue(queue, tensor_list, num_threads, enqueue_many)
+    full = (math_ops.cast(queue.size() - min_after_dequeue, types.float32) *
+            (1. / (capacity - min_after_dequeue)))
+    # Note that name contains a '/' at the end so we intentionally do not place
+    # a '/' after %s below.
+    summary_name = (
+        "queue/%sfraction_over_%d_of_%d_full" %
+        (name, min_after_dequeue, capacity - min_after_dequeue))
+    summary_ops.scalar_summary(summary_name, full)
+
+    return queue.dequeue_many(batch_size, name=name)
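
Continuing the earlier sketch, a hedged `shuffle_batch` example. The buffer sizes are assumptions; a common rule of thumb is capacity = min_after_dequeue plus a few batches of headroom so the enqueue threads are never starved:

```python
# Hypothetical sketch: shuffled batches. min_after_dequeue controls how well
# elements mix; the extra capacity is headroom for the enqueue threads.
import tensorflow as tf

image_batch, label_batch = tf.train.shuffle_batch(
    [image, label], batch_size=32,
    capacity=10000 + 3 * 32,  # min_after_dequeue + 3 * batch_size (assumed)
    min_after_dequeue=10000, num_threads=4, seed=7)
```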
+
+
+def shuffle_batch_join(tensor_list_list, batch_size, capacity,
+                       min_after_dequeue, seed=None, enqueue_many=False,
+                       shapes=None, name=None):
+  """Create batches by randomly shuffling tensors.
+
+  This version enqueues a different list of tensors in different threads.
+  It adds:
+
+  * a shuffling queue into which tensors from tensor_list_list are enqueued,
+  * a dequeue_many operation to create batches from the queue,
+  * and a QueueRunner to the current Graph's QUEUE_RUNNER collection,
+    to enqueue the tensors from tensor_list_list.
+
+  Args:
+    tensor_list_list: A list of tuples of tensors to enqueue.
+      len(tensor_list_list) threads will be started, with the i-th
+      thread enqueuing the tensors from tensor_list_list[i].
+      tensor_list_list[i1][j] must match tensor_list_list[i2][j] in
+      type and shape (except in the first dimension if enqueue_many
+      is True).
+    batch_size: The new batch size pulled from the queue.
+    capacity: Maximum number of elements in the queue; controls how
+      far ahead prefetching is allowed to get, and memory usage.
+    min_after_dequeue: Minimum number of elements in the queue after a
+      dequeue, used to ensure a level of mixing of elements.
+    seed: Seed for the random shuffling within the queue.
+    enqueue_many: If False, each tensor_list_list[i] is assumed to
+      represent a single example.  If True, tensor_list_list[i] is
+      assumed to represent a batch of examples, where the first
+      dimension is indexed by example, and all members of
+      tensor_list_list[i] should have the same size in the first
+      dimension.
+    shapes: Optional.  The shapes for each example.  Defaults to the
+      inferred shapes for tensor_list_list[i] (which must match, after
+      leaving off the first dimension if enqueue_many is True).
+    name: A name for the operations (optional).
+
+  Returns:
+    A list of tensors with the same number and types as
+    tensor_list_list[i].  If enqueue_many is False, then an input
+    tensor with shape `[x, y, z]` will be output as a tensor with
+    shape `[batch_size, x, y, z]`.  If enqueue_many is True and an
+    input tensor has shape `[*, x, y, z]`, then the output will have
+    shape `[batch_size, x, y, z]`.
+  """
+  with ops.op_scope(
+      _flatten(tensor_list_list), name, "shuffle_batch_join") as name:
+    tensor_list_list = _validate_join(tensor_list_list)
+    dtypes = _dtypes(tensor_list_list)
+    shapes = _shapes(tensor_list_list, shapes, enqueue_many)
+    queue = data_flow_ops.RandomShuffleQueue(
+        capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed,
+        dtypes=dtypes, shapes=shapes)
+    _enqueue_join(queue, tensor_list_list, enqueue_many)
+    full = (math_ops.cast(queue.size() - min_after_dequeue, types.float32) *
+            (1. / (capacity - min_after_dequeue)))
+    # Note that name contains a '/' at the end so we intentionally do not place
+    # a '/' after %s below.
+    summary_name = (
+        "queue/%sfraction_over_%d_of_%d_full" %
+        (name, min_after_dequeue, capacity - min_after_dequeue))
+    summary_ops.scalar_summary(summary_name, full)
+    return queue.dequeue_many(batch_size, name=name)
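
All of these functions only register a QueueRunner in the graph's QUEUE_RUNNER collection; nothing is enqueued until the runners are started. A minimal driving loop might look like the sketch below, assuming batch tensors named `image_batch` and `label_batch` from the examples above:

```python
# Hypothetical sketch: start the registered QueueRunners and consume batches
# until a producer raises OutOfRange (e.g. num_epochs is exhausted).
import tensorflow as tf

with tf.Session() as sess:
  sess.run(tf.initialize_all_variables())
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(sess=sess, coord=coord)
  try:
    while not coord.should_stop():
      images, labels = sess.run([image_batch, label_batch])
      # ... train on the batch ...
  except tf.errors.OutOfRangeError:
    pass  # input pipeline exhausted
  finally:
    coord.request_stop()
    coord.join(threads)
```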