diff options
Diffstat (limited to 'tensorflow/contrib/data/python/ops/batching.py')
-rw-r--r-- | tensorflow/contrib/data/python/ops/batching.py | 264 |
1 files changed, 264 insertions, 0 deletions
diff --git a/tensorflow/contrib/data/python/ops/batching.py b/tensorflow/contrib/data/python/ops/batching.py index 7350d595f5..a4914f4cde 100644 --- a/tensorflow/contrib/data/python/ops/batching.py +++ b/tensorflow/contrib/data/python/ops/batching.py @@ -17,22 +17,135 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import numpy as np + +from tensorflow.contrib.data.python.ops import get_single_element +from tensorflow.contrib.data.python.ops import grouping from tensorflow.contrib.framework import with_shape from tensorflow.python.data.ops import dataset_ops from tensorflow.python.data.util import convert from tensorflow.python.data.util import nest from tensorflow.python.data.util import sparse +from tensorflow.python.framework import constant_op from tensorflow.python.framework import dtypes from tensorflow.python.framework import ops from tensorflow.python.framework import sparse_tensor from tensorflow.python.framework import tensor_shape from tensorflow.python.framework import tensor_util from tensorflow.python.ops import array_ops +from tensorflow.python.ops import check_ops +from tensorflow.python.ops import control_flow_ops +from tensorflow.python.ops import gen_array_ops from tensorflow.python.ops import gen_dataset_ops from tensorflow.python.ops import math_ops +from tensorflow.python.ops import sparse_ops from tensorflow.python.util import deprecation +def batch_window(dataset): + """Batches a window of tensors. + + Args: + dataset: the input dataset. + + Returns: + A `Tensor` representing the batch of the entire input dataset. + """ + if isinstance(dataset.output_classes, tuple): + raise TypeError("Input dataset expected to have a single component") + if dataset.output_classes is ops.Tensor: + return _batch_dense_window(dataset) + elif dataset.output_classes is sparse_tensor.SparseTensor: + return _batch_sparse_window(dataset) + else: + raise TypeError("Unsupported dataset type: %s" % dataset.output_classes) + + +def _batch_dense_window(dataset): + """Batches a window of dense tensors.""" + + def key_fn(_): + return np.int64(0) + + def shape_init_fn(_): + return array_ops.shape(first_element) + + def shape_reduce_fn(state, value): + check_ops.assert_equal(state, array_ops.shape(value)) + return state + + def finalize_fn(state): + return state + + if dataset.output_shapes.is_fully_defined(): + shape = dataset.output_shapes + else: + first_element = get_single_element.get_single_element(dataset.take(1)) + shape_reducer = grouping.Reducer(shape_init_fn, shape_reduce_fn, + finalize_fn) + shape = get_single_element.get_single_element( + dataset.apply(grouping.group_by_reducer(key_fn, shape_reducer))) + + def batch_init_fn(_): + batch_shape = array_ops.concat([[0], shape], 0) + return gen_array_ops.empty(batch_shape, dtype=dataset.output_types) + + def batch_reduce_fn(state, value): + return array_ops.concat([state, [value]], 0) + + batch_reducer = grouping.Reducer(batch_init_fn, batch_reduce_fn, finalize_fn) + return get_single_element.get_single_element( + dataset.apply(grouping.group_by_reducer(key_fn, batch_reducer))) + + +def _batch_sparse_window(dataset): + """Batches a window of sparse tensors.""" + + def key_fn(_): + return np.int64(0) + + def shape_init_fn(_): + return first_element.dense_shape + + def shape_reduce_fn(state, value): + check_ops.assert_equal(state, value.dense_shape) + return state + + def finalize_fn(state): + return state + + if dataset.output_shapes.is_fully_defined(): + shape = dataset.output_shapes + else: + first_element = get_single_element.get_single_element(dataset.take(1)) + shape_reducer = grouping.Reducer(shape_init_fn, shape_reduce_fn, + finalize_fn) + shape = get_single_element.get_single_element( + dataset.apply(grouping.group_by_reducer(key_fn, shape_reducer))) + + def batch_init_fn(_): + indices_shape = array_ops.concat([[0], [array_ops.size(shape) + 1]], 0) + return sparse_tensor.SparseTensor( + indices=gen_array_ops.empty(indices_shape, dtype=dtypes.int64), + values=constant_op.constant([], shape=[0], dtype=dataset.output_types), + dense_shape=array_ops.concat( + [np.array([0], dtype=np.int64), + math_ops.cast(shape, dtypes.int64)], 0)) + + def batch_reduce_fn(state, value): + return sparse_ops.sparse_concat(0, [state, value]) + + def reshape_fn(value): + return sparse_ops.sparse_reshape( + value, + array_ops.concat([np.array([1], dtype=np.int64), value.dense_shape], 0)) + + batch_reducer = grouping.Reducer(batch_init_fn, batch_reduce_fn, finalize_fn) + return get_single_element.get_single_element( + dataset.map(reshape_fn).apply( + grouping.group_by_reducer(key_fn, batch_reducer))) + + def dense_to_sparse_batch(batch_size, row_shape): """A transformation that batches ragged elements into `tf.SparseTensor`s. @@ -82,6 +195,157 @@ def dense_to_sparse_batch(batch_size, row_shape): return _apply_fn +def padded_batch_window(dataset, padded_shape, padding_value=None): + """Batches a window of tensors with padding. + + Args: + dataset: the input dataset. + padded_shape: (Optional.) `tf.TensorShape` or `tf.int64` vector tensor-like + object representing the shape to which the input elements should be padded + prior to batching. Any unknown dimensions (e.g. `tf.Dimension(None)` in a + `tf.TensorShape` or `-1` in a tensor-like object) will be padded to the + maximum size of that dimension in each batch. + padding_value: (Optional.) A scalar-shaped `tf.Tensor`, representing the + padding value to use. Defaults are `0` for numeric types and the empty + string for string types. If `dataset` contains `tf.SparseTensor`, this + value is ignored. + + Returns: + A `Tensor` representing the batch of the entire input dataset. + + Raises: + ValueError: if invalid arguments are provided. + """ + if not issubclass(dataset.output_classes, + (ops.Tensor, sparse_tensor.SparseTensor)): + raise TypeError("Input dataset expected to have a single tensor component") + if issubclass(dataset.output_classes, (ops.Tensor)): + return _padded_batch_dense_window(dataset, padded_shape, padding_value) + elif issubclass(dataset.output_classes, (sparse_tensor.SparseTensor)): + if padding_value is not None: + raise ValueError("Padding value not allowed for sparse tensors") + return _padded_batch_sparse_window(dataset, padded_shape) + else: + raise TypeError("Unsupported dataset type: %s" % dataset.output_classes) + + +def _padded_batch_dense_window(dataset, padded_shape, padding_value=None): + """Batches a window of dense tensors with padding.""" + + padded_shape = math_ops.cast( + convert.partial_shape_to_tensor(padded_shape), dtypes.int32) + + def key_fn(_): + return np.int64(0) + + def max_init_fn(_): + return padded_shape + + def max_reduce_fn(state, value): + """Computes the maximum shape to pad to.""" + condition = math_ops.reduce_all( + math_ops.logical_or( + math_ops.less_equal(array_ops.shape(value), padded_shape), + math_ops.equal(padded_shape, -1))) + assert_op = control_flow_ops.Assert(condition, [ + "Actual shape greater than padded shape: ", + array_ops.shape(value), padded_shape + ]) + with ops.control_dependencies([assert_op]): + return math_ops.maximum(state, array_ops.shape(value)) + + def finalize_fn(state): + return state + + # Compute the padded shape. + max_reducer = grouping.Reducer(max_init_fn, max_reduce_fn, finalize_fn) + padded_shape = get_single_element.get_single_element( + dataset.apply(grouping.group_by_reducer(key_fn, max_reducer))) + + if padding_value is None: + if dataset.output_types == dtypes.string: + padding_value = "" + elif dataset.output_types == dtypes.bool: + padding_value = False + elif dataset.output_types == dtypes.variant: + raise TypeError("Unable to create padding for field of type 'variant'") + else: + padding_value = 0 + + def batch_init_fn(_): + return array_ops.fill( + array_ops.concat([np.array([0], dtype=np.int32), padded_shape], 0), + constant_op.constant(padding_value, dtype=dataset.output_types)) + + def batch_reduce_fn(state, value): + return array_ops.concat([state, [value]], 0) + + def pad_fn(value): + shape = array_ops.shape(value) + left = array_ops.zeros_like(shape) + right = padded_shape - shape + return array_ops.pad( + value, array_ops.stack([left, right], 1), constant_values=padding_value) + + batch_reducer = grouping.Reducer(batch_init_fn, batch_reduce_fn, finalize_fn) + return get_single_element.get_single_element( + dataset.map(pad_fn).apply( + grouping.group_by_reducer(key_fn, batch_reducer))) + + +def _padded_batch_sparse_window(dataset, padded_shape): + """Batches a window of sparse tensors with padding.""" + + def key_fn(_): + return np.int64(0) + + def max_init_fn(_): + return convert.partial_shape_to_tensor(padded_shape) + + def max_reduce_fn(state, value): + """Computes the maximum shape to pad to.""" + condition = math_ops.reduce_all( + math_ops.logical_or( + math_ops.less_equal(value.dense_shape, padded_shape), + math_ops.equal(padded_shape, -1))) + assert_op = control_flow_ops.Assert(condition, [ + "Actual shape greater than padded shape: ", value.dense_shape, + padded_shape + ]) + with ops.control_dependencies([assert_op]): + return math_ops.maximum(state, value.dense_shape) + + def finalize_fn(state): + return state + + # Compute the padded shape. + max_reducer = grouping.Reducer(max_init_fn, max_reduce_fn, finalize_fn) + padded_shape = get_single_element.get_single_element( + dataset.apply(grouping.group_by_reducer(key_fn, max_reducer))) + + def batch_init_fn(_): + indices_shape = array_ops.concat([[0], [array_ops.size(padded_shape) + 1]], + 0) + return sparse_tensor.SparseTensor( + indices=gen_array_ops.empty(indices_shape, dtype=dtypes.int64), + values=constant_op.constant([], shape=[0], dtype=dataset.output_types), + dense_shape=array_ops.concat( + [np.array([0], dtype=np.int64), padded_shape], 0)) + + def batch_reduce_fn(state, value): + padded_value = sparse_tensor.SparseTensor( + indices=value.indices, values=value.values, dense_shape=padded_shape) + reshaped_value = sparse_ops.sparse_reshape( + padded_value, + array_ops.concat( + [np.array([1], dtype=np.int64), padded_value.dense_shape], 0)) + return sparse_ops.sparse_concat(0, [state, reshaped_value]) + + reducer = grouping.Reducer(batch_init_fn, batch_reduce_fn, finalize_fn) + return get_single_element.get_single_element( + dataset.apply(grouping.group_by_reducer(key_fn, reducer))) + + class _UnbatchDataset(dataset_ops.Dataset): """A dataset that splits the elements of its input into multiple elements.""" |