diff options
Diffstat (limited to 'tensorflow/contrib/data/python/kernel_tests/prefetching_ops_test.py')
-rw-r--r-- | tensorflow/contrib/data/python/kernel_tests/prefetching_ops_test.py | 641 |
1 files changed, 624 insertions, 17 deletions
diff --git a/tensorflow/contrib/data/python/kernel_tests/prefetching_ops_test.py b/tensorflow/contrib/data/python/kernel_tests/prefetching_ops_test.py index 20ed639750..2da6131e8e 100644 --- a/tensorflow/contrib/data/python/kernel_tests/prefetching_ops_test.py +++ b/tensorflow/contrib/data/python/kernel_tests/prefetching_ops_test.py @@ -21,6 +21,7 @@ import threading from tensorflow.contrib.data.python.ops import prefetching_ops from tensorflow.core.protobuf import config_pb2 +from tensorflow.python.compat import compat from tensorflow.python.data.ops import dataset_ops from tensorflow.python.data.ops import iterator_ops from tensorflow.python.framework import constant_op @@ -30,6 +31,7 @@ from tensorflow.python.framework import function from tensorflow.python.framework import ops from tensorflow.python.framework import sparse_tensor from tensorflow.python.framework import test_util +from tensorflow.python.ops import array_ops from tensorflow.python.ops import resource_variable_ops from tensorflow.python.platform import test @@ -86,8 +88,7 @@ class PrefetchingKernelsOpsTest(test.TestCase): return (prefetch_op, reset_op, destroy_op) def _prefetch_fn_helper_one_shot(self, buffer_name, device0, device1): - worker_config = config_pb2.ConfigProto() - worker_config.device_count["CPU"] = 2 + worker_config = config_pb2.ConfigProto(device_count={"CPU": 2}) ds, ds_iterator = self._create_ds_and_iterator(device0, initializable=False) prefetch_op, _, destroy_op = self._create_ops(ds, ds_iterator, buffer_name, @@ -126,8 +127,7 @@ class PrefetchingKernelsOpsTest(test.TestCase): "/job:localhost/replica:0/task:0/gpu:0") def testReinitialization(self): - worker_config = config_pb2.ConfigProto() - worker_config.device_count["CPU"] = 2 + worker_config = config_pb2.ConfigProto(device_count={"CPU": 2}) device0 = "/job:localhost/replica:0/task:0/cpu:0" device1 = "/job:localhost/replica:0/task:0/cpu:1" @@ -167,8 +167,7 @@ class PrefetchingKernelsOpsTest(test.TestCase): sess.run(destroy_op) def testReinitializationOutOfRange(self): - worker_config = config_pb2.ConfigProto() - worker_config.device_count["CPU"] = 2 + worker_config = config_pb2.ConfigProto(device_count={"CPU": 2}) device0 = "/job:localhost/replica:0/task:0/cpu:0" device1 = "/job:localhost/replica:0/task:0/cpu:1" @@ -237,9 +236,9 @@ class PrefetchingKernelsOpsTest(test.TestCase): buffer_resource_handle, ignore_lookup_error=True) with self.test_session() as sess: - self.assertEqual(["a"], sess.run(prefetch_op)) - self.assertEqual(["b"], sess.run(prefetch_op)) - self.assertEqual(["c"], sess.run(prefetch_op)) + self.assertEqual([b"a"], sess.run(prefetch_op)) + self.assertEqual([b"b"], sess.run(prefetch_op)) + self.assertEqual([b"c"], sess.run(prefetch_op)) with self.assertRaises(errors.OutOfRangeError): sess.run(prefetch_op) @@ -271,8 +270,7 @@ class PrefetchToDeviceTest(test.TestCase): self.assertEqual(dtypes.int64, next_element.dtype) self.assertEqual([], next_element.shape) - worker_config = config_pb2.ConfigProto() - worker_config.device_count["CPU"] = 2 + worker_config = config_pb2.ConfigProto(device_count={"CPU": 2}) with self.test_session(config=worker_config) as sess: for i in range(10): self.assertEqual(i, sess.run(next_element)) @@ -332,8 +330,7 @@ class PrefetchToDeviceTest(test.TestCase): self.assertEqual(dtypes.int64, next_element["a"].dtype) self.assertEqual([], next_element["a"].shape) - worker_config = config_pb2.ConfigProto() - worker_config.device_count["CPU"] = 2 + worker_config = config_pb2.ConfigProto(device_count={"CPU": 2}) with self.test_session(config=worker_config) as sess: for i in range(10): self.assertEqual({"a": i}, sess.run(next_element)) @@ -366,8 +363,7 @@ class PrefetchToDeviceTest(test.TestCase): next_element = iterator.get_next() self.assertEqual(dtypes.int64, next_element.dtype) - worker_config = config_pb2.ConfigProto() - worker_config.device_count["CPU"] = 2 + worker_config = config_pb2.ConfigProto(device_count={"CPU": 2}) with self.test_session(config=worker_config) as sess: for i in range(10): actual = sess.run(next_element) @@ -417,8 +413,7 @@ class PrefetchToDeviceTest(test.TestCase): self.assertEqual(dtypes.int64, next_element.dtype) self.assertEqual([], next_element.shape) - worker_config = config_pb2.ConfigProto() - worker_config.device_count["CPU"] = 2 + worker_config = config_pb2.ConfigProto(device_count={"CPU": 2}) with self.test_session(config=worker_config) as sess: sess.run(iterator.initializer) for i in range(5): @@ -451,5 +446,617 @@ class PrefetchToDeviceTest(test.TestCase): sess.run(next_element) +class CopyToDeviceTest(test.TestCase): + + def testCopyToDevice(self): + host_dataset = dataset_ops.Dataset.range(10) + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/cpu:1")) + + with ops.device("/cpu:1"): + iterator = device_dataset.make_one_shot_iterator() + next_element = iterator.get_next() + + self.assertEqual(host_dataset.output_types, device_dataset.output_types) + self.assertEqual(host_dataset.output_types, iterator.output_types) + self.assertEqual(host_dataset.output_shapes, device_dataset.output_shapes) + self.assertEqual(host_dataset.output_shapes, iterator.output_shapes) + self.assertEqual(host_dataset.output_classes, device_dataset.output_classes) + self.assertEqual(host_dataset.output_classes, iterator.output_classes) + + self.assertEqual(dtypes.int64, next_element.dtype) + self.assertEqual([], next_element.shape) + + worker_config = config_pb2.ConfigProto(device_count={"CPU": 2}) + with self.test_session(config=worker_config) as sess: + for i in range(10): + self.assertEqual(i, sess.run(next_element)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + def testCopyToDeviceInt32(self): + host_dataset = dataset_ops.Dataset.from_tensors([0, 1, 2, 3]) + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/cpu:1")) + + with ops.device("/cpu:1"): + iterator = device_dataset.make_one_shot_iterator() + next_element = iterator.get_next() + + self.assertEqual(host_dataset.output_types, device_dataset.output_types) + self.assertEqual(host_dataset.output_types, iterator.output_types) + self.assertEqual(host_dataset.output_shapes, device_dataset.output_shapes) + self.assertEqual(host_dataset.output_shapes, iterator.output_shapes) + self.assertEqual(host_dataset.output_classes, device_dataset.output_classes) + self.assertEqual(host_dataset.output_classes, iterator.output_classes) + + self.assertEqual(dtypes.int32, next_element.dtype) + self.assertEqual((4,), next_element.shape) + + worker_config = config_pb2.ConfigProto(device_count={"CPU": 2}) + with self.test_session(config=worker_config) as sess: + self.assertAllEqual([0, 1, 2, 3], sess.run(next_element)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + def testCopyToSameDevice(self): + host_dataset = dataset_ops.Dataset.range(10) + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/cpu:0")) + + with ops.device("/cpu:0"): + iterator = device_dataset.make_one_shot_iterator() + next_element = iterator.get_next() + + self.assertEqual(host_dataset.output_types, device_dataset.output_types) + self.assertEqual(host_dataset.output_types, iterator.output_types) + self.assertEqual(host_dataset.output_shapes, device_dataset.output_shapes) + self.assertEqual(host_dataset.output_shapes, iterator.output_shapes) + self.assertEqual(host_dataset.output_classes, device_dataset.output_classes) + self.assertEqual(host_dataset.output_classes, iterator.output_classes) + + self.assertEqual(dtypes.int64, next_element.dtype) + self.assertEqual([], next_element.shape) + + worker_config = config_pb2.ConfigProto(device_count={"CPU": 2}) + with self.test_session(config=worker_config) as sess: + for i in range(10): + self.assertEqual(i, sess.run(next_element)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + def testCopyToDeviceWithPrefetch(self): + host_dataset = dataset_ops.Dataset.range(10) + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/cpu:1")).prefetch(1) + + with ops.device("/cpu:1"): + iterator = device_dataset.make_one_shot_iterator() + next_element = iterator.get_next() + + self.assertEqual(host_dataset.output_types, device_dataset.output_types) + self.assertEqual(host_dataset.output_types, iterator.output_types) + self.assertEqual(host_dataset.output_shapes, device_dataset.output_shapes) + self.assertEqual(host_dataset.output_shapes, iterator.output_shapes) + self.assertEqual(host_dataset.output_classes, device_dataset.output_classes) + self.assertEqual(host_dataset.output_classes, iterator.output_classes) + + self.assertEqual(dtypes.int64, next_element.dtype) + self.assertEqual([], next_element.shape) + + worker_config = config_pb2.ConfigProto(device_count={"CPU": 2}) + with self.test_session(config=worker_config) as sess: + for i in range(10): + self.assertEqual(i, sess.run(next_element)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + def testCopyDictToDevice(self): + host_dataset = dataset_ops.Dataset.range(10).map(lambda x: {"a": x}) + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/cpu:1")) + + with ops.device("/cpu:1"): + iterator = device_dataset.make_one_shot_iterator() + next_element = iterator.get_next() + + self.assertEqual(host_dataset.output_types, device_dataset.output_types) + self.assertEqual(host_dataset.output_types, iterator.output_types) + self.assertEqual(host_dataset.output_shapes, device_dataset.output_shapes) + self.assertEqual(host_dataset.output_shapes, iterator.output_shapes) + self.assertEqual(host_dataset.output_classes, device_dataset.output_classes) + self.assertEqual(host_dataset.output_classes, iterator.output_classes) + + self.assertEqual(dtypes.int64, next_element["a"].dtype) + self.assertEqual([], next_element["a"].shape) + + worker_config = config_pb2.ConfigProto(device_count={"CPU": 2}) + with self.test_session(config=worker_config) as sess: + for i in range(10): + self.assertEqual({"a": i}, sess.run(next_element)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + def testCopyDictToDeviceWithPrefetch(self): + host_dataset = dataset_ops.Dataset.range(10).map(lambda x: {"a": x}) + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/cpu:1")).prefetch(1) + + with ops.device("/cpu:1"): + iterator = device_dataset.make_one_shot_iterator() + next_element = iterator.get_next() + + self.assertEqual(host_dataset.output_types, device_dataset.output_types) + self.assertEqual(host_dataset.output_types, iterator.output_types) + self.assertEqual(host_dataset.output_shapes, device_dataset.output_shapes) + self.assertEqual(host_dataset.output_shapes, iterator.output_shapes) + self.assertEqual(host_dataset.output_classes, device_dataset.output_classes) + self.assertEqual(host_dataset.output_classes, iterator.output_classes) + + self.assertEqual(dtypes.int64, next_element["a"].dtype) + self.assertEqual([], next_element["a"].shape) + + worker_config = config_pb2.ConfigProto(device_count={"CPU": 2}) + with self.test_session(config=worker_config) as sess: + for i in range(10): + self.assertEqual({"a": i}, sess.run(next_element)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + def testCopySparseTensorsToDevice(self): + + def make_tensor(i): + return sparse_tensor.SparseTensorValue( + indices=[[0, 0]], values=(i * [1]), dense_shape=[2, 2]) + + host_dataset = dataset_ops.Dataset.range(10).map(make_tensor) + + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/cpu:1")) + + with ops.device("/cpu:1"): + iterator = device_dataset.make_one_shot_iterator() + next_element = iterator.get_next() + + self.assertEqual(host_dataset.output_types, device_dataset.output_types) + self.assertEqual(host_dataset.output_types, iterator.output_types) + self.assertEqual(host_dataset.output_shapes, device_dataset.output_shapes) + self.assertEqual(host_dataset.output_shapes, iterator.output_shapes) + self.assertEqual(host_dataset.output_classes, device_dataset.output_classes) + self.assertEqual(host_dataset.output_classes, iterator.output_classes) + + self.assertEqual(dtypes.int64, next_element.dtype) + + worker_config = config_pb2.ConfigProto(device_count={"CPU": 2}) + with self.test_session(config=worker_config) as sess: + for i in range(10): + actual = sess.run(next_element) + self.assertAllEqual([i], actual.values) + self.assertAllEqual([[0, 0]], actual.indices) + self.assertAllEqual([2, 2], actual.dense_shape) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + def testCopySparseTensorsToDeviceWithPrefetch(self): + + def make_tensor(i): + return sparse_tensor.SparseTensorValue( + indices=[[0, 0]], values=(i * [1]), dense_shape=[2, 2]) + + host_dataset = dataset_ops.Dataset.range(10).map(make_tensor) + + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/cpu:1")).prefetch(1) + + with ops.device("/cpu:1"): + iterator = device_dataset.make_one_shot_iterator() + next_element = iterator.get_next() + + self.assertEqual(host_dataset.output_types, device_dataset.output_types) + self.assertEqual(host_dataset.output_types, iterator.output_types) + self.assertEqual(host_dataset.output_shapes, device_dataset.output_shapes) + self.assertEqual(host_dataset.output_shapes, iterator.output_shapes) + self.assertEqual(host_dataset.output_classes, device_dataset.output_classes) + self.assertEqual(host_dataset.output_classes, iterator.output_classes) + + self.assertEqual(dtypes.int64, next_element.dtype) + + worker_config = config_pb2.ConfigProto(device_count={"CPU": 2}) + with self.test_session(config=worker_config) as sess: + for i in range(10): + actual = sess.run(next_element) + self.assertAllEqual([i], actual.values) + self.assertAllEqual([[0, 0]], actual.indices) + self.assertAllEqual([2, 2], actual.dense_shape) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + def testCopyToDeviceGpu(self): + if not test_util.is_gpu_available(): + self.skipTest("No GPU available") + + host_dataset = dataset_ops.Dataset.range(10) + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/gpu:0")) + + with ops.device("/gpu:0"): + iterator = device_dataset.make_initializable_iterator() + next_element = iterator.get_next() + + with self.test_session() as sess: + sess.run(iterator.initializer) + for i in range(10): + self.assertEqual(i, sess.run(next_element)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + def testCopyToDeviceGpuWithPrefetch(self): + if not test_util.is_gpu_available(): + self.skipTest("No GPU available") + + host_dataset = dataset_ops.Dataset.range(10) + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/gpu:0")).prefetch(1) + + with ops.device("/gpu:0"): + iterator = device_dataset.make_initializable_iterator() + next_element = iterator.get_next() + + with self.test_session() as sess: + sess.run(iterator.initializer) + for i in range(10): + self.assertEqual(i, sess.run(next_element)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + def testCopyToDeviceGpuInt32(self): + if not test_util.is_gpu_available(): + self.skipTest("No GPU available") + + host_dataset = dataset_ops.Dataset.from_tensors([0, 1, 2, 3]) + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/gpu:0")) + + with ops.device("/gpu:0"): + iterator = device_dataset.make_initializable_iterator() + next_element = iterator.get_next() + + with self.test_session() as sess: + sess.run(iterator.initializer) + self.assertAllEqual([0, 1, 2, 3], sess.run(next_element)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + def testCopyToDeviceGpuInt32AndPrefetch(self): + if not test_util.is_gpu_available(): + self.skipTest("No GPU available") + + host_dataset = dataset_ops.Dataset.from_tensors([0, 1, 2, 3]) + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/gpu:0")).prefetch(1) + + with ops.device("/gpu:0"): + iterator = device_dataset.make_initializable_iterator() + next_element = iterator.get_next() + + with self.test_session() as sess: + sess.run(iterator.initializer) + self.assertAllEqual([0, 1, 2, 3], sess.run(next_element)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + def testCopyToDeviceGpuStrings(self): + if not test_util.is_gpu_available(): + self.skipTest("No GPU available") + + host_dataset = dataset_ops.Dataset.from_tensors(["a", "b", "c"]) + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/gpu:0")) + + with ops.device("/gpu:0"): + iterator = device_dataset.make_initializable_iterator() + next_element = iterator.get_next() + + with self.test_session() as sess: + sess.run(iterator.initializer) + self.assertAllEqual([b"a", b"b", b"c"], sess.run(next_element)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + def testCopyToDeviceGpuStringsAndPrefetch(self): + if not test_util.is_gpu_available(): + self.skipTest("No GPU available") + + host_dataset = dataset_ops.Dataset.from_tensors(["a", "b", "c"]) + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/gpu:0")) + + with ops.device("/gpu:0"): + iterator = device_dataset.make_initializable_iterator() + next_element = iterator.get_next() + + with self.test_session() as sess: + sess.run(iterator.initializer) + self.assertAllEqual([b"a", b"b", b"c"], sess.run(next_element)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + def testCopyToDevicePingPongCPUGPU(self): + if not test_util.is_gpu_available(): + self.skipTest("No GPU available") + + with compat.forward_compatibility_horizon(2018, 8, 4): + host_dataset = dataset_ops.Dataset.range(10) + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/gpu:0", source_device="/cpu:0")) + back_to_cpu_dataset = device_dataset.apply( + prefetching_ops.copy_to_device("/cpu:0", source_device="/gpu:0")) + + with ops.device("/cpu:0"): + iterator = back_to_cpu_dataset.make_initializable_iterator() + next_element = iterator.get_next() + + with self.test_session() as sess: + sess.run(iterator.initializer) + for i in range(10): + self.assertEqual(i, sess.run(next_element)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + def testCopyToDeviceWithReInit(self): + host_dataset = dataset_ops.Dataset.range(10) + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/cpu:1")) + + with ops.device("/cpu:1"): + iterator = device_dataset.make_initializable_iterator() + next_element = iterator.get_next() + + self.assertEqual(host_dataset.output_types, device_dataset.output_types) + self.assertEqual(host_dataset.output_types, iterator.output_types) + self.assertEqual(host_dataset.output_shapes, device_dataset.output_shapes) + self.assertEqual(host_dataset.output_shapes, iterator.output_shapes) + self.assertEqual(host_dataset.output_classes, device_dataset.output_classes) + self.assertEqual(host_dataset.output_classes, iterator.output_classes) + + self.assertEqual(dtypes.int64, next_element.dtype) + self.assertEqual([], next_element.shape) + + worker_config = config_pb2.ConfigProto(device_count={"CPU": 2}) + with self.test_session(config=worker_config) as sess: + sess.run(iterator.initializer) + for i in range(5): + self.assertEqual(i, sess.run(next_element)) + sess.run(iterator.initializer) + for i in range(10): + self.assertEqual(i, sess.run(next_element)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + def testCopyToDeviceWithReInitAndPrefetch(self): + host_dataset = dataset_ops.Dataset.range(10) + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/cpu:1")).prefetch(1) + + with ops.device("/cpu:1"): + iterator = device_dataset.make_initializable_iterator() + next_element = iterator.get_next() + + self.assertEqual(host_dataset.output_types, device_dataset.output_types) + self.assertEqual(host_dataset.output_types, iterator.output_types) + self.assertEqual(host_dataset.output_shapes, device_dataset.output_shapes) + self.assertEqual(host_dataset.output_shapes, iterator.output_shapes) + self.assertEqual(host_dataset.output_classes, device_dataset.output_classes) + self.assertEqual(host_dataset.output_classes, iterator.output_classes) + + self.assertEqual(dtypes.int64, next_element.dtype) + self.assertEqual([], next_element.shape) + + worker_config = config_pb2.ConfigProto(device_count={"CPU": 2}) + with self.test_session(config=worker_config) as sess: + sess.run(iterator.initializer) + for i in range(5): + self.assertEqual(i, sess.run(next_element)) + sess.run(iterator.initializer) + for i in range(10): + self.assertEqual(i, sess.run(next_element)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + def testCopyToDeviceGpuWithReInit(self): + if not test_util.is_gpu_available(): + self.skipTest("No GPU available") + + host_dataset = dataset_ops.Dataset.range(10) + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/gpu:0")) + + with ops.device("/gpu:0"): + iterator = device_dataset.make_initializable_iterator() + next_element = iterator.get_next() + + with self.test_session() as sess: + sess.run(iterator.initializer) + for i in range(5): + self.assertEqual(i, sess.run(next_element)) + sess.run(iterator.initializer) + for i in range(10): + self.assertEqual(i, sess.run(next_element)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + def testCopyToDeviceGpuWithReInitAndPrefetch(self): + if not test_util.is_gpu_available(): + self.skipTest("No GPU available") + + host_dataset = dataset_ops.Dataset.range(10) + device_dataset = host_dataset.apply( + prefetching_ops.copy_to_device("/gpu:0")).prefetch(1) + + with ops.device("/gpu:0"): + iterator = device_dataset.make_initializable_iterator() + next_element = iterator.get_next() + + with self.test_session() as sess: + sess.run(iterator.initializer) + for i in range(5): + self.assertEqual(i, sess.run(next_element)) + sess.run(iterator.initializer) + for i in range(10): + self.assertEqual(i, sess.run(next_element)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(next_element) + + +class MultiDeviceIteratorTest(test.TestCase): + + def testBasic(self): + dataset = dataset_ops.Dataset.range(10) + multi_device_iterator = prefetching_ops.MultiDeviceIterator( + dataset, ["/cpu:1", "/cpu:2"]) + elem_on_1, elem_on_2 = multi_device_iterator.get_next() + + config = config_pb2.ConfigProto(device_count={"CPU": 3}) + with self.test_session(config=config) as sess: + sess.run(multi_device_iterator.initializer) + for i in range(0, 10, 2): + self.assertEqual(i, sess.run(elem_on_1)) + self.assertEqual(i + 1, sess.run(elem_on_2)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(elem_on_1) + sess.run(elem_on_2) + + def testOneOnSameDevice(self): + with ops.device("/cpu:0"): + dataset = dataset_ops.Dataset.range(10) + multi_device_iterator = prefetching_ops.MultiDeviceIterator( + dataset, ["/cpu:0", "/cpu:1"]) + elem_on_1, elem_on_2 = multi_device_iterator.get_next() + + config = config_pb2.ConfigProto(device_count={"CPU": 2}) + with self.test_session(config=config) as sess: + sess.run(multi_device_iterator.initializer) + for i in range(0, 10, 2): + self.assertEqual(i, sess.run(elem_on_1)) + self.assertEqual(i + 1, sess.run(elem_on_2)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(elem_on_1) + sess.run(elem_on_2) + + def testRepeatDevices(self): + with ops.device("/cpu:0"): + dataset = dataset_ops.Dataset.range(20) + multi_device_iterator = prefetching_ops.MultiDeviceIterator( + dataset, ["/cpu:1", "/cpu:2", "/cpu:1", "/cpu:2"]) + elements = multi_device_iterator.get_next() + elem_on_1, elem_on_2, elem_on_3, elem_on_4 = elements + + config = config_pb2.ConfigProto(device_count={"CPU": 3}) + with self.test_session(config=config) as sess: + sess.run(multi_device_iterator.initializer) + for i in range(0, 20, 4): + self.assertEqual(i, sess.run(elem_on_1)) + self.assertEqual(i + 1, sess.run(elem_on_2)) + self.assertEqual(i + 2, sess.run(elem_on_3)) + self.assertEqual(i + 3, sess.run(elem_on_4)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(elem_on_1) + sess.run(elem_on_2) + sess.run(elem_on_3) + sess.run(elem_on_4) + + def testNotFullyDivisible(self): + dataset = dataset_ops.Dataset.range(9) + multi_device_iterator = prefetching_ops.MultiDeviceIterator( + dataset, ["/cpu:1", "/cpu:2"]) + elem_on_1, elem_on_2 = multi_device_iterator.get_next() + + config = config_pb2.ConfigProto(device_count={"CPU": 3}) + with self.test_session(config=config) as sess: + sess.run(multi_device_iterator.initializer) + for i in range(0, 8, 2): + self.assertEqual(i, sess.run(elem_on_1)) + self.assertEqual(i + 1, sess.run(elem_on_2)) + self.assertEqual(8, sess.run(elem_on_1)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(elem_on_1) + sess.run(elem_on_2) + + def testUneven(self): + dataset = dataset_ops.Dataset.range(10) + multi_device_iterator = prefetching_ops.MultiDeviceIterator( + dataset, ["/cpu:1", "/cpu:2"]) + elem_on_1, elem_on_2 = multi_device_iterator.get_next() + + config = config_pb2.ConfigProto(device_count={"CPU": 3}) + with self.test_session(config=config) as sess: + sess.run(multi_device_iterator.initializer) + for i in range(0, 10, 2): + self.assertEqual(i, sess.run(elem_on_1)) + for i in range(0, 10, 2): + self.assertEqual(i + 1, sess.run(elem_on_2)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(elem_on_1) + sess.run(elem_on_2) + + def testMultipleInitializations(self): + with ops.device("/cpu:0"): + epoch = array_ops.placeholder(dtypes.int64, shape=[]) + dataset1 = dataset_ops.Dataset.from_tensors(epoch).repeat(1000) + dataset2 = dataset_ops.Dataset.range(1000) + dataset = dataset_ops.Dataset.zip((dataset1, dataset2)) + multi_device_iterator = prefetching_ops.MultiDeviceIterator( + dataset, ["/cpu:1", "/cpu:2"], prefetch_buffer_size=4) + elem_on_1, elem_on_2 = multi_device_iterator.get_next() + init_op = multi_device_iterator.initializer + + config = config_pb2.ConfigProto(device_count={"CPU": 3}) + with self.test_session(config=config) as sess: + for i in range(1000): + sess.run(init_op, feed_dict={epoch: i}) + self.assertEqual([(i, 0), (i, 1)], sess.run([elem_on_1, elem_on_2])) + + def testBasicGpu(self): + if not test_util.is_gpu_available(): + self.skipTest("No GPU available") + + with compat.forward_compatibility_horizon(2018, 8, 4): + dataset = dataset_ops.Dataset.range(10) + multi_device_iterator = prefetching_ops.MultiDeviceIterator( + dataset, ["/cpu:1", "/gpu:0"]) + elem_on_1, elem_on_2 = multi_device_iterator.get_next() + + config = config_pb2.ConfigProto(device_count={"CPU": 2, "GPU": 1}) + with self.test_session(config=config) as sess: + sess.run(multi_device_iterator.initializer) + for i in range(0, 10, 2): + self.assertEqual(i, sess.run(elem_on_1)) + self.assertEqual(i + 1, sess.run(elem_on_2)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(elem_on_1) + sess.run(elem_on_2) + + def testUnevenGpu(self): + if not test_util.is_gpu_available(): + self.skipTest("No GPU available") + + with compat.forward_compatibility_horizon(2018, 8, 4): + dataset = dataset_ops.Dataset.range(10) + multi_device_iterator = prefetching_ops.MultiDeviceIterator( + dataset, ["/cpu:1", "/gpu:0"]) + elem_on_1, elem_on_2 = multi_device_iterator.get_next() + + config = config_pb2.ConfigProto(device_count={"CPU": 2, "GPU": 1}) + with self.test_session(config=config) as sess: + sess.run(multi_device_iterator.initializer) + for i in range(0, 10, 2): + self.assertEqual(i, sess.run(elem_on_1)) + for i in range(0, 10, 2): + self.assertEqual(i + 1, sess.run(elem_on_2)) + with self.assertRaises(errors.OutOfRangeError): + sess.run(elem_on_1) + sess.run(elem_on_2) + + if __name__ == "__main__": test.main() |