aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/contrib/eager/python/datasets.py
diff options
context:
space:
mode:
Diffstat (limited to 'tensorflow/contrib/eager/python/datasets.py')
-rw-r--r--tensorflow/contrib/eager/python/datasets.py64
1 files changed, 8 insertions, 56 deletions
diff --git a/tensorflow/contrib/eager/python/datasets.py b/tensorflow/contrib/eager/python/datasets.py
index 58c548d798..e31dbbe80f 100644
--- a/tensorflow/contrib/eager/python/datasets.py
+++ b/tensorflow/contrib/eager/python/datasets.py
@@ -18,33 +18,14 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
-import threading
-
from tensorflow.contrib.data.python.ops import prefetching_ops
from tensorflow.python.data.ops import iterator_ops
-from tensorflow.python.data.util import nest
-from tensorflow.python.data.util import sparse
from tensorflow.python.eager import context
-from tensorflow.python.framework import constant_op
-from tensorflow.python.framework import dtypes
-from tensorflow.python.framework import function
from tensorflow.python.framework import ops
from tensorflow.python.ops import gen_dataset_ops
-from tensorflow.python.ops import resource_variable_ops
from tensorflow.python.training.checkpointable import base as checkpointable
from tensorflow.python.training.saver import BaseSaverBuilder
-_uid_counter = 0
-_uid_lock = threading.Lock()
-
-
-def _generate_shared_name(prefix):
- with _uid_lock:
- global _uid_counter
- uid = _uid_counter
- _uid_counter += 1
- return "{}{}".format(prefix, uid)
-
class Iterator(iterator_ops.EagerIterator, checkpointable.CheckpointableBase):
"""An iterator producing tf.Tensor objects from a tf.data.Dataset.
@@ -80,38 +61,18 @@ class Iterator(iterator_ops.EagerIterator, checkpointable.CheckpointableBase):
"`tf.contrib.eager.Iterator`. Use `for ... in dataset:` to iterate "
"over the dataset instead.")
- super(Iterator, self).__init__(dataset)
if not context.context().device_spec.device_type:
is_remote_device = False
else:
is_remote_device = context.context().device_spec.device_type != "CPU"
- self._buffer_resource_handle = None
if is_remote_device:
- with ops.device("/device:CPU:0"):
- iter_string_handle = gen_dataset_ops.iterator_to_string_handle(
- self._resource)
-
- @function.Defun(dtypes.string)
- def remote_fn(h):
- remote_iterator = iterator_ops.Iterator.from_string_handle(
- h, self.output_types, self.output_shapes, self.output_classes)
- return remote_iterator.get_next()
-
- remote_fn.add_to_graph(None)
- target = constant_op.constant("/device:CPU:0")
- with ops.device(self._device):
- self._buffer_resource_handle = prefetching_ops.function_buffering_resource( # pylint: disable=line-too-long
- string_arg=iter_string_handle,
- output_types=self._flat_output_types,
- f=remote_fn,
- target_device=target,
- buffer_size=10,
- container="",
- shared_name=_generate_shared_name(
- "contrib_eager_iterator_function_buffer_resource"))
- self._buffer_resource_deleter = resource_variable_ops.EagerResourceDeleter( # pylint: disable=line-too-long
- handle=self._buffer_resource_handle,
- handle_device=self._device)
+ with ops.device(None):
+ # Let the placer figure out where to place the various functions etc.
+ # created by the CopyToDeviceDataset.
+ dataset = dataset.apply(prefetching_ops.copy_to_device(
+ context.context().device_name))
+ dataset = dataset.prefetch(1)
+ super(Iterator, self).__init__(dataset)
def _next_internal(self):
"""Returns a nested structure of `tf.Tensor`s containing the next element.
@@ -120,16 +81,7 @@ class Iterator(iterator_ops.EagerIterator, checkpointable.CheckpointableBase):
# that there is no more data to iterate over.
# TODO(b/77291417): Fix
with context.execution_mode(context.SYNC):
- if self._buffer_resource_handle is not None:
- with ops.device(self._device):
- ret = prefetching_ops.function_buffering_resource_get_next(
- function_buffer_resource=self._buffer_resource_handle,
- output_types=self._flat_output_types)
- return sparse.deserialize_sparse_tensors(
- nest.pack_sequence_as(self._output_types, ret), self._output_types,
- self._output_shapes, self._output_classes)
- else:
- return super(Iterator, self)._next_internal()
+ return super(Iterator, self)._next_internal()
# TODO(shivaniagrawal): Expose checkpointable stateful objects from dataset
# attributes(potential).