Diffstat (limited to 'tensorflow/contrib/data/python/ops/grouping.py')
-rw-r--r--  tensorflow/contrib/data/python/ops/grouping.py | 441
1 file changed, 17 insertions(+), 424 deletions(-)
diff --git a/tensorflow/contrib/data/python/ops/grouping.py b/tensorflow/contrib/data/python/ops/grouping.py
index 7cae33beb3..a99dc2f29a 100644
--- a/tensorflow/contrib/data/python/ops/grouping.py
+++ b/tensorflow/contrib/data/python/ops/grouping.py
@@ -17,20 +17,13 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
-import numpy as np
-from tensorflow.python.data.ops import dataset_ops
-from tensorflow.python.data.util import nest
-from tensorflow.python.framework import constant_op
-from tensorflow.python.framework import dtypes
-from tensorflow.python.framework import ops
-from tensorflow.python.framework import tensor_shape
-from tensorflow.python.ops import array_ops
-from tensorflow.python.ops import check_ops
-from tensorflow.python.ops import gen_dataset_ops
-from tensorflow.python.ops import math_ops
+from tensorflow.python.data.experimental.ops import grouping
+from tensorflow.python.util import deprecation
+@deprecation.deprecated(None,
+                        "Use `tf.data.experimental.group_by_reducer(...)`.")
def group_by_reducer(key_func, reducer):
"""A transformation that groups elements and performs a reduction.
@@ -52,14 +45,11 @@ def group_by_reducer(key_func, reducer):
A `Dataset` transformation function, which can be passed to
`tf.data.Dataset.apply`.
"""
-
- def _apply_fn(dataset):
- """Function from `Dataset` to `Dataset` that applies the transformation."""
- return _GroupByReducerDataset(dataset, key_func, reducer)
-
- return _apply_fn
+  return grouping.group_by_reducer(key_func, reducer)
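For reference, a minimal usage sketch of the forwarded transformation (assumes TF 1.12+ with `tf.data.experimental`; the dataset and key choice below are illustrative only):

import tensorflow as tf

# Group integers by parity and count the elements in each group.
reducer = tf.data.experimental.Reducer(
    init_func=lambda _: tf.constant(0, dtype=tf.int64),  # initial per-key state
    reduce_func=lambda state, _: state + 1,               # increment the count
    finalize_func=lambda state: state)                     # emit the final count

dataset = tf.data.Dataset.range(10).apply(
    tf.data.experimental.group_by_reducer(
        key_func=lambda x: x % 2,  # must return a scalar tf.int64
        reducer=reducer))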
+@deprecation.deprecated(None,
+                        "Use `tf.data.experimental.group_by_window(...)`.")
def group_by_window(key_func,
reduce_func,
window_size=None,
@@ -98,27 +88,12 @@ def group_by_window(key_func,
ValueError: if neither or both of {`window_size`, `window_size_func`} are
passed.
"""
- if (window_size is not None and window_size_func or
- not (window_size is not None or window_size_func)):
- raise ValueError("Must pass either window_size or window_size_func.")
-
- if window_size is not None:
-
- def constant_window_func(unused_key):
- return ops.convert_to_tensor(window_size, dtype=dtypes.int64)
-
- window_size_func = constant_window_func
-
- assert window_size_func is not None
-
- def _apply_fn(dataset):
- """Function from `Dataset` to `Dataset` that applies the transformation."""
- return _GroupByWindowDataset(dataset, key_func, reduce_func,
- window_size_func)
-
- return _apply_fn
+  return grouping.group_by_window(key_func, reduce_func, window_size,
+                                  window_size_func)
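Likewise, a small usage sketch for the forwarded `group_by_window` (illustrative values; assumes TF 1.12+):

import tensorflow as tf

# Group integers by parity and batch each group into windows of 4 elements.
dataset = tf.data.Dataset.range(16).apply(
    tf.data.experimental.group_by_window(
        key_func=lambda x: x % 2,                         # scalar tf.int64 key
        reduce_func=lambda key, window: window.batch(4),  # reduce each window dataset
        window_size=4))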
+@deprecation.deprecated(
+    None, "Use `tf.data.experimental.bucket_by_sequence_length(...)`.")
def bucket_by_sequence_length(element_length_func,
bucket_boundaries,
bucket_batch_sizes,
@@ -163,342 +138,12 @@ def bucket_by_sequence_length(element_length_func,
Raises:
ValueError: if `len(bucket_batch_sizes) != len(bucket_boundaries) + 1`.
"""
- with ops.name_scope("bucket_by_seq_length"):
- if len(bucket_batch_sizes) != (len(bucket_boundaries) + 1):
- raise ValueError(
- "len(bucket_batch_sizes) must equal len(bucket_boundaries) + 1")
-
- batch_sizes = constant_op.constant(bucket_batch_sizes, dtype=dtypes.int64)
-
- def element_to_bucket_id(*args):
- """Return int64 id of the length bucket for this element."""
- seq_length = element_length_func(*args)
-
- boundaries = list(bucket_boundaries)
- buckets_min = [np.iinfo(np.int32).min] + boundaries
- buckets_max = boundaries + [np.iinfo(np.int32).max]
- conditions_c = math_ops.logical_and(
- math_ops.less_equal(buckets_min, seq_length),
- math_ops.less(seq_length, buckets_max))
- bucket_id = math_ops.reduce_min(array_ops.where(conditions_c))
-
- return bucket_id
-
- def window_size_fn(bucket_id):
- # The window size is set to the batch size for this bucket
- window_size = batch_sizes[bucket_id]
- return window_size
-
- def make_padded_shapes(shapes, none_filler=None):
- padded = []
- for shape in nest.flatten(shapes):
- shape = tensor_shape.TensorShape(shape)
- shape = [
- none_filler if d.value is None else d
- for d in shape
- ]
- padded.append(shape)
- return nest.pack_sequence_as(shapes, padded)
-
- def batching_fn(bucket_id, grouped_dataset):
- """Batch elements in dataset."""
- batch_size = window_size_fn(bucket_id)
- if no_padding:
- return grouped_dataset.batch(batch_size)
- none_filler = None
- if pad_to_bucket_boundary:
- err_msg = ("When pad_to_bucket_boundary=True, elements must have "
- "length < max(bucket_boundaries).")
- check = check_ops.assert_less(
- bucket_id,
- constant_op.constant(len(bucket_batch_sizes) - 1,
- dtype=dtypes.int64),
- message=err_msg)
- with ops.control_dependencies([check]):
- boundaries = constant_op.constant(bucket_boundaries,
- dtype=dtypes.int64)
- bucket_boundary = boundaries[bucket_id]
- none_filler = bucket_boundary - 1
- shapes = make_padded_shapes(
- padded_shapes or grouped_dataset.output_shapes,
- none_filler=none_filler)
- return grouped_dataset.padded_batch(batch_size, shapes, padding_values)
-
- def _apply_fn(dataset):
- return dataset.apply(
- group_by_window(element_to_bucket_id, batching_fn,
- window_size_func=window_size_fn))
-
- return _apply_fn
-
-
-def _map_x_dataset(map_func):
- """A transformation that maps `map_func` across its input.
-
- This transformation is similar to `tf.data.Dataset.map`, but in addition to
- supporting dense and sparse tensor inputs, it also supports dataset inputs.
-
- Args:
- map_func: A function mapping a nested structure of tensors and/or datasets
- (having shapes and types defined by `self.output_shapes` and
- `self.output_types`) to another nested structure of tensors and/or
- datasets.
-
- Returns:
- Dataset: A `Dataset`.
- """
-
- def _apply_fn(dataset):
- """Function from `Dataset` to `Dataset` that applies the transformation."""
- return _MapXDataset(dataset, map_func)
-
- return _apply_fn
-
-
-# TODO(b/115382007) Remove this once canned reducers move to core.
-def window_dataset(window_size):
- """A transformation that creates window datasets from the input dataset.
-
- The resulting datasets will contain `window_size` elements (or
- `N % window_size` for the last dataset if `window_size` does not divide the
- number of input elements `N` evenly).
-
- Args:
- window_size: A `tf.int64` scalar `tf.Tensor`, representing the number of
- consecutive elements of the input dataset to combine into a window.
-
- Returns:
- Dataset: A `Dataset`.
- """
-
- def _apply_fn(dataset):
- return dataset_ops.WindowDataset(
- dataset,
- size=window_size,
- shift=window_size,
- stride=1,
- drop_remainder=False)
-
- return _apply_fn
-
-
-class _GroupByReducerDataset(dataset_ops.UnaryDataset):
- """A `Dataset` that groups its input and performs a reduction."""
-
- def __init__(self, input_dataset, key_func, reducer):
- """See `group_by_reducer()` for details."""
- super(_GroupByReducerDataset, self).__init__(input_dataset)
+  return grouping.bucket_by_sequence_length(
+      element_length_func, bucket_boundaries, bucket_batch_sizes, padded_shapes,
+      padding_values, pad_to_bucket_boundary, no_padding)
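An illustrative sketch of the forwarded `bucket_by_sequence_length` (the toy sequences and bucket settings are hypothetical; assumes TF 1.12+):

import tensorflow as tf

# Variable-length sequences are bucketed by length and padded-batched per bucket.
sequences = [[1], [2, 3], [4, 5, 6], [7, 8, 9, 10, 11]]
dataset = tf.data.Dataset.from_generator(
    lambda: sequences, output_types=tf.int32, output_shapes=[None])

dataset = dataset.apply(
    tf.data.experimental.bucket_by_sequence_length(
        element_length_func=lambda seq: tf.shape(seq)[0],  # tf.int32 length
        bucket_boundaries=[2, 4],        # buckets: [0, 2), [2, 4), [4, inf)
        bucket_batch_sizes=[2, 2, 2]))   # one batch size per bucket (len + 1)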
- self._input_dataset = input_dataset
- self._make_key_func(key_func, input_dataset)
- self._make_init_func(reducer.init_func)
- self._make_reduce_func(reducer.reduce_func, input_dataset)
- self._make_finalize_func(reducer.finalize_func)
-
- def _make_key_func(self, key_func, input_dataset):
- """Make wrapping Defun for key_func."""
- wrapped_func = dataset_ops.StructuredFunctionWrapper(
- key_func, "tf.contrib.data.group_by_reducer()", input_dataset)
- if not (
- wrapped_func.output_types == dtypes.int64 and
- wrapped_func.output_shapes.is_compatible_with(tensor_shape.scalar())):
- raise ValueError(
- "`key_func` must return a single tf.int64 tensor. "
- "Got type=%s and shape=%s"
- % (wrapped_func.output_types, wrapped_func.output_shapes))
- self._key_func = wrapped_func.function
-
- def _make_init_func(self, init_func):
- """Make wrapping Defun for init_func."""
- wrapped_func = dataset_ops.StructuredFunctionWrapper(
- init_func, "tf.contrib.data.group_by_reducer()",
- input_classes=ops.Tensor, input_shapes=tensor_shape.scalar(),
- input_types=dtypes.int64)
- self._init_func = wrapped_func.function
- self._state_classes = wrapped_func.output_classes
- self._state_shapes = wrapped_func.output_shapes
- self._state_types = wrapped_func.output_types
-
- def _make_reduce_func(self, reduce_func, input_dataset):
- """Make wrapping Defun for reduce_func."""
-
- # Iteratively rerun the reduce function until reaching a fixed point on
- # `self._state_shapes`.
- need_to_rerun = True
- while need_to_rerun:
-
- wrapped_func = dataset_ops.StructuredFunctionWrapper(
- reduce_func, "tf.contrib.data.group_by_reducer()",
- input_classes=(self._state_classes, input_dataset.output_classes),
- input_shapes=(self._state_shapes, input_dataset.output_shapes),
- input_types=(self._state_types, input_dataset.output_types),
- add_to_graph=False)
-
- # Extract and validate class information from the returned values.
- for new_state_class, state_class in zip(
- nest.flatten(wrapped_func.output_classes),
- nest.flatten(self._state_classes)):
- if not issubclass(new_state_class, state_class):
- raise TypeError(
- "The element classes for the new state must match the initial "
- "state. Expected %s; got %s." %
- (self._state_classes, wrapped_func.output_classes))
-
- # Extract and validate type information from the returned values.
- for new_state_type, state_type in zip(
- nest.flatten(wrapped_func.output_types),
- nest.flatten(self._state_types)):
- if new_state_type != state_type:
- raise TypeError(
- "The element types for the new state must match the initial "
- "state. Expected %s; got %s." %
- (self._state_types, wrapped_func.output_types))
-
- # Extract shape information from the returned values.
- flat_state_shapes = nest.flatten(self._state_shapes)
- flat_new_state_shapes = nest.flatten(wrapped_func.output_shapes)
- weakened_state_shapes = [
- original.most_specific_compatible_shape(new)
- for original, new in zip(flat_state_shapes, flat_new_state_shapes)
- ]
-
- need_to_rerun = False
- for original_shape, weakened_shape in zip(flat_state_shapes,
- weakened_state_shapes):
- if original_shape.ndims is not None and (
- weakened_shape.ndims is None or
- original_shape.as_list() != weakened_shape.as_list()):
- need_to_rerun = True
- break
-
- if need_to_rerun:
- self._state_shapes = nest.pack_sequence_as(self._state_shapes,
- weakened_state_shapes)
-
- self._reduce_func = wrapped_func.function
- self._reduce_func.add_to_graph(ops.get_default_graph())
-
- def _make_finalize_func(self, finalize_func):
- """Make wrapping Defun for finalize_func."""
- wrapped_func = dataset_ops.StructuredFunctionWrapper(
- finalize_func, "tf.contrib.data.group_by_reducer()",
- input_classes=self._state_classes, input_shapes=self._state_shapes,
- input_types=self._state_types)
- self._finalize_func = wrapped_func.function
- self._output_classes = wrapped_func.output_classes
- self._output_shapes = wrapped_func.output_shapes
- self._output_types = wrapped_func.output_types
-
- @property
- def output_classes(self):
- return self._output_classes
-
- @property
- def output_shapes(self):
- return self._output_shapes
-
- @property
- def output_types(self):
- return self._output_types
-
- def _as_variant_tensor(self):
- return gen_dataset_ops.group_by_reducer_dataset(
- self._input_dataset._as_variant_tensor(), # pylint: disable=protected-access
- self._key_func.captured_inputs,
- self._init_func.captured_inputs,
- self._reduce_func.captured_inputs,
- self._finalize_func.captured_inputs,
- key_func=self._key_func,
- init_func=self._init_func,
- reduce_func=self._reduce_func,
- finalize_func=self._finalize_func,
- **dataset_ops.flat_structure(self))
-
-
-class _GroupByWindowDataset(dataset_ops.UnaryDataset):
- """A `Dataset` that groups its input and performs a windowed reduction."""
-
- def __init__(self, input_dataset, key_func, reduce_func, window_size_func):
- """See `group_by_window()` for details."""
- super(_GroupByWindowDataset, self).__init__(input_dataset)
-
- self._input_dataset = input_dataset
-
- self._make_key_func(key_func, input_dataset)
- self._make_reduce_func(reduce_func, input_dataset)
- self._make_window_size_func(window_size_func)
-
- def _make_window_size_func(self, window_size_func):
- """Make wrapping Defun for window_size_func."""
- def window_size_func_wrapper(key):
- return ops.convert_to_tensor(window_size_func(key), dtype=dtypes.int64)
- wrapped_func = dataset_ops.StructuredFunctionWrapper(
- window_size_func_wrapper, "tf.contrib.data.group_by_window()",
- input_classes=ops.Tensor, input_shapes=tensor_shape.scalar(),
- input_types=dtypes.int64)
- if not (
- wrapped_func.output_types == dtypes.int64 and
- wrapped_func.output_shapes.is_compatible_with(tensor_shape.scalar())):
- raise ValueError(
- "`window_size_func` must return a single tf.int64 scalar tensor.")
- self._window_size_func = wrapped_func.function
-
- def _make_key_func(self, key_func, input_dataset):
- """Make wrapping Defun for key_func."""
- def key_func_wrapper(*args):
- return ops.convert_to_tensor(key_func(*args), dtype=dtypes.int64)
- wrapped_func = dataset_ops.StructuredFunctionWrapper(
- key_func_wrapper, "tf.contrib.data.group_by_window()", input_dataset)
- if not (
- wrapped_func.output_types == dtypes.int64 and
- wrapped_func.output_shapes.is_compatible_with(tensor_shape.scalar())):
- raise ValueError(
- "`key_func` must return a single tf.int64 scalar tensor.")
- self._key_func = wrapped_func.function
-
- def _make_reduce_func(self, reduce_func, input_dataset):
- """Make wrapping Defun for reduce_func."""
- nested_dataset = dataset_ops._NestedDatasetComponent(input_dataset) # pylint: disable=protected-access
- wrapped_func = dataset_ops.StructuredFunctionWrapper(
- reduce_func, "tf.contrib.data.reduce_by_window()",
- input_classes=(ops.Tensor, nested_dataset),
- input_shapes=(tensor_shape.scalar(), nested_dataset),
- input_types=(dtypes.int64, nested_dataset),
- experimental_nested_dataset_support=True)
- if not isinstance(
- wrapped_func.output_classes, dataset_ops._NestedDatasetComponent): # pylint: disable=protected-access
- raise TypeError("`reduce_func` must return a `Dataset` object.")
- self._output_classes = wrapped_func.output_classes.output_classes
- self._output_types = wrapped_func.output_types.output_types
- self._output_shapes = wrapped_func.output_shapes.output_shapes
- self._reduce_func = wrapped_func.function
-
- @property
- def output_classes(self):
- return self._output_classes
-
- @property
- def output_shapes(self):
- return self._output_shapes
-
- @property
- def output_types(self):
- return self._output_types
-
- def _as_variant_tensor(self):
- return gen_dataset_ops.group_by_window_dataset(
- self._input_dataset._as_variant_tensor(), # pylint: disable=protected-access
- self._key_func.captured_inputs,
- self._reduce_func.captured_inputs,
- self._window_size_func.captured_inputs,
- key_func=self._key_func,
- reduce_func=self._reduce_func,
- window_size_func=self._window_size_func,
- **dataset_ops.flat_structure(self))
-
-
-class Reducer(object):
+class Reducer(grouping.Reducer):
"""A reducer is used for reducing a set of elements.
A reducer is represented as a tuple of the three functions:
@@ -507,58 +152,6 @@ class Reducer(object):
3) finalization function: state => result
"""
+  @deprecation.deprecated(None, "Use `tf.data.experimental.Reducer(...)`.")
def __init__(self, init_func, reduce_func, finalize_func):
- self._init_func = init_func
- self._reduce_func = reduce_func
- self._finalize_func = finalize_func
-
- @property
- def init_func(self):
- return self._init_func
-
- @property
- def reduce_func(self):
- return self._reduce_func
-
- @property
- def finalize_func(self):
- return self._finalize_func
-
-
-class _MapXDataset(dataset_ops.UnaryDataset):
- """A `Dataset` that maps a function over elements in its input."""
-
- def __init__(self, input_dataset, map_func):
- """See `map_x_dataset()` for details."""
- super(_MapXDataset, self).__init__(input_dataset)
- self._input_dataset = input_dataset
-
- wrapped_func = dataset_ops.StructuredFunctionWrapper(
- map_func,
- "tf.contrib.data.map_x_dataset()",
- input_dataset,
- experimental_nested_dataset_support=True)
- self._output_classes = wrapped_func.output_classes
- self._output_shapes = wrapped_func.output_shapes
- self._output_types = wrapped_func.output_types
- self._map_func = wrapped_func.function
-
- def _as_variant_tensor(self):
- input_t = self._input_dataset._as_variant_tensor() # pylint: disable=protected-access
- return gen_dataset_ops.map_dataset(
- input_t,
- self._map_func.captured_inputs,
- f=self._map_func,
- **dataset_ops.flat_structure(self))
-
- @property
- def output_classes(self):
- return self._output_classes
-
- @property
- def output_shapes(self):
- return self._output_shapes
-
- @property
- def output_types(self):
- return self._output_types
+    super(Reducer, self).__init__(init_func, reduce_func, finalize_func)
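As a closing sketch (hypothetical data; the contrib symbols shown are the deprecated wrappers defined in this file), the compatibility `Reducer` remains interchangeable with `tf.data.experimental.Reducer`:

import tensorflow as tf

# Sum the values in each key group using the deprecated contrib wrappers.
reducer = tf.contrib.data.Reducer(
    init_func=lambda _: tf.constant(0, dtype=tf.int64),   # initial sum per key
    reduce_func=lambda state, value: state + value,        # accumulate values
    finalize_func=lambda state: state)                     # emit the final sum

dataset = tf.data.Dataset.range(10).apply(
    tf.contrib.data.group_by_reducer(lambda x: x % 3, reducer))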